IGNITE-9607: Service Grid redesign - Phase 1 - Fixes #4434.
authorVyacheslav Daradur <daradurvs@gmail.com>
Wed, 26 Dec 2018 20:17:30 +0000 (23:17 +0300)
committerNikolay Izhikov <nizhikov@apache.org>
Wed, 26 Dec 2018 20:17:30 +0000 (23:17 +0300)
Signed-off-by: Nikolay Izhikov <nizhikov@apache.org>
83 files changed:
modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignments.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignmentsKey.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeployment.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentKey.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/PreparedConfigurations.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceChangeAbstractRequest.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceChangeBatchRequest.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResultBatch.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResultBatch.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceUndeploymentRequest.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/services/ServiceConfiguration.java
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
modules/core/src/test/java/org/apache/ignite/internal/GridDeploymentSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeployTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFutureSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentExceptionPropagationTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorBatchDeploySelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorSingleNodeSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceReassignmentSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentDiscoveryListenerNotificationOrderTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentNonSerializableStaticConfigurationTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnActivationTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnClientDisconnectTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOutsideBaselineTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessIdSelfTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessingOnCoordinatorFailTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessingOnCoordinatorLeftTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessingOnNodesFailTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessingOnNodesLeftTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceInfoSelfTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServicePredicateAccessCacheTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceReassignmentFunctionSelfTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/service/inner/LongInitializedTestService.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceConfigVariationsFullApiTestSuite.java
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs

index 4cfb361..f58f1aa 100644 (file)
@@ -1059,6 +1059,19 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE = "IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE";
 
     /**
+     * Manages the type of the implementation of the service processor (implementation of the {@link IgniteServices}).
+     * All nodes in the cluster must have the same value of this property.
+     * <p/>
+     * If the property is {@code true} then event-driven implementation of the service processor will be used.
+     * <p/>
+     * If the property is {@code false} then internal cache based implementation of service processor will be used.
+     * <p/>
+     * Default is {@code true}.
+     */
+    public static final String IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED
+        = "IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED";
+
+    /**
      * When set to {@code true}, cache metrics are not included into the discovery metrics update message (in this
      * case message contains only cluster metrics). By default cache metrics are included into the message and
      * calculated each time the message is sent.
index 607217e..1f9e02e 100644 (file)
@@ -70,7 +70,10 @@ public interface GridComponent {
         CACHE_CRD_PROC,
 
         /** Encryption manager. */
-        ENCRYPTION_MGR
+        ENCRYPTION_MGR,
+
+        /** Service processor. */
+        SERVICE_PROC
     }
 
     /**
index fdb8ebc..691fe37 100644 (file)
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
 import org.apache.ignite.internal.stat.IoStatisticsManager;
 import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
 import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
@@ -67,7 +68,6 @@ import org.apache.ignite.internal.processors.rest.GridRestProcessor;
 import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
 import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
 import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
@@ -224,7 +224,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      *
      * @return Service processor.
      */
-    public GridServiceProcessor service();
+    public ServiceProcessorAdapter service();
 
     /**
      * Gets port processor.
index 8131899..1219d00 100644 (file)
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.failover.GridFailoverManager;
 import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
 import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
+import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
 import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
@@ -84,7 +85,6 @@ import org.apache.ignite.internal.processors.rest.GridRestProcessor;
 import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
 import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
 import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
@@ -208,7 +208,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringInclude
-    private GridServiceProcessor svcProc;
+    private ServiceProcessorAdapter srvcProc;
 
     /** */
     @GridToStringInclude
@@ -608,8 +608,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
             portProc = (GridPortProcessor)comp;
         else if (comp instanceof GridClosureProcessor)
             closProc = (GridClosureProcessor)comp;
-        else if (comp instanceof GridServiceProcessor)
-            svcProc = (GridServiceProcessor)comp;
+        else if (comp instanceof ServiceProcessorAdapter)
+            srvcProc = (ServiceProcessorAdapter)comp;
         else if (comp instanceof IgniteScheduleProcessorAdapter)
             scheduleProc = (IgniteScheduleProcessorAdapter)comp;
         else if (comp instanceof GridSegmentationProcessor)
@@ -762,8 +762,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public GridServiceProcessor service() {
-        return svcProc;
+    @Override public ServiceProcessorAdapter service() {
+        return srvcProc;
     }
 
     /** {@inheritDoc} */
index 95d7717..437ee4d 100644 (file)
@@ -136,7 +136,10 @@ public enum GridTopic {
     TOPIC_CACHE_COORDINATOR,
 
     /** */
-    TOPIC_GEN_ENC_KEY;
+    TOPIC_GEN_ENC_KEY,
+
+    /** */
+    TOPIC_SERVICES;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();
index 95bf49c..b84771a 100644 (file)
@@ -116,6 +116,7 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
 import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.internal.processors.GridProcessor;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
@@ -163,6 +164,7 @@ import org.apache.ignite.internal.processors.rest.GridRestProcessor;
 import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
 import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
 import org.apache.ignite.internal.processors.service.GridServiceProcessor;
+import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
 import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
@@ -224,6 +226,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_REST_START_ON_CLIENT;
@@ -251,6 +254,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
@@ -1016,7 +1020,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 startProcessor(new GridCacheProcessor(ctx));
                 startProcessor(new GridQueryProcessor(ctx));
                 startProcessor(new ClientListenerProcessor(ctx));
-                startProcessor(new GridServiceProcessor(ctx));
+                startProcessor(createServiceProcessor());
                 startProcessor(new GridTaskSessionProcessor(ctx));
                 startProcessor(new GridJobProcessor(ctx));
                 startProcessor(new GridTaskProcessor(ctx));
@@ -1357,6 +1361,20 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /**
+     * Creates service processor depend on {@link IgniteSystemProperties#IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED}.
+     *
+     * @return Service processor.
+     */
+    private GridProcessorAdapter createServiceProcessor() {
+        final boolean srvcProcMode = getBoolean(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, true);
+
+        if (srvcProcMode)
+            return new IgniteServiceProcessor(ctx);
+
+        return new GridServiceProcessor(ctx);
+    }
+
+    /**
      * Validates common configuration parameters.
      *
      * @param cfg Configuration.
@@ -1643,6 +1661,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 ctx.addNodeAttribute(e.getKey(), e.getValue());
             }
         }
+
+        ctx.addNodeAttribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED,
+            ctx.service() instanceof IgniteServiceProcessor);
     }
 
     /**
index 45ca234..1740072 100644 (file)
@@ -208,6 +208,10 @@ public final class IgniteNodeAttributes {
     /** Supported features. */
     public static final String ATTR_IGNITE_FEATURES = ATTR_PREFIX + ".features";
 
+    /** Ignite services processor mode. */
+    public static final String ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED = ATTR_PREFIX +
+        ".event.driven.service.processor.enabled";
+
     /**
      * Enforces singleton.
      */
index b1c023a..5e7811b 100644 (file)
@@ -184,6 +184,9 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
 import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
 import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest;
 import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
+import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult;
+import org.apache.ignite.internal.processors.service.ServiceDeploymentProcessId;
+import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultBatch;
 import org.apache.ignite.internal.util.GridByteArrayList;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.GridLongList;
@@ -1120,6 +1123,21 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 167:
+                msg = new ServiceDeploymentProcessId();
+
+                break;
+
+            case 168:
+                msg = new ServiceSingleNodeDeploymentResultBatch();
+
+                break;
+
+            case 169:
+                msg = new ServiceSingleNodeDeploymentResult();
+
+                break;
+
             // [-3..119] [124..129] [-23..-28] [-36..-55] - this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
index 78319a0..556af19 100644 (file)
@@ -143,6 +143,7 @@ import org.jetbrains.annotations.Nullable;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SECURITY_COMPATIBILITY_MODE;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -157,6 +158,7 @@ import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
@@ -789,6 +791,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                         ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache);
 
+                        ctx.service().onLocalJoin(discoEvt, discoCache);
+
                         ctx.authentication().onLocalJoin();
 
                         ctx.encryption().onLocalJoin();
@@ -847,6 +851,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache);
 
+                    ctx.service().onLocalJoin(localJoinEvent(), discoCache);
+
                     ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() {
                         @Override public void apply(IgniteFuture<?> fut) {
                             try {
@@ -1223,6 +1229,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
         boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
 
+        Boolean locSrvcProcMode = locNode.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
         Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE);
 
         for (ClusterNode n : nodes) {
@@ -1308,6 +1315,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     ", rmtAddrs=" + U.addressesAsString(n) + ", rmtNode=" + U.toShortString(n) + "]");
             }
 
+            Boolean rmtSrvcProcModeAttr = n.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
+
+            final boolean rmtSrvcProcMode = rmtSrvcProcModeAttr != null ? rmtSrvcProcModeAttr : false;
+
+            if (!F.eq(locSrvcProcMode, rmtSrvcProcMode)) {
+                throw new IgniteCheckedException("Local node's " + IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED +
+                    " property value differs from remote node's value " +
+                    "(to make sure all nodes in topology have identical service processor mode, " +
+                    "configure system property explicitly) " +
+                    "[locSrvcProcMode=" + locSrvcProcMode +
+                    ", rmtSrvcProcMode=" + rmtSrvcProcMode +
+                    ", locNodeAddrs=" + U.addressesAsString(locNode) +
+                    ", rmtNodeAddrs=" + U.addressesAsString(n) +
+                    ", locNodeId=" + locNode.id() + ", rmtNode=" + U.toShortString(n) + "]");
+            }
+
             if (n.version().compareToIgnoreTimestamp(SERVICE_PERMISSIONS_SINCE) >= 0
                 && ctx.security().enabled() // Matters only if security enabled.
                ) {
index d85e29b..9dc1195 100644 (file)
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.service.ServiceDeploymentActions;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -52,6 +54,10 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     /** Restarting caches. */
     private Set<String> restartingCaches;
 
+    /** Affinity (cache related) services updates to be processed on services deployment process. */
+    @GridToStringExclude
+    @Nullable private transient ServiceDeploymentActions serviceDeploymentActions;
+
     /**
      * @param reqs Requests.
      */
@@ -118,6 +124,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     }
 
     /**
+     * @return Services deployment actions to be processed on services deployment process.
+     */
+    @Nullable public ServiceDeploymentActions servicesDeploymentActions() {
+        return serviceDeploymentActions;
+    }
+
+    /**
+     * @param serviceDeploymentActions Services deployment actions to be processed on services deployment process.
+     */
+    public void servicesDeploymentActions(ServiceDeploymentActions serviceDeploymentActions) {
+        this.serviceDeploymentActions = serviceDeploymentActions;
+    }
+
+    /**
      * @return {@code True} if required to start all caches on client node.
      */
     public boolean startCaches() {
index b919024..1c44eaf 100644 (file)
@@ -137,6 +137,7 @@ import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchang
 import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
 import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.util.F0;
@@ -1019,7 +1020,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (!active)
             return;
 
-        ctx.service().onUtilityCacheStarted();
+        if (ctx.service() instanceof GridServiceProcessor)
+            ((GridServiceProcessor)ctx.service()).onUtilityCacheStarted();
 
         final AffinityTopologyVersion startTopVer = ctx.discovery().localJoin().joinTopologyVersion();
 
@@ -3047,7 +3049,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null) {
             ctx.dataStructures().restoreStructuresState(ctx);
 
-            ctx.service().updateUtilityCache();
+            if (ctx.service() instanceof GridServiceProcessor)
+                ((GridServiceProcessor)ctx.service()).updateUtilityCache();
         }
 
         if (err == null)
index dbc51f7..d4d89bc 100644 (file)
@@ -99,6 +99,7 @@ import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.TimeBag;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -1136,7 +1137,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 try {
                     cctx.kernalContext().dataStructures().onDeActivate(cctx.kernalContext());
 
-                    cctx.kernalContext().service().onDeActivate(cctx.kernalContext());
+                    if (cctx.kernalContext().service() instanceof GridServiceProcessor)
+                        ((GridServiceProcessor)cctx.kernalContext().service()).onDeActivate(cctx.kernalContext());
 
                     assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
 
index 51a65bb..b1ff048 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.service.ServiceDeploymentActions;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -65,6 +66,10 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
     @GridToStringExclude
     private transient ExchangeActions exchangeActions;
 
+    /** Services deployment actions to be processed on services deployment process. */
+    @GridToStringExclude
+    @Nullable private transient ServiceDeploymentActions serviceDeploymentActions;
+
     /**
      * @param reqId State change request ID.
      * @param initiatingNodeId Node initiated state change.
@@ -117,6 +122,20 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
         this.exchangeActions = exchangeActions;
     }
 
+    /**
+     * @return Services deployment actions to be processed on services deployment process.
+     */
+    @Nullable public ServiceDeploymentActions servicesDeploymentActions() {
+        return serviceDeploymentActions;
+    }
+
+    /**
+     * @param serviceDeploymentActions Services deployment actions to be processed on services deployment process.
+     */
+    public void servicesDeploymentActions(ServiceDeploymentActions serviceDeploymentActions) {
+        this.serviceDeploymentActions = serviceDeploymentActions;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteUuid id() {
         return id;
index 9d2adae..b347d39 100644 (file)
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -1165,9 +1166,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                 Exception e = null;
 
                 try {
-                    ctx.service().onUtilityCacheStarted();
+                    if (ctx.service() instanceof GridServiceProcessor) {
+                        GridServiceProcessor srvcProc = (GridServiceProcessor)ctx.service();
 
-                    ctx.service().onActivate(ctx);
+                        srvcProc.onUtilityCacheStarted();
+
+                        srvcProc.onActivate(ctx);
+                    }
 
                     ctx.dataStructures().onActivate(ctx);
 
index 22d6997..2456efa 100644 (file)
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
+import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -295,7 +296,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         ctx.cacheObjects().onContinuousProcessorStarted(ctx);
 
-        ctx.service().onContinuousProcessorStarted(ctx);
+        if (ctx.service() instanceof GridServiceProcessor)
+            ((GridServiceProcessor)ctx.service()).onContinuousProcessorStarted(ctx);
 
         if (log.isDebugEnabled())
             log.debug("Continuous processor started.");
index 66b2816..290fbf4 100644 (file)
@@ -30,7 +30,10 @@ import org.apache.ignite.services.ServiceConfiguration;
 
 /**
  * Service per-node assignment.
+ *
+ * @deprecated Services internals use messages for deployment management instead of the utility cache, since Ignite 2.8.
  */
+@Deprecated
 public class GridServiceAssignments implements Serializable, GridCacheInternal {
     /** Serialization version. */
     private static final long serialVersionUID = 0L;
index 5b26f9f..87cb0ba 100644 (file)
@@ -22,7 +22,10 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Service configuration key.
+ *
+ * @deprecated Services internals use messages for deployment management instead of the utility cache, since Ignite 2.8.
  */
+@Deprecated
 public class GridServiceAssignmentsKey extends GridCacheUtilityKey<GridServiceAssignmentsKey> {
     /** */
     private static final long serialVersionUID = 0L;
index f25c38e..ae8670f 100644 (file)
@@ -25,7 +25,10 @@ import org.apache.ignite.services.ServiceConfiguration;
 
 /**
  * Service deployment.
+ *
+ * @deprecated Services internals use messages for deployment management instead of the utility cache, since Ignite 2.8.
  */
+@Deprecated
 public class GridServiceDeployment implements GridCacheInternal, Serializable {
     /** */
     private static final long serialVersionUID = 0L;
index 45ccc24..f776285 100644 (file)
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.service;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,9 +34,9 @@ import org.jetbrains.annotations.Nullable;
  * IgniteInternalFuture#get get()} method after all futures complete or fail. Inner exception will contain
  * configurations of failed services.
  */
-public class GridServiceDeploymentCompoundFuture extends GridCompoundFuture<Object, Object> {
-    /** Names of services written to cache during current deployment. */
-    private Collection<String> svcsToRollback;
+public class GridServiceDeploymentCompoundFuture<T extends Serializable> extends GridCompoundFuture<Object, Object> {
+    /** Ids of services written to cache during current deployment. */
+    private Collection<T> svcsToRollback;
 
     /** */
     private volatile ServiceDeploymentException err;
@@ -71,21 +72,21 @@ public class GridServiceDeploymentCompoundFuture extends GridCompoundFuture<Obje
      * @param fut Child future.
      * @param own If {@code true}, then corresponding service will be cancelled on failure.
      */
-    public void add(GridServiceDeploymentFuture fut, boolean own) {
+    public void add(GridServiceDeploymentFuture<T> fut, boolean own) {
         super.add(fut);
 
         if (own) {
             if (svcsToRollback == null)
                 svcsToRollback = new ArrayList<>();
 
-            svcsToRollback.add(fut.configuration().getName());
+            svcsToRollback.add(fut.serviceId());
         }
     }
 
     /**
-     * @return Collection of names of services that were written to cache during current deployment.
+     * @return Collection of ids of services that were written to cache during current deployment.
      */
-    public Collection<String> servicesToRollback() {
+    public Collection<T> servicesToRollback() {
         if (svcsToRollback != null)
             return svcsToRollback;
         else
index c87cc6e..ba4422e 100644 (file)
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.service;
 
+import java.io.Serializable;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.services.ServiceConfiguration;
@@ -24,15 +25,27 @@ import org.apache.ignite.services.ServiceConfiguration;
 /**
  * Service deployment future.
  */
-public class GridServiceDeploymentFuture extends GridFutureAdapter<Object> {
+public class GridServiceDeploymentFuture<T extends Serializable> extends GridFutureAdapter<Object> {
     /** */
     private final ServiceConfiguration cfg;
 
+    /** */
+    private final T srvcId;
+
     /**
      * @param cfg Configuration.
      */
     public GridServiceDeploymentFuture(ServiceConfiguration cfg) {
+        this(cfg, null);
+    }
+
+    /**
+     * @param cfg Configuration.
+     * @param srvcId Service id.
+     */
+    public GridServiceDeploymentFuture(ServiceConfiguration cfg, T srvcId) {
         this.cfg = cfg;
+        this.srvcId = srvcId;
     }
 
     /**
@@ -42,8 +55,15 @@ public class GridServiceDeploymentFuture extends GridFutureAdapter<Object> {
         return cfg;
     }
 
+    /**
+     * @return Service id.
+     */
+    public T serviceId() {
+        return srvcId;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridServiceDeploymentFuture.class, this);
     }
-}
\ No newline at end of file
+}
index 80415bb..e5b85dc 100644 (file)
@@ -22,7 +22,10 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Service configuration key.
+ *
+ * @deprecated Services internals use messages for deployment management instead of the utility cache, since Ignite 2.8.
  */
+@Deprecated
 public class GridServiceDeploymentKey extends GridCacheUtilityKey<GridServiceDeploymentKey> {
     /** */
     private static final long serialVersionUID = 0L;
index 3adc282..8fcbe4a 100644 (file)
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
@@ -118,9 +117,18 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
 
 /**
  * Grid service processor.
+ * <p/>
+ * Obsolete implementation of service processor, based on replicated system cache.
+ * <p/>
+ * NOTE: if you fix a bug in this class, please take a look in {@link IgniteServiceProcessor}, perhaps the class
+ * contains a similar block of code which also should be updated.
+ *
+ * @see IgniteServiceProcessor
+ * @deprecated Here is improved, but uncompatible implementation {@link IgniteServiceProcessor}, see IEP-17 for details.
  */
+@Deprecated
 @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions"})
-public class GridServiceProcessor extends GridProcessorAdapter implements IgniteChangeGlobalStateSupport {
+public class GridServiceProcessor extends ServiceProcessorAdapter implements IgniteChangeGlobalStateSupport {
     /** Time to wait before reassignment retries. */
     private static final long RETRY_TIMEOUT = 1000;
 
@@ -136,7 +144,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     private final Map<String, Collection<ServiceContextImpl>> locSvcs = new HashMap<>();
 
     /** Deployment futures. */
-    private final ConcurrentMap<String, GridServiceDeploymentFuture> depFuts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, GridServiceDeploymentFuture<String>> depFuts = new ConcurrentHashMap<>();
 
     /** Deployment futures. */
     private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap<>();
@@ -456,58 +464,38 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                 throw new IgniteException("Service configuration check failed (" + desc + ")");
     }
 
-    /**
-     * @param name Service name.
-     * @param svc Service.
-     * @return Future.
-     */
-    public IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service svc) {
-        return deployMultiple(prj, name, svc, 0, 1);
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service srvc) {
+        return deployMultiple(prj, name, srvc, 0, 1);
     }
 
-    /**
-     * @param name Service name.
-     * @param svc Service.
-     * @return Future.
-     */
-    public IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service svc) {
-        return deployMultiple(prj, name, svc, 1, 1);
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service srvc) {
+        return deployMultiple(prj, name, srvc, 1, 1);
     }
 
-    /**
-     * @param name Service name.
-     * @param svc Service.
-     * @param totalCnt Total count.
-     * @param maxPerNodeCnt Max per-node count.
-     * @return Future.
-     */
-    public IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service svc, int totalCnt,
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service srvc, int totalCnt,
         int maxPerNodeCnt) {
         ServiceConfiguration cfg = new ServiceConfiguration();
 
         cfg.setName(name);
-        cfg.setService(svc);
+        cfg.setService(srvc);
         cfg.setTotalCount(totalCnt);
         cfg.setMaxPerNodeCount(maxPerNodeCnt);
 
         return deployAll(prj, Collections.singleton(cfg));
     }
 
-    /**
-     * @param name Service name.
-     * @param svc Service.
-     * @param cacheName Cache name.
-     * @param affKey Affinity key.
-     * @return Future.
-     */
-    public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service svc, String cacheName,
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service srvc, String cacheName,
         Object affKey) {
         A.notNull(affKey, "affKey");
 
         ServiceConfiguration cfg = new ServiceConfiguration();
 
         cfg.setName(name);
-        cfg.setService(svc);
+        cfg.setService(srvc);
         cfg.setCacheName(cacheName);
         cfg.setAffinityKey(affKey);
         cfg.setTotalCount(1);
@@ -522,13 +510,13 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
      * @param dfltNodeFilter Default NodeFilter.
      * @return Configurations to deploy.
      */
-    private PreparedConfigurations prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs,
+    private PreparedConfigurations<String> prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs,
         IgnitePredicate<ClusterNode> dfltNodeFilter) {
         List<ServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size());
 
         Marshaller marsh = ctx.config().getMarshaller();
 
-        List<GridServiceDeploymentFuture> failedFuts = null;
+        List<GridServiceDeploymentFuture<String>> failedFuts = null;
 
         for (ServiceConfiguration cfg : cfgs) {
             Exception err = null;
@@ -578,7 +566,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                 if (failedFuts == null)
                     failedFuts = new ArrayList<>();
 
-                GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg);
+                GridServiceDeploymentFuture<String> fut = new GridServiceDeploymentFuture<>(cfg);
 
                 fut.onDone(err);
 
@@ -586,15 +574,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             }
         }
 
-        return new PreparedConfigurations(cfgsCp, failedFuts);
+        return new PreparedConfigurations<>(cfgsCp, failedFuts);
     }
 
-    /**
-     * @param prj Grid projection.
-     * @param cfgs Service configurations.
-     * @return Future for deployment.
-     */
-    public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) {
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) {
         if (prj == null)
             // Deploy to servers by default if no projection specified.
             return deployAll(cfgs,  ctx.cluster().get().forServers().predicate());
@@ -614,11 +598,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         @Nullable IgnitePredicate<ClusterNode> dfltNodeFilter) {
         assert cfgs != null;
 
-        PreparedConfigurations srvCfg = prepareServiceConfigurations(cfgs, dfltNodeFilter);
+        PreparedConfigurations<String> srvCfg = prepareServiceConfigurations(cfgs, dfltNodeFilter);
 
         List<ServiceConfiguration> cfgsCp = srvCfg.cfgs;
 
-        List<GridServiceDeploymentFuture> failedFuts = srvCfg.failedFuts;
+        List<GridServiceDeploymentFuture<String>> failedFuts = srvCfg.failedFuts;
 
         Collections.sort(cfgsCp, new Comparator<ServiceConfiguration>() {
             @Override public int compare(ServiceConfiguration cfg1, ServiceConfiguration cfg2) {
@@ -626,10 +610,10 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             }
         });
 
-        GridServiceDeploymentCompoundFuture res;
+        GridServiceDeploymentCompoundFuture<String> res;
 
         while (true) {
-            res = new GridServiceDeploymentCompoundFuture();
+            res = new GridServiceDeploymentCompoundFuture<>();
 
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
@@ -684,7 +668,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                     "Failed to deploy services, client node disconnected: " + cfgs);
 
             for (String name : res.servicesToRollback()) {
-                GridServiceDeploymentFuture fut = depFuts.remove(name);
+                GridServiceDeploymentFuture<String> fut = depFuts.remove(name);
 
                 if (fut != null)
                     fut.onDone(err);
@@ -694,7 +678,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         }
 
         if (failedFuts != null) {
-            for (GridServiceDeploymentFuture fut : failedFuts)
+            for (GridServiceDeploymentFuture<String> fut : failedFuts)
                 res.add(fut, false);
         }
 
@@ -708,13 +692,13 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
      * @param cfg Service configuration.
      * @throws IgniteCheckedException If operation failed.
      */
-    private void writeServiceToCache(GridServiceDeploymentCompoundFuture res, ServiceConfiguration cfg)
+    private void writeServiceToCache(GridServiceDeploymentCompoundFuture<String> res, ServiceConfiguration cfg)
         throws IgniteCheckedException {
         String name = cfg.getName();
 
-        GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg);
+        GridServiceDeploymentFuture<String> fut = new GridServiceDeploymentFuture<>(cfg);
 
-        GridServiceDeploymentFuture old = depFuts.putIfAbsent(name, fut);
+        GridServiceDeploymentFuture<String> old = depFuts.putIfAbsent(name, fut);
 
         try {
             if (old != null) {
@@ -772,11 +756,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         }
     }
 
-    /**
-     * @param name Service name.
-     * @return Future.
-     */
-    public IgniteInternalFuture<?> cancel(String name) {
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> cancel(String name) {
         while (true) {
             try {
                 return removeServiceFromCache(name).fut;
@@ -795,10 +776,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         }
     }
 
-    /**
-     * @return Future.
-     */
-    public IgniteInternalFuture<?> cancelAll() {
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> cancelAll() {
         Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
 
         List<String> svcNames = new ArrayList<>();
@@ -812,13 +791,10 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         return cancelAll(svcNames);
     }
 
-    /**
-     * @param svcNames Name of service to deploy.
-     * @return Future.
-     */
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    public IgniteInternalFuture<?> cancelAll(Collection<String> svcNames) {
-        List<String> svcNamesCp = new ArrayList<>(svcNames);
+    @Override public IgniteInternalFuture<?> cancelAll(Collection<String> servicesNames) {
+        List<String> svcNamesCp = new ArrayList<>(servicesNames);
 
         Collections.sort(svcNamesCp);
 
@@ -830,7 +806,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             List<String> toRollback = new ArrayList<>();
 
             try (Transaction tx = serviceCache().txStart(PESSIMISTIC, READ_COMMITTED)) {
-                for (String name : svcNames) {
+                for (String name : servicesNames) {
                     if (res == null)
                         res = new GridCompoundFuture<>();
 
@@ -923,18 +899,13 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         }
     }
 
-    /**
-     * @param name Service name.
-     * @param timeout If greater than 0 limits task execution time. Cannot be negative.
-     * @return Service topology.
-     * @throws IgniteCheckedException On error.
-     */
-    public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Override public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException {
         IgniteInternalCache<Object, Object> cache = serviceCache();
 
         ClusterNode node = cache.affinity().mapKeyToNode(name);
 
-        final ServiceTopologyCallable call = new ServiceTopologyCallable(name);
+        final ServiceTopologyCallable call = new ServiceTopologyCallable(name, pendingJobCtxs);
 
         return ctx.closure().callAsyncNoFailover(
             GridClosureCallMode.BROADCAST,
@@ -958,10 +929,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         return val != null ? val.assigns() : null;
     }
 
-    /**
-     * @return Collection of service descriptors.
-     */
-    public Collection<ServiceDescriptor> serviceDescriptors() {
+    /** {@inheritDoc} */
+    @Override public Collection<ServiceDescriptor> serviceDescriptors() {
         Collection<ServiceDescriptor> descs = new ArrayList<>();
 
         Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
@@ -992,12 +961,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         return descs;
     }
 
-    /**
-     * @param name Service name.
-     * @param <T> Service type.
-     * @return Service by specified service name.
-     */
-    public <T> T service(String name) {
+    /** {@inheritDoc} */
+    @Override public <T> T service(String name) {
         ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE, null);
 
         Collection<ServiceContextImpl> ctxs;
@@ -1024,11 +989,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         }
     }
 
-    /**
-     * @param name Service name.
-     * @return Service by specified service name.
-     */
-    public ServiceContextImpl serviceContext(String name) {
+    /** {@inheritDoc} */
+    @Override public ServiceContextImpl serviceContext(String name) {
         Collection<ServiceContextImpl> ctxs;
 
         synchronized (locSvcs) {
@@ -1051,17 +1013,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         }
     }
 
-    /**
-     * @param prj Grid projection.
-     * @param name Service name.
-     * @param svcItf Service class.
-     * @param sticky Whether multi-node request should be done.
-     * @param timeout If greater than 0 limits service acquire time. Cannot be negative.
-     * @param <T> Service interface type.
-     * @return The proxy of a service by its name and class.
-     * @throws IgniteException If failed to create proxy.
-     */
-    public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> svcItf, boolean sticky, long timeout)
+    /** {@inheritDoc} */
+    @Override public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> srvcCls, boolean sticky,
+        long timeout)
         throws IgniteException {
         ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE, null);
 
@@ -1072,16 +1026,16 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                 Service svc = ctx.service();
 
                 if (svc != null) {
-                    if (!svcItf.isAssignableFrom(svc.getClass()))
+                    if (!srvcCls.isAssignableFrom(svc.getClass()))
                         throw new IgniteException("Service does not implement specified interface [svcItf=" +
-                            svcItf.getName() + ", svcCls=" + svc.getClass().getName() + ']');
+                            srvcCls.getName() + ", svcCls=" + svc.getClass().getName() + ']');
 
                     return (T)svc;
                 }
             }
         }
 
-        return new GridServiceProxy<T>(prj, name, svcItf, sticky, timeout, ctx).proxy();
+        return new GridServiceProxy<T>(prj, name, srvcCls, sticky, timeout, ctx).proxy();
     }
 
     /**
@@ -1097,12 +1051,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         return false;
     }
 
-    /**
-     * @param name Service name.
-     * @param <T> Service type.
-     * @return Services by specified service name.
-     */
-    public <T> Collection<T> services(String name) {
+    /** {@inheritDoc} */
+    @Override public <T> Collection<T> services(String name) {
         ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE, null);
 
         Collection<ServiceContextImpl> ctxs;
@@ -1973,7 +1923,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                 t = th;
             }
 
-            GridServiceDeploymentFuture fut = depFuts.get(assigns.name());
+            GridServiceDeploymentFuture<String> fut = depFuts.get(assigns.name());
 
             if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
                 depFuts.remove(assigns.name(), fut);
@@ -2129,11 +2079,16 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         @LoggerResource
         private transient IgniteLogger log;
 
+        /** */
+        private final List<ComputeJobContext> pendingCtxs;
+
         /**
          * @param svcName Service name.
+         * @param pendingCtxs Pending compute job contexts that waiting for utility cache initialization.
          */
-        public ServiceTopologyCallable(String svcName) {
+        public ServiceTopologyCallable(String svcName, List<ComputeJobContext> pendingCtxs) {
             this.svcName = svcName;
+            this.pendingCtxs = pendingCtxs;
         }
 
         /** {@inheritDoc} */
@@ -2141,8 +2096,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             IgniteInternalCache<Object, Object> cache = ignite.context().cache().utilityCache();
 
             if (cache == null) {
-                List<ComputeJobContext> pendingCtxs = ignite.context().service().pendingJobCtxs;
-
                 synchronized (pendingCtxs) {
                     // Double check cache reference after lock acqusition.
                     cache = ignite.context().cache().utilityCache();
index 4d98187..aa6ce44 100644 (file)
@@ -197,19 +197,15 @@ public class GridServiceProxy<T> implements Serializable {
                             true).get();
                     }
                 }
-                catch (GridServiceNotFoundException | ClusterTopologyCheckedException e) {
-                    if (log.isDebugEnabled())
-                        log.debug("Service was not found or topology changed (will retry): " + e.getMessage());
-                }
                 catch (RuntimeException | Error e) {
                     throw e;
                 }
                 catch (IgniteCheckedException e) {
                     // Check if ignorable exceptions are in the cause chain.
-                    Throwable ignorableCause = X.cause(e, GridServiceNotFoundException.class);
+                    Throwable ignorableCause = X.cause(e, ClusterTopologyCheckedException.class);
 
-                    if (ignorableCause == null)
-                        ignorableCause = X.cause(e, ClusterTopologyCheckedException.class);
+                    if (ignorableCause == null && ctx.service() instanceof GridServiceProcessor)
+                        ignorableCause = X.cause(e, GridServiceNotFoundException.class);
 
                     if (ignorableCause != null) {
                         if (log.isDebugEnabled())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
new file mode 100644 (file)
index 0000000..1bc1fd7
--- /dev/null
@@ -0,0 +1,1784 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.SkipDaemon;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceDeploymentException;
+import org.apache.ignite.services.ServiceDescriptor;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.thread.IgniteThreadFactory;
+import org.apache.ignite.thread.OomExceptionHandler;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
+import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.SERVICE_PROC;
+
+/**
+ * Ignite service processor.
+ * <p/>
+ * Event-driven implementation of the service processor. Service deployment is managed via {@link DiscoverySpi} and
+ * {@link CommunicationSpi} messages.
+ *
+ * @see ServiceDeploymentManager
+ * @see ServiceDeploymentTask
+ * @see ServiceDeploymentActions
+ * @see ServiceChangeBatchRequest
+ */
+@SkipDaemon
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+public class IgniteServiceProcessor extends ServiceProcessorAdapter implements IgniteChangeGlobalStateSupport {
+    /** Local service instances. */
+    private final ConcurrentMap<IgniteUuid, Collection<ServiceContextImpl>> locServices = new ConcurrentHashMap<>();
+
+    /**
+     * Collection of services information that were registered in the cluster. <b>It is updated from discovery
+     * thread</b>. It will be included in the initial data bag to be sent on newly joining node, it means a new node
+     * will have all services' information to be able to work with services in the whole cluster.
+     * <p/>
+     * This collection is used, to make fast verification for requests of change service's state and prepare services
+     * deployments actions (possible long-running) which will be processed from a queue by deployment worker.
+     * <p/>
+     * Collection reflects a services' state which will be reached as soon as a relevant deployment task will be
+     * processed.
+     *
+     * @see ServiceDeploymentActions
+     * @see ServiceDeploymentManager
+     */
+    private final ConcurrentMap<IgniteUuid, ServiceInfo> registeredServices = new ConcurrentHashMap<>();
+
+    /**
+     * Collection of services information that were processed by deployment worker. <b>It is updated from deployment
+     * worker</b>.
+     * <p/>
+     * Collection reflects a state of deployed services for a moment of the latest deployment task processed by
+     * deployment worker.
+     * <p/>
+     * It is catching up the state of {@link #registeredServices}.
+     *
+     * @see ServiceDeploymentManager#readyTopologyVersion()
+     * @see ServiceDeploymentTask
+     */
+    private final ConcurrentMap<IgniteUuid, ServiceInfo> deployedServices = new ConcurrentHashMap<>();
+
+    /** Deployment futures. */
+    private final ConcurrentMap<IgniteUuid, GridServiceDeploymentFuture<IgniteUuid>> depFuts = new ConcurrentHashMap<>();
+
+    /** Undeployment futures. */
+    private final ConcurrentMap<IgniteUuid, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap<>();
+
+    /** Thread factory. */
+    private final ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName(), "service",
+        new OomExceptionHandler(ctx));
+
+    /** Marshaller for serialization/deserialization of service's instance. */
+    private final Marshaller marsh = new JdkMarshaller();
+
+    /** Services deployment manager. */
+    private volatile ServiceDeploymentManager depMgr = new ServiceDeploymentManager(ctx);
+
+    /** Services topologies update mutex. */
+    private final Object servicesTopsUpdateMux = new Object();
+
+    /**
+     * Operations lock. The main purpose is to avoid a hang of users operation futures.
+     * <p/>
+     * Read lock is being acquired on users operations (deploy, cancel).
+     * <p/>
+     * Write lock is being acquired on change service processor's state: {@link #onKernalStop}, {@link #onDisconnected),
+     * {@link #onDeActivate(GridKernalContext)}} to guarantee that deployed services will be cancelled only once, also
+     * it protects from registering new operations futures which may be missed during completion collections of users
+     * futures.
+     * <pre>
+     * {@link #enterBusy()} and {@link #leaveBusy()} are being used to protect modification of shared collections during
+     * changing service processor state. If a call can't enter in the busy state a default value will be returned (a
+     * value which will be reached by the time when write lock will be released).
+     * These methods can't be used for users operations (deploy, undeploy) because if the processor will become
+     * disconnected or stopped we should return different types of exceptions (it's not about just a errors message,
+     * the disconnected exception also contains reconnect future).
+     * </pre>
+     */
+    private final ReentrantReadWriteLock opsLock = new ReentrantReadWriteLock();
+
+    /** Disconnected flag. */
+    private volatile boolean disconnected;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public IgniteServiceProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        IgniteConfiguration cfg = ctx.config();
+
+        DeploymentMode depMode = cfg.getDeploymentMode();
+
+        if (cfg.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED) &&
+            !F.isEmpty(cfg.getServiceConfiguration()))
+            throw new IgniteCheckedException("Cannot deploy services in PRIVATE or ISOLATED deployment mode: " + depMode);
+
+        ctx.discovery().setCustomEventListener(ServiceChangeBatchRequest.class,
+            new CustomEventListener<ServiceChangeBatchRequest>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+                    ServiceChangeBatchRequest msg) {
+                    processServicesChangeRequest(snd, msg);
+                }
+            });
+
+        ctx.discovery().setCustomEventListener(ChangeGlobalStateMessage.class,
+            new CustomEventListener<ChangeGlobalStateMessage>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+                    ChangeGlobalStateMessage msg) {
+                    processChangeGlobalStateRequest(msg);
+                }
+            });
+
+        ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
+            new CustomEventListener<DynamicCacheChangeBatch>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+                    DynamicCacheChangeBatch msg) {
+                    processDynamicCacheChangeRequest(msg);
+                }
+            });
+
+        ctx.discovery().setCustomEventListener(ServiceClusterDeploymentResultBatch.class,
+            new CustomEventListener<ServiceClusterDeploymentResultBatch>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+                    ServiceClusterDeploymentResultBatch msg) {
+                    processServicesFullDeployments(msg);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+        depMgr.startProcessing();
+
+        if (log.isDebugEnabled())
+            log.debug("Started service processor.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        opsLock.writeLock().lock();
+
+        try {
+            if (disconnected)
+                return;
+
+            stopProcessor(new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+        }
+        finally {
+            opsLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @param stopError Error to shutdown resources.
+     */
+    private void stopProcessor(IgniteCheckedException stopError) {
+        assert opsLock.isWriteLockedByCurrentThread();
+
+        depMgr.stopProcessing(stopError);
+
+        cancelDeployedServices();
+
+        registeredServices.clear();
+
+        // If user requests sent to network but not received back to handle in deployment manager.
+        Stream.concat(depFuts.values().stream(), undepFuts.values().stream()).forEach(fut -> {
+            try {
+                fut.onDone(stopError);
+            }
+            catch (Exception ignore) {
+                // No-op.
+            }
+        });
+
+        depFuts.clear();
+        undepFuts.clear();
+
+        if (log.isDebugEnabled())
+            log.debug("Stopped service processor.");
+    }
+
+    /**
+     * Cancels deployed services.
+     */
+    private void cancelDeployedServices() {
+        assert opsLock.isWriteLockedByCurrentThread();
+
+        deployedServices.clear();
+
+        locServices.values().stream().flatMap(Collection::stream).forEach(srvcCtx -> {
+            cancel(srvcCtx);
+
+            if (ctx.isStopping()) {
+                try {
+                    if (log.isInfoEnabled()) {
+                        log.info("Shutting down distributed service [name=" + srvcCtx.name() + ", execId8=" +
+                            U.id8(srvcCtx.executionId()) + ']');
+                    }
+
+                    srvcCtx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+                }
+                catch (InterruptedException ignore) {
+                    Thread.currentThread().interrupt();
+
+                    U.error(log, "Got interrupted while waiting for service to shutdown (will continue " +
+                        "stopping node): " + srvcCtx.name());
+                }
+            }
+        });
+
+        locServices.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (dataBag.commonDataCollectedFor(SERVICE_PROC.ordinal()))
+            return;
+
+        ServiceProcessorCommonDiscoveryData clusterData = new ServiceProcessorCommonDiscoveryData(
+            new ArrayList<>(registeredServices.values())
+        );
+
+        dataBag.addGridCommonData(SERVICE_PROC.ordinal(), clusterData);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        if (data.commonData() == null)
+            return;
+
+        ServiceProcessorCommonDiscoveryData clusterData = (ServiceProcessorCommonDiscoveryData)data.commonData();
+
+        for (ServiceInfo desc : clusterData.registeredServices())
+            registeredServices.put(desc.serviceId(), desc);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        ArrayList<ServiceInfo> staticServicesInfo = staticallyConfiguredServices(true);
+
+        dataBag.addJoiningNodeData(SERVICE_PROC.ordinal(), new ServiceProcessorJoinNodeDiscoveryData(staticServicesInfo));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+        if (data.joiningNodeData() == null)
+            return;
+
+        ServiceProcessorJoinNodeDiscoveryData joinData = (ServiceProcessorJoinNodeDiscoveryData)data.joiningNodeData();
+
+        for (ServiceInfo desc : joinData.services()) {
+            assert desc.topologySnapshot().isEmpty();
+
+            ServiceInfo oldDesc = registeredServices.get(desc.serviceId());
+
+            if (oldDesc != null) { // In case of a collision of IgniteUuid.randomUuid() (almost impossible case)
+                U.warn(log, "Failed to register service configuration received from joining node : " +
+                    "[nodeId=" + data.joiningNodeId() + ", cfgName=" + desc.name() + "]. " +
+                    "Service with the same service id already exists, cfg=" + oldDesc.configuration());
+
+                continue;
+            }
+
+            oldDesc = lookupInRegisteredServices(desc.name());
+
+            if (oldDesc == null) {
+                registeredServices.put(desc.serviceId(), desc);
+
+                continue;
+            }
+
+            if (oldDesc.configuration().equalsIgnoreNodeFilter(desc.configuration())) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Ignore service configuration received from joining node : " +
+                        "[nodeId=" + data.joiningNodeId() + ", cfgName=" + desc.name() + "]. " +
+                        "The same service configuration already registered.");
+                }
+            }
+            else {
+                U.warn(log, "Failed to register service configuration received from joining node : " +
+                    "[nodeId=" + data.joiningNodeId() + ", cfgName=" + desc.name() + "]. " +
+                    "Service already exists with different configuration, cfg=" + desc.configuration());
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return SERVICE_PROC;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onActivate(GridKernalContext kctx) {
+        // No-op.
+    }
+
+    /**
+     * Invokes from services deployment worker.
+     * <p/>
+     * {@inheritDoc}
+     */
+    @Override public void onDeActivate(GridKernalContext kctx) {
+        opsLock.writeLock().lock();
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("DeActivate service processor [nodeId=" + ctx.localNodeId() +
+                    " topVer=" + ctx.discovery().topologyVersionEx() + " ]");
+            }
+
+            cancelDeployedServices();
+        }
+        finally {
+            opsLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+        assert !disconnected;
+
+        opsLock.writeLock().lock();
+
+        try {
+            disconnected = true;
+
+            stopProcessor(new IgniteClientDisconnectedCheckedException(
+                ctx.cluster().clientReconnectFuture(), "Client node disconnected, the operation's result is unknown."));
+        }
+        finally {
+            opsLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> onReconnected(boolean active) throws IgniteCheckedException {
+        assert disconnected;
+
+        opsLock.writeLock().lock();
+
+        try {
+            disconnected = false;
+
+            depMgr = new ServiceDeploymentManager(ctx);
+
+            onKernalStart(active);
+
+            return null;
+        }
+        finally {
+            opsLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Validates service configuration.
+     *
+     * @param c Service configuration.
+     * @throws IgniteException If validation failed.
+     */
+    private void validate(ServiceConfiguration c) throws IgniteException {
+        IgniteConfiguration cfg = ctx.config();
+
+        DeploymentMode depMode = cfg.getDeploymentMode();
+
+        if (cfg.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED))
+            throw new IgniteException("Cannot deploy services in PRIVATE or ISOLATED deployment mode: " + depMode);
+
+        ensure(c.getName() != null, "getName() != null", null);
+        ensure(c.getTotalCount() >= 0, "getTotalCount() >= 0", c.getTotalCount());
+        ensure(c.getMaxPerNodeCount() >= 0, "getMaxPerNodeCount() >= 0", c.getMaxPerNodeCount());
+        ensure(c.getService() != null, "getService() != null", c.getService());
+        ensure(c.getTotalCount() > 0 || c.getMaxPerNodeCount() > 0,
+            "c.getTotalCount() > 0 || c.getMaxPerNodeCount() > 0", null);
+    }
+
+    /**
+     * @param cond Condition.
+     * @param desc Description.
+     * @param v Value.
+     */
+    private void ensure(boolean cond, String desc, @Nullable Object v) {
+        if (!cond)
+            if (v != null)
+                throw new IgniteException("Service configuration check failed (" + desc + "): " + v);
+            else
+                throw new IgniteException("Service configuration check failed (" + desc + ")");
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service srvc) {
+        return deployMultiple(prj, name, srvc, 0, 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service srvc) {
+        return deployMultiple(prj, name, srvc, 1, 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service srvc, int totalCnt,
+        int maxPerNodeCnt) {
+        ServiceConfiguration cfg = new ServiceConfiguration();
+
+        cfg.setName(name);
+        cfg.setService(srvc);
+        cfg.setTotalCount(totalCnt);
+        cfg.setMaxPerNodeCount(maxPerNodeCnt);
+
+        return deployAll(prj, Collections.singleton(cfg));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service srvc, String cacheName,
+        Object affKey) {
+        A.notNull(affKey, "affKey");
+
+        ServiceConfiguration cfg = new ServiceConfiguration();
+
+        cfg.setName(name);
+        cfg.setService(srvc);
+        cfg.setCacheName(cacheName);
+        cfg.setAffinityKey(affKey);
+        cfg.setTotalCount(1);
+        cfg.setMaxPerNodeCount(1);
+
+        // Ignore projection here.
+        return deployAll(Collections.singleton(cfg), null);
+    }
+
+    /**
+     * @param cfgs Service configurations.
+     * @param dfltNodeFilter Default NodeFilter.
+     * @return Configurations to deploy.
+     */
+    @SuppressWarnings("deprecation")
+    private PreparedConfigurations<IgniteUuid> prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs,
+        IgnitePredicate<ClusterNode> dfltNodeFilter) {
+        List<ServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size());
+
+        List<GridServiceDeploymentFuture<IgniteUuid>> failedFuts = null;
+
+        for (ServiceConfiguration cfg : cfgs) {
+            Exception err = null;
+
+            // Deploy to projection node by default
+            // or only on server nodes if no projection.
+            if (cfg.getNodeFilter() == null && dfltNodeFilter != null)
+                cfg.setNodeFilter(dfltNodeFilter);
+
+            try {
+                validate(cfg);
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to validate service configuration [name=" + cfg.getName() +
+                    ", srvc=" + cfg.getService() + ']', e);
+
+                err = e;
+            }
+
+            if (err == null)
+                err = checkPermissions(cfg.getName(), SecurityPermission.SERVICE_DEPLOY);
+
+            if (err == null) {
+                try {
+                    byte[] srvcBytes = U.marshal(marsh, cfg.getService());
+
+                    cfgsCp.add(new LazyServiceConfiguration(cfg, srvcBytes));
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to marshal service with configured marshaller " +
+                        "[name=" + cfg.getName() + ", srvc=" + cfg.getService() + ", marsh=" + marsh + "]", e);
+
+                    err = e;
+                }
+            }
+
+            if (err != null) {
+                if (failedFuts == null)
+                    failedFuts = new ArrayList<>();
+
+                GridServiceDeploymentFuture<IgniteUuid> fut = new GridServiceDeploymentFuture<>(cfg, null);
+
+                fut.onDone(err);
+
+                failedFuts.add(fut);
+            }
+        }
+
+        return new PreparedConfigurations<>(cfgsCp, failedFuts);
+    }
+
+    /**
+     * Checks security permissions for service with given name.
+     *
+     * @param name Service name.
+     * @param perm Security permissions.
+     * @return {@code null} if success, otherwise instance of {@link SecurityException}.
+     */
+    private SecurityException checkPermissions(String name, SecurityPermission perm) {
+        try {
+            ctx.security().authorize(name, perm, null);
+
+            return null;
+        }
+        catch (SecurityException e) {
+            U.error(log, "Failed to authorize service access [name=" + name + ", perm=" + perm + ']', e);
+
+            return e;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) {
+        if (prj == null)
+            // Deploy to servers by default if no projection specified.
+            return deployAll(cfgs, ctx.cluster().get().forServers().predicate());
+        else if (prj.predicate() == F.<ClusterNode>alwaysTrue())
+            return deployAll(cfgs, null);
+        else
+            // Deploy to predicate nodes by default.
+            return deployAll(cfgs, prj.predicate());
+    }
+
+    /**
+     * @param cfgs Service configurations.
+     * @param dfltNodeFilter Default NodeFilter.
+     * @return Future for deployment.
+     */
+    private IgniteInternalFuture<?> deployAll(@NotNull Collection<ServiceConfiguration> cfgs,
+        @Nullable IgnitePredicate<ClusterNode> dfltNodeFilter) {
+        opsLock.readLock().lock();
+
+        try {
+            if (disconnected) {
+                return new GridFinishedFuture<>(new IgniteClientDisconnectedCheckedException(
+                    ctx.cluster().clientReconnectFuture(), "Failed to deploy services, " +
+                    "client node disconnected: " + cfgs));
+            }
+
+            if (ctx.isStopping()) {
+                return new GridFinishedFuture<>(new IgniteCheckedException("Failed to deploy services, " +
+                    "node is stopping: " + cfgs));
+            }
+
+            if (cfgs.isEmpty())
+                return new GridFinishedFuture<>();
+
+            PreparedConfigurations<IgniteUuid> srvcCfg = prepareServiceConfigurations(cfgs, dfltNodeFilter);
+
+            List<ServiceConfiguration> cfgsCp = srvcCfg.cfgs;
+
+            List<GridServiceDeploymentFuture<IgniteUuid>> failedFuts = srvcCfg.failedFuts;
+
+            GridServiceDeploymentCompoundFuture<IgniteUuid> res = new GridServiceDeploymentCompoundFuture<>();
+
+            if (!cfgsCp.isEmpty()) {
+                try {
+                    Collection<ServiceChangeAbstractRequest> reqs = new ArrayList<>();
+
+                    for (ServiceConfiguration cfg : cfgsCp) {
+                        IgniteUuid srvcId = IgniteUuid.randomUuid();
+
+                        GridServiceDeploymentFuture<IgniteUuid> fut = new GridServiceDeploymentFuture<>(cfg, srvcId);
+
+                        res.add(fut, true);
+
+                        reqs.add(new ServiceDeploymentRequest(srvcId, cfg));
+
+                        depFuts.put(srvcId, fut);
+                    }
+
+                    ServiceChangeBatchRequest msg = new ServiceChangeBatchRequest(reqs);
+
+                    ctx.discovery().sendCustomEvent(msg);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Services have been sent to deploy, req=" + msg);
+                }
+                catch (IgniteException | IgniteCheckedException e) {
+                    for (IgniteUuid id : res.servicesToRollback())
+                        depFuts.remove(id).onDone(e);
+
+                    res.onDone(new IgniteCheckedException(
+                        new ServiceDeploymentException("Failed to deploy provided services.", e, cfgs)));
+
+                    return res;
+                }
+            }
+
+            if (failedFuts != null) {
+                for (GridServiceDeploymentFuture<IgniteUuid> fut : failedFuts)
+                    res.add(fut, false);
+            }
+
+            res.markInitialized();
+
+            return res;
+        }
+        finally {
+            opsLock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> cancel(String name) {
+        return cancelAll(Collections.singleton(name));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> cancelAll() {
+        return cancelAll(deployedServices.values().stream().map(ServiceInfo::name).collect(Collectors.toSet()));
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public IgniteInternalFuture<?> cancelAll(@NotNull Collection<String> servicesNames) {
+        opsLock.readLock().lock();
+
+        try {
+            if (disconnected) {
+                return new GridFinishedFuture<>(new IgniteClientDisconnectedCheckedException(
+                    ctx.cluster().clientReconnectFuture(), "Failed to undeploy services, " +
+                    "client node disconnected: " + servicesNames));
+            }
+
+            if (ctx.isStopping()) {
+                return new GridFinishedFuture<>(new IgniteCheckedException("Failed to undeploy services, " +
+                    "node is stopping: " + servicesNames));
+            }
+
+            if (servicesNames.isEmpty())
+                return new GridFinishedFuture<>();
+
+            GridCompoundFuture res = new GridCompoundFuture<>();
+
+            Set<IgniteUuid> toRollback = new HashSet<>();
+
+            List<ServiceChangeAbstractRequest> reqs = new ArrayList<>();
+
+            try {
+                for (String name : servicesNames) {
+                    IgniteUuid srvcId = lookupDeployedServiceId(name);
+
+                    if (srvcId == null)
+                        continue;
+
+                    Exception err = checkPermissions(name, SecurityPermission.SERVICE_CANCEL);
+
+                    if (err != null) {
+                        res.add(new GridFinishedFuture<>(err));
+
+                        continue;
+                    }
+
+                    GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+
+                    GridFutureAdapter<?> old = undepFuts.putIfAbsent(srvcId, fut);
+
+                    if (old != null) {
+                        res.add(old);
+
+                        continue;
+                    }
+
+                    res.add(fut);
+
+                    toRollback.add(srvcId);
+
+                    reqs.add(new ServiceUndeploymentRequest(srvcId));
+                }
+
+                if (!reqs.isEmpty()) {
+                    ServiceChangeBatchRequest msg = new ServiceChangeBatchRequest(reqs);
+
+                    ctx.discovery().sendCustomEvent(msg);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Services have been sent to cancel, msg=" + msg);
+                }
+            }
+            catch (IgniteException | IgniteCheckedException e) {
+                for (IgniteUuid id : toRollback)
+                    undepFuts.remove(id).onDone(e);
+
+                U.error(log, "Failed to undeploy services: " + servicesNames, e);
+
+                res.onDone(e);
+
+                return res;
+            }
+
+            res.markInitialized();
+
+            return res;
+        }
+        finally {
+            opsLock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException {
+        assert timeout >= 0;
+
+        long startTime = U.currentTimeMillis();
+
+        Map<UUID, Integer> top;
+
+        while (true) {
+            top = serviceTopology(name);
+
+            if (timeout == 0 || (top != null && !top.isEmpty()))
+                return top;
+
+            synchronized (servicesTopsUpdateMux) {
+                long wait = timeout - (U.currentTimeMillis() - startTime);
+
+                if (wait <= 0)
+                    return top;
+
+                try {
+                    servicesTopsUpdateMux.wait(wait);
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedCheckedException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param name Service name.
+     * @return Service topology.
+     */
+    private Map<UUID, Integer> serviceTopology(String name) {
+        for (ServiceInfo desc : registeredServices.values()) {
+            if (desc.name().equals(name))
+                return desc.topologySnapshot();
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ServiceDescriptor> serviceDescriptors() {
+        return new ArrayList<>(registeredServices.values());
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T service(String name) {
+        if (!enterBusy())
+            return null;
+
+        try {
+            ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE, null);
+
+            Collection<ServiceContextImpl> ctxs = serviceContexts(name);
+
+            if (ctxs == null)
+                return null;
+
+            synchronized (ctxs) {
+                if (F.isEmpty(ctxs))
+                    return null;
+
+                for (ServiceContextImpl ctx : ctxs) {
+                    Service srvc = ctx.service();
+
+                    if (srvc != null)
+                        return (T)srvc;
+                }
+
+                return null;
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ServiceContextImpl serviceContext(String name) {
+        if (!enterBusy())
+            return null;
+
+        try {
+            Collection<ServiceContextImpl> ctxs = serviceContexts(name);
+
+            if (ctxs == null)
+                return null;
+
+            synchronized (ctxs) {
+                if (F.isEmpty(ctxs))
+                    return null;
+
+                for (ServiceContextImpl ctx : ctxs) {
+                    if (ctx.service() != null)
+                        return ctx;
+                }
+            }
+
+            return null;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * @param name Service name.
+     * @return Collection of locally deployed instance if present.
+     */
+    @Nullable private Collection<ServiceContextImpl> serviceContexts(String name) {
+        IgniteUuid srvcId = lookupDeployedServiceId(name);
+
+        if (srvcId == null)
+            return null;
+
+        return locServices.get(srvcId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> srvcCls, boolean sticky,
+        long timeout)
+        throws IgniteException {
+        ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE, null);
+
+        if (hasLocalNode(prj)) {
+            ServiceContextImpl ctx = serviceContext(name);
+
+            if (ctx != null) {
+                Service srvc = ctx.service();
+
+                if (srvc != null) {
+                    if (!srvcCls.isAssignableFrom(srvc.getClass()))
+                        throw new IgniteException("Service does not implement specified interface [srvcCls=" +
+                            srvcCls.getName() + ", srvcCls=" + srvc.getClass().getName() + ']');
+
+                    return (T)srvc;
+                }
+            }
+        }
+
+        return new GridServiceProxy<T>(prj, name, srvcCls, sticky, timeout, ctx).proxy();
+    }
+
+    /**
+     * @param prj Grid nodes projection.
+     * @return Whether given projection contains any local node.
+     */
+    private boolean hasLocalNode(ClusterGroup prj) {
+        for (ClusterNode n : prj.nodes()) {
+            if (n.isLocal())
+                return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> Collection<T> services(String name) {
+        if (!enterBusy())
+            return null;
+
+        try {
+            ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE, null);
+
+            Collection<ServiceContextImpl> ctxs = serviceContexts(name);
+
+            if (ctxs == null)
+                return null;
+
+            synchronized (ctxs) {
+                if (F.isEmpty(ctxs))
+                    return null;
+
+                Collection<T> res = new ArrayList<>(ctxs.size());
+
+                for (ServiceContextImpl ctx : ctxs) {
+                    Service srvc = ctx.service();
+
+                    if (srvc != null)
+                        res.add((T)srvc);
+                }
+
+                return res;
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Reassigns service to nodes.
+     *
+     * @param srvcId Service id.
+     * @param cfg Service configuration.
+     * @param topVer Topology version.
+     * @param oldTop Previous topology snapshot. Will be ignored for affinity service.
+     * @throws IgniteCheckedException If failed.
+     */
+    Map<UUID, Integer> reassign(@NotNull IgniteUuid srvcId, @NotNull ServiceConfiguration cfg,
+        @NotNull AffinityTopologyVersion topVer,
+        @Nullable TreeMap<UUID, Integer> oldTop) throws IgniteCheckedException {
+        Object nodeFilter = cfg.getNodeFilter();
+
+        if (nodeFilter != null)
+            ctx.resource().injectGeneric(nodeFilter);
+
+        int totalCnt = cfg.getTotalCount();
+        int maxPerNodeCnt = cfg.getMaxPerNodeCount();
+        String cacheName = cfg.getCacheName();
+        Object affKey = cfg.getAffinityKey();
+
+        Map<UUID, Integer> cnts = new TreeMap<>();
+
+        if (affKey != null && cacheName != null) { // Affinity service
+            ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, topVer);
+
+            if (n != null) {
+                int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt;
+
+                cnts.put(n.id(), cnt);
+            }
+        }
+        else {
+            Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
+
+            if (cfg.getNodeFilter() != null) {
+                Collection<ClusterNode> nodes0 = new ArrayList<>();
+
+                for (ClusterNode node : nodes) {
+                    if (cfg.getNodeFilter().apply(node))
+                        nodes0.add(node);
+                }
+
+                nodes = nodes0;
+            }
+
+            if (!nodes.isEmpty()) {
+                int size = nodes.size();
+
+                int perNodeCnt = totalCnt != 0 ? totalCnt / size : maxPerNodeCnt;
+                int remainder = totalCnt != 0 ? totalCnt % size : 0;
+
+                if (perNodeCnt >= maxPerNodeCnt && maxPerNodeCnt != 0) {
+                    perNodeCnt = maxPerNodeCnt;
+                    remainder = 0;
+                }
+
+                for (ClusterNode n : nodes)
+                    cnts.put(n.id(), perNodeCnt);
+
+                assert perNodeCnt >= 0;
+                assert remainder >= 0;
+
+                if (remainder > 0) {
+                    int cnt = perNodeCnt + 1;
+
+                    Random rnd = new Random(srvcId.localId());
+
+                    if (oldTop != null && !oldTop.isEmpty()) {
+                        Collection<UUID> used = new TreeSet<>();
+
+                        // Avoid redundant moving of services.
+                        for (Map.Entry<UUID, Integer> e : oldTop.entrySet()) {
+                            // If old count and new count match, then reuse the assignment.
+                            if (e.getValue() == cnt) {
+                                cnts.put(e.getKey(), cnt);
+
+                                used.add(e.getKey());
+
+                                if (--remainder == 0)
+                                    break;
+                            }
+                        }
+
+                        if (remainder > 0) {
+                            List<Map.Entry<UUID, Integer>> entries = new ArrayList<>(cnts.entrySet());
+
+                            // Randomize.
+                            Collections.shuffle(entries, rnd);
+
+                            for (Map.Entry<UUID, Integer> e : entries) {
+                                // Assign only the ones that have not been reused from previous assignments.
+                                if (!used.contains(e.getKey())) {
+                                    if (e.getValue() < maxPerNodeCnt || maxPerNodeCnt == 0) {
+                                        e.setValue(e.getValue() + 1);
+
+                                        if (--remainder == 0)
+                                            break;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    else {
+                        List<Map.Entry<UUID, Integer>> entries = new ArrayList<>(cnts.entrySet());
+
+                        // Randomize.
+                        Collections.shuffle(entries, rnd);
+
+                        for (Map.Entry<UUID, Integer> e : entries) {
+                            e.setValue(e.getValue() + 1);
+
+                            if (--remainder == 0)
+                                break;
+                        }
+                    }
+                }
+            }
+        }
+
+        return cnts;
+    }
+
+    /**
+     * Redeploys local services based on assignments.
+     * <p/>
+     * Invokes from services deployment worker.
+     *
+     * @param srvcId Service id.
+     * @param cfg Service configuration.
+     * @param top Service topology.
+     * @throws IgniteCheckedException In case of deployment errors.
+     */
+    void redeploy(IgniteUuid srvcId, ServiceConfiguration cfg,
+        Map<UUID, Integer> top) throws IgniteCheckedException {
+        String name = cfg.getName();
+        String cacheName = cfg.getCacheName();
+        Object affKey = cfg.getAffinityKey();
+
+        int assignCnt = top.getOrDefault(ctx.localNodeId(), 0);
+
+        Collection<ServiceContextImpl> ctxs = locServices.computeIfAbsent(srvcId, c -> new ArrayList<>());
+
+        Collection<ServiceContextImpl> toInit = new ArrayList<>();
+
+        synchronized (ctxs) {
+            if (ctxs.size() > assignCnt) {
+                int cancelCnt = ctxs.size() - assignCnt;
+
+                cancel(ctxs, cancelCnt);
+            }
+            else if (ctxs.size() < assignCnt) {
+                int createCnt = assignCnt - ctxs.size();
+
+                for (int i = 0; i < createCnt; i++) {
+                    ServiceContextImpl srvcCtx = new ServiceContextImpl(name,
+                        UUID.randomUUID(),
+                        cacheName,
+                        affKey,
+                        Executors.newSingleThreadExecutor(threadFactory));
+
+                    ctxs.add(srvcCtx);
+
+                    toInit.add(srvcCtx);
+                }
+            }
+        }
+
+        for (final ServiceContextImpl srvcCtx : toInit) {
+            final Service srvc;
+
+            try {
+                srvc = copyAndInject(cfg);
+
+                // Initialize service.
+                srvc.init(srvcCtx);
+
+                srvcCtx.service(srvc);
+            }
+            catch (Throwable e) {
+                U.error(log, "Failed to initialize service (service will not be deployed): " + name, e);
+
+                synchronized (ctxs) {
+                    ctxs.removeAll(toInit);
+                }
+
+                throw new IgniteCheckedException("Error occured during service initialization: " +
+                    "[locId=" + ctx.localNodeId() + ", name=" + name + ']', e);
+            }
+
+            if (log.isInfoEnabled())
+                log.info("Starting service instance [name=" + srvcCtx.name() + ", execId=" +
+                    srvcCtx.executionId() + ']');
+
+            // Start service in its own thread.
+            final ExecutorService exe = srvcCtx.executor();
+
+            exe.execute(new Runnable() {
+                @Override public void run() {
+                    try {
+                        srvc.execute(srvcCtx);
+                    }
+                    catch (InterruptedException | IgniteInterruptedCheckedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Service thread was interrupted [name=" + srvcCtx.name() + ", execId=" +
+                                srvcCtx.executionId() + ']');
+                    }
+                    catch (IgniteException e) {
+                        if (e.hasCause(InterruptedException.class) ||
+                            e.hasCause(IgniteInterruptedCheckedException.class)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Service thread was interrupted [name=" + srvcCtx.name() +
+                                    ", execId=" + srvcCtx.executionId() + ']');
+                        }
+                        else {
+                            U.error(log, "Service execution stopped with error [name=" + srvcCtx.name() +
+                                ", execId=" + srvcCtx.executionId() + ']', e);
+                        }
+                    }
+                    catch (Throwable e) {
+                        U.error(log, "Service execution stopped with error [name=" + srvcCtx.name() +
+                            ", execId=" + srvcCtx.executionId() + ']', e);
+
+                        if (e instanceof Error)
+                            throw (Error)e;
+                    }
+                    finally {
+                        // Suicide.
+                        exe.shutdownNow();
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * @param cfg Service configuration.
+     * @return Copy of service.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("deprecation")
+    private Service copyAndInject(ServiceConfiguration cfg) throws IgniteCheckedException {
+        if (cfg instanceof LazyServiceConfiguration) {
+            byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
+
+            Service srvc = U.unmarshal(marsh, bytes, U.resolveClassLoader(null, ctx.config()));
+
+            ctx.resource().inject(srvc);
+
+            return srvc;
+        }
+        else {
+            Service srvc = cfg.getService();
+
+            try {
+                byte[] bytes = U.marshal(marsh, srvc);
+
+                Service cp = U.unmarshal(marsh, bytes, U.resolveClassLoader(srvc.getClass().getClassLoader(), ctx.config()));
+
+                ctx.resource().inject(cp);
+
+                return cp;
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to copy service (will reuse same instance): " + srvc.getClass(), e);
+
+                return srvc;
+            }
+        }
+    }
+
+    /**
+     * @param ctxs Contexts to cancel.
+     * @param cancelCnt Number of contexts to cancel.
+     */
+    private void cancel(Iterable<ServiceContextImpl> ctxs, int cancelCnt) {
+        for (Iterator<ServiceContextImpl> it = ctxs.iterator(); it.hasNext(); ) {
+            cancel(it.next());
+
+            it.remove();
+
+            if (--cancelCnt == 0)
+                break;
+        }
+    }
+
+    /**
+     * Perform cancelation on given service context.
+     *
+     * @param ctx Service context.
+     */
+    private void cancel(ServiceContextImpl ctx) {
+        // Flip cancelled flag.
+        ctx.setCancelled(true);
+
+        // Notify service about cancellation.
+        Service srvc = ctx.service();
+
+        if (srvc != null) {
+            try {
+                srvc.cancel(ctx);
+            }
+            catch (Throwable e) {
+                U.error(log, "Failed to cancel service (ignoring) [name=" + ctx.name() +
+                    ", execId=" + ctx.executionId() + ']', e);
+
+                if (e instanceof Error)
+                    throw e;
+            }
+            finally {
+                try {
+                    this.ctx.resource().cleanup(srvc);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to clean up service (will ignore): " + ctx.name(), e);
+                }
+            }
+        }
+
+        // Close out executor thread for the service.
+        // This will cause the thread to be interrupted.
+        ctx.executor().shutdownNow();
+
+        if (log.isInfoEnabled()) {
+            log.info("Cancelled service instance [name=" + ctx.name() + ", execId=" +
+                ctx.executionId() + ']');
+        }
+    }
+
+    /**
+     * Undeployes service with given id.
+     * <p/>
+     * Invokes from services deployment worker.
+     *
+     * @param srvcId Service id.
+     */
+    void undeploy(@NotNull IgniteUuid srvcId) {
+        Collection<ServiceContextImpl> ctxs = locServices.remove(srvcId);
+
+        if (ctxs != null) {
+            synchronized (ctxs) {
+                cancel(ctxs, ctxs.size());
+            }
+        }
+    }
+
+    /**
+     * @param deploy {@code true} if complete deployment requests, otherwise complete undeployment request will be
+     * completed.
+     * @param reqSrvcId Request's service id.
+     * @param err Error to complete with. If {@code null} a future will be completed successfully.
+     */
+    void completeInitiatingFuture(boolean deploy, IgniteUuid reqSrvcId, Throwable err) {
+        GridFutureAdapter<?> fut = deploy ? depFuts.remove(reqSrvcId) : undepFuts.remove(reqSrvcId);
+
+        if (fut == null)
+            return;
+
+        if (err != null) {
+            fut.onDone(err);
+
+            if (deploy) {
+                U.warn(log, "Failed to deploy service, cfg=" +
+                    ((GridServiceDeploymentFuture)fut).configuration(), err);
+            }
+            else
+                U.warn(log, "Failed to undeploy service, srvcId=" + reqSrvcId, err);
+        }
+        else
+            fut.onDone();
+    }
+
+    /**
+     * Processes deployment result.
+     *
+     * @param fullTops Deployment topologies.
+     */
+    void updateServicesTopologies(@NotNull final Map<IgniteUuid, Map<UUID, Integer>> fullTops) {
+        if (!enterBusy())
+            return;
+
+        try {
+            updateServicesMap(deployedServices, fullTops);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * @param name Service name;
+     * @return @return Service's id if exists, otherwise {@code null};
+     */
+    @Nullable private IgniteUuid lookupDeployedServiceId(String name) {
+        for (ServiceInfo desc : deployedServices.values()) {
+            if (desc.name().equals(name))
+                return desc.serviceId();
+        }
+
+        return null;
+    }
+
+    /**
+     * @param srvcId Service id.
+     * @return Count of locally deployed service with given id.
+     */
+    int localInstancesCount(IgniteUuid srvcId) {
+        Collection<ServiceContextImpl> ctxs = locServices.get(srvcId);
+
+        if (ctxs == null)
+            return 0;
+
+        synchronized (ctxs) {
+            return ctxs.size();
+        }
+    }
+
+    /**
+     * Updates deployed services map according to deployment task.
+     * <p/>
+     * Invokes from services deployment worker.
+     *
+     * @param depActions Service deployment actions.
+     */
+    void updateDeployedServices(final ServiceDeploymentActions depActions) {
+        if (!enterBusy())
+            return;
+
+        try {
+            depActions.servicesToDeploy().forEach(deployedServices::putIfAbsent);
+
+            depActions.servicesToUndeploy().forEach((srvcId, desc) -> {
+                ServiceInfo rmv = deployedServices.remove(srvcId);
+
+                assert rmv != null && rmv == desc : "Concurrent map modification.";
+            });
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * @return Deployed services information.
+     */
+    Map<IgniteUuid, ServiceInfo> deployedServices() {
+        return new HashMap<>(deployedServices);
+    }
+
+    /**
+     * Gets services received to deploy from node with given id on joining.
+     *
+     * @param nodeId Joined node id.
+     * @return Services to deploy.
+     */
+    @NotNull Map<IgniteUuid, ServiceInfo> servicesReceivedFromJoin(UUID nodeId) {
+        Map<IgniteUuid, ServiceInfo> descs = new HashMap<>();
+
+        registeredServices.forEach((srvcId, desc) -> {
+            if (desc.staticallyConfigured() && desc.originNodeId().equals(nodeId))
+                descs.put(srvcId, desc);
+        });
+
+        return descs;
+    }
+
+    /**
+     * @return Cluster coordinator, {@code null} if failed to determine.
+     */
+    @Nullable ClusterNode coordinator() {
+        return U.oldest(ctx.discovery().aliveServerNodes(), null);
+    }
+
+    /**
+     * @return {@code true} if local node is coordinator.
+     */
+    private boolean isLocalNodeCoordinator() {
+        DiscoverySpi spi = ctx.discovery().getInjectedDiscoverySpi();
+
+        return spi instanceof TcpDiscoverySpi ?
+            ((TcpDiscoverySpi)spi).isLocalNodeCoordinator() :
+            F.eq(ctx.discovery().localNode(), coordinator());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) {
+        assert ctx.localNodeId().equals(evt.eventNode().id());
+        assert evt.type() == EVT_NODE_JOINED;
+
+        if (isLocalNodeCoordinator()) {
+            // First node start, method onGridDataReceived(DiscoveryDataBag.GridDiscoveryData) has not been called.
+            ArrayList<ServiceInfo> staticServicesInfo = staticallyConfiguredServices(false);
+
+            staticServicesInfo.forEach(desc -> registeredServices.put(desc.serviceId(), desc));
+        }
+
+        ServiceDeploymentActions depActions = null;
+
+        if (!registeredServices.isEmpty()) {
+            depActions = new ServiceDeploymentActions();
+
+            depActions.servicesToDeploy(new HashMap<>(registeredServices));
+        }
+
+        depMgr.onLocalJoin(evt, discoCache, depActions);
+    }
+
+    /**
+     * @return Services deployment manager.
+     */
+    public ServiceDeploymentManager deployment() {
+        return depMgr;
+    }
+
+    /**
+     * @param logErrors Whenever it's necessary to log validation failures.
+     * @return Statically configured services.
+     */
+    @NotNull private ArrayList<ServiceInfo> staticallyConfiguredServices(boolean logErrors) {
+        ServiceConfiguration[] cfgs = ctx.config().getServiceConfiguration();
+
+        ArrayList<ServiceInfo> staticServicesInfo = new ArrayList<>();
+
+        if (cfgs != null) {
+            PreparedConfigurations<IgniteUuid> prepCfgs = prepareServiceConfigurations(Arrays.asList(cfgs),
+                node -> !node.isClient());
+
+            if (logErrors) {
+                if (prepCfgs.failedFuts != null) {
+                    for (GridServiceDeploymentFuture<IgniteUuid> fut : prepCfgs.failedFuts) {
+                        U.warn(log, "Failed to validate static service configuration (won't be deployed), " +
+                            "cfg=" + fut.configuration() + ", err=" + fut.result());
+                    }
+                }
+            }
+
+            for (ServiceConfiguration srvcCfg : prepCfgs.cfgs)
+                staticServicesInfo.add(new ServiceInfo(ctx.localNodeId(), IgniteUuid.randomUuid(), srvcCfg, true));
+        }
+
+        return staticServicesInfo;
+    }
+
+    /**
+     * @param snd Sender.
+     * @param msg Message.
+     */
+    private void processServicesChangeRequest(ClusterNode snd, ServiceChangeBatchRequest msg) {
+        DiscoveryDataClusterState state = ctx.state().clusterState();
+
+        if (!state.active() || state.transition()) {
+            for (ServiceChangeAbstractRequest req : msg.requests()) {
+                GridFutureAdapter<?> fut = null;
+
+                if (req instanceof ServiceDeploymentRequest)
+                    fut = depFuts.remove(req.serviceId());
+                else if (req instanceof ServiceUndeploymentRequest)
+                    fut = undepFuts.remove(req.serviceId());
+
+                if (fut != null) {
+                    fut.onDone(new IgniteCheckedException("Operation has been canceled, cluster state " +
+                        "change is in progress."));
+                }
+            }
+
+            return;
+        }
+
+        Map<IgniteUuid, ServiceInfo> toDeploy = new HashMap<>();
+        Map<IgniteUuid, ServiceInfo> toUndeploy = new HashMap<>();
+
+        for (ServiceChangeAbstractRequest req : msg.requests()) {
+            IgniteUuid reqSrvcId = req.serviceId();
+            ServiceInfo oldDesc = registeredServices.get(reqSrvcId);
+
+            if (req instanceof ServiceDeploymentRequest) {
+                IgniteCheckedException err = null;
+
+                if (oldDesc != null) { // In case of a collision of IgniteUuid.randomUuid() (almost impossible case)
+                    err = new IgniteCheckedException("Failed to deploy service. Service with generated id already" +
+                        "exists : [" + "srvcId" + reqSrvcId + ", srvcTop=" + oldDesc.topologySnapshot() + ']');
+                }
+                else {
+                    ServiceConfiguration cfg = ((ServiceDeploymentRequest)req).configuration();
+
+                    oldDesc = lookupInRegisteredServices(cfg.getName());
+
+                    if (oldDesc == null) {
+                        if (cfg.getCacheName() != null && ctx.cache().cacheDescriptor(cfg.getCacheName()) == null) {
+                            err = new IgniteCheckedException("Failed to deploy service, " +
+                                "affinity cache is not found, cfg=" + cfg);
+                        }
+                        else {
+                            ServiceInfo desc = new ServiceInfo(snd.id(), reqSrvcId, cfg);
+
+                            registeredServices.put(reqSrvcId, desc);
+
+                            toDeploy.put(reqSrvcId, desc);
+                        }
+                    }
+                    else {
+                        if (!oldDesc.configuration().equalsIgnoreNodeFilter(cfg)) {
+                            err = new IgniteCheckedException("Failed to deploy service " +
+                                "(service already exists with different configuration) : " +
+                                "[deployed=" + oldDesc.configuration() + ", new=" + cfg + ']');
+                        }
+                        else {
+                            GridServiceDeploymentFuture<IgniteUuid> fut = depFuts.remove(reqSrvcId);
+
+                            if (fut != null) {
+                                fut.onDone();
+
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Service sent to deploy is already deployed : " +
+                                        "[srvcId=" + oldDesc.serviceId() + ", cfg=" + oldDesc.configuration());
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if (err != null) {
+                    completeInitiatingFuture(true, reqSrvcId, err);
+
+                    U.warn(log, err.getMessage(), err);
+                }
+            }
+            else if (req instanceof ServiceUndeploymentRequest) {
+                ServiceInfo rmv = registeredServices.remove(reqSrvcId);
+
+                assert oldDesc == rmv : "Concurrent map modification.";
+
+                toUndeploy.put(reqSrvcId, rmv);
+            }
+        }
+
+        if (!toDeploy.isEmpty() || !toUndeploy.isEmpty()) {
+            ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+
+            if (!toDeploy.isEmpty())
+                depActions.servicesToDeploy(toDeploy);
+
+            if (!toUndeploy.isEmpty())
+                depActions.servicesToUndeploy(toUndeploy);
+
+            msg.servicesDeploymentActions(depActions);
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void processChangeGlobalStateRequest(ChangeGlobalStateMessage msg) {
+        if (msg.activate() && registeredServices.isEmpty())
+            return;
+
+        ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+
+        if (msg.activate())
+            depActions.servicesToDeploy(new HashMap<>(registeredServices));
+        else
+            depActions.deactivate(true);
+
+        msg.servicesDeploymentActions(depActions);
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void processDynamicCacheChangeRequest(DynamicCacheChangeBatch msg) {
+        Map<IgniteUuid, ServiceInfo> toUndeploy = new HashMap<>();
+
+        for (DynamicCacheChangeRequest chReq : msg.requests()) {
+            if (chReq.stop()) {
+                registeredServices.entrySet().removeIf(e -> {
+                    ServiceInfo desc = e.getValue();
+
+                    if (desc.cacheName().equals(chReq.cacheName())) {
+                        toUndeploy.put(desc.serviceId(), desc);
+
+                        return true;
+                    }
+
+                    return false;
+                });
+            }
+        }
+
+        if (!toUndeploy.isEmpty()) {
+            ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+
+            depActions.servicesToUndeploy(toUndeploy);
+
+            msg.servicesDeploymentActions(depActions);
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch msg) {
+        final Map<IgniteUuid, Map<UUID, Integer>> fullTops = new HashMap<>();
+        final Map<IgniteUuid, Collection<byte[]>> fullErrors = new HashMap<>();
+
+        for (ServiceClusterDeploymentResult depRes : msg.results()) {
+            final IgniteUuid srvcId = depRes.serviceId();
+            final Map<UUID, ServiceSingleNodeDeploymentResult> deps = depRes.results();
+
+            final Map<UUID, Integer> top = new HashMap<>();
+            final Collection<byte[]> errors = new ArrayList<>();
+
+            deps.forEach((nodeId, res) -> {
+                int cnt = res.count();
+
+                if (cnt > 0)
+                    top.put(nodeId, cnt);
+
+                if (!res.errors().isEmpty())
+                    errors.addAll(res.errors());
+            });
+
+            if (!errors.isEmpty())
+                fullErrors.computeIfAbsent(srvcId, e -> new ArrayList<>()).addAll(errors);
+
+            fullTops.put(srvcId, top);
+        }
+
+        synchronized (servicesTopsUpdateMux) {
+            updateServicesMap(registeredServices, fullTops);
+
+            servicesTopsUpdateMux.notifyAll();
+        }
+
+        ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+
+        depActions.deploymentTopologies(fullTops);
+        depActions.deploymentErrors(fullErrors);
+
+        msg.servicesDeploymentActions(depActions);
+    }
+
+    /**
+     * @param name Service name.
+     * @return Mapped service descriptor. Possibly {@code null} if not found.
+     */
+    @Nullable private ServiceInfo lookupInRegisteredServices(String name) {
+        for (ServiceInfo desc : registeredServices.values()) {
+            if (desc.name().equals(name))
+                return desc;
+        }
+
+        return null;
+    }
+
+    /**
+     * Updates services info according to given arguments.
+     *
+     * @param services Services info to update.
+     * @param tops Deployment topologies.
+     */
+    private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services,
+        Map<IgniteUuid, Map<UUID, Integer>> tops) {
+
+        tops.forEach((srvcId, top) -> {
+            ServiceInfo desc = services.get(srvcId);
+
+            if (desc != null)
+                desc.topologySnapshot(top);
+        });
+    }
+
+    /**
+     * Enters busy state.
+     *
+     * @return {@code true} if entered to busy state.
+     */
+    private boolean enterBusy() {
+        return opsLock.readLock().tryLock();
+    }
+
+    /**
+     * Leaves busy state.
+     */
+    private void leaveBusy() {
+        opsLock.readLock().unlock();
+    }
+}
index dc41c22..abfb65c 100644 (file)
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.service;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.services.ServiceConfiguration;
@@ -24,18 +25,18 @@ import org.apache.ignite.services.ServiceConfiguration;
 /**
  * Result of services validation before deployment.
  */
-class PreparedConfigurations {
+class PreparedConfigurations<T extends Serializable> {
     /** */
     final List<ServiceConfiguration> cfgs;
 
     /** */
-    final List<GridServiceDeploymentFuture> failedFuts;
+    final List<GridServiceDeploymentFuture<T>> failedFuts;
 
     /**
      * @param cfgs Configurations to deploy.
      * @param failedFuts Finished futures for failed configurations.
      */
-    PreparedConfigurations(List<ServiceConfiguration> cfgs, List<GridServiceDeploymentFuture> failedFuts) {
+    PreparedConfigurations(List<ServiceConfiguration> cfgs, List<GridServiceDeploymentFuture<T>> failedFuts) {
         this.cfgs = cfgs;
         this.failedFuts = failedFuts;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceChangeAbstractRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceChangeAbstractRequest.java
new file mode 100644 (file)
index 0000000..d083aad
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Abstract class for service change requests.
+ */
+abstract class ServiceChangeAbstractRequest implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Service id. */
+    protected final IgniteUuid srvcId;
+
+    /**
+     * @param srvcId Service id.
+     */
+    protected ServiceChangeAbstractRequest(@NotNull IgniteUuid srvcId) {
+        this.srvcId = srvcId;
+    }
+
+    /**
+     * @return Service id.
+     */
+    public IgniteUuid serviceId() {
+        return srvcId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceChangeAbstractRequest.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceChangeBatchRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceChangeBatchRequest.java
new file mode 100644 (file)
index 0000000..a6707b6
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Service change batch request discovery message.
+ */
+public class ServiceChangeBatchRequest implements DiscoveryCustomMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Unique custom message ID. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Change requests. */
+    @GridToStringInclude
+    private final Collection<ServiceChangeAbstractRequest> reqs;
+
+    /** Services deployment actions to be processed on services deployment process. */
+    @GridToStringExclude
+    @Nullable private transient ServiceDeploymentActions serviceDeploymentActions;
+
+    /**
+     * @param reqs Change requests.
+     */
+    public ServiceChangeBatchRequest(Collection<ServiceChangeAbstractRequest> reqs) {
+        assert !F.isEmpty(reqs);
+
+        this.reqs = reqs;
+    }
+
+    /**
+     * @return Change requests.
+     */
+    public Collection<ServiceChangeAbstractRequest> requests() {
+        return Collections.unmodifiableCollection(reqs);
+    }
+
+    /**
+     * @return Services deployment actions to be processed on services deployment process.
+     */
+    @Nullable public ServiceDeploymentActions servicesDeploymentActions() {
+        return serviceDeploymentActions;
+    }
+
+    /**
+     * @param serviceDeploymentActions Services deployment actions to be processed on services deployment process.
+     */
+    public void servicesDeploymentActions(ServiceDeploymentActions serviceDeploymentActions) {
+        this.serviceDeploymentActions = serviceDeploymentActions;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        // No-op.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
+        DiscoCache discoCache) {
+        // No-op.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceChangeBatchRequest.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java
new file mode 100644 (file)
index 0000000..45e2c52
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Service cluster deployment result.
+ * <p/>
+ * Contains coint of deployed service and deployment errors across the cluster mapped to nodes ids.
+ */
+public class ServiceClusterDeploymentResult implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Service id. */
+    private final IgniteUuid srvcId;
+
+    /** Per node deployments results. */
+    @GridToStringInclude
+    private final Map<UUID, ServiceSingleNodeDeploymentResult> results;
+
+    /**
+     * @param srvcId Service id.
+     * @param results Deployments results.
+     */
+    public ServiceClusterDeploymentResult(@NotNull IgniteUuid srvcId,
+        @NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results) {
+        this.srvcId = srvcId;
+        this.results = results;
+    }
+
+    /**
+     * @return Service id.
+     */
+    public IgniteUuid serviceId() {
+        return srvcId;
+    }
+
+    /**
+     * @return Per node deployments results.
+     */
+    public Map<UUID, ServiceSingleNodeDeploymentResult> results() {
+        return Collections.unmodifiableMap(results);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceClusterDeploymentResult.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResultBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResultBatch.java
new file mode 100644 (file)
index 0000000..6aeb4b6
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.Collection;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Batch of services cluster deployment result.
+ * <p/>
+ * Contains collection of {@link ServiceClusterDeploymentResult}.
+ */
+public class ServiceClusterDeploymentResultBatch implements DiscoveryCustomMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Unique custom message ID. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Deployment process id. */
+    private final ServiceDeploymentProcessId depId;
+
+    /** Services deployments results. */
+    @GridToStringInclude
+    private Collection<ServiceClusterDeploymentResult> results;
+
+    /** Services deployment actions to be processed on services deployment process. */
+    @GridToStringExclude
+    @Nullable private transient ServiceDeploymentActions serviceDeploymentActions;
+
+    /**
+     * @param depId Deployment process id.
+     * @param results Services deployments results.
+     */
+    public ServiceClusterDeploymentResultBatch(@NotNull ServiceDeploymentProcessId depId,
+        @NotNull Collection<ServiceClusterDeploymentResult> results) {
+        this.depId = depId;
+        this.results = results;
+    }
+
+    /**
+     * @return Deployment process id.
+     */
+    public ServiceDeploymentProcessId deploymentId() {
+        return depId;
+    }
+
+    /**
+     * @return Services deployments results.
+     */
+    public Collection<ServiceClusterDeploymentResult> results() {
+        return results;
+    }
+
+    /**
+     * @return Services deployment actions to be processed on services deployment process.
+     */
+    @Nullable public ServiceDeploymentActions servicesDeploymentActions() {
+        return serviceDeploymentActions;
+    }
+
+    /**
+     * @param serviceDeploymentActions Services deployment actions to be processed on services deployment process.
+     */
+    public void servicesDeploymentActions(ServiceDeploymentActions serviceDeploymentActions) {
+        this.serviceDeploymentActions = serviceDeploymentActions;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        // No-op.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
+        DiscoCache discoCache) {
+        // No-op.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceClusterDeploymentResultBatch.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
new file mode 100644 (file)
index 0000000..b33a646
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Actions of change service state to be processed in the service deployment process.
+ */
+public class ServiceDeploymentActions {
+    /** Whenever it's necessary to deactivate service processor. */
+    private boolean deactivate;
+
+    /** Services info to deploy. */
+    private Map<IgniteUuid, ServiceInfo> servicesToDeploy;
+
+    /** Services info to undeploy. */
+    private Map<IgniteUuid, ServiceInfo> servicesToUndeploy;
+
+    /** Services deployment topologies. */
+    private Map<IgniteUuid, Map<UUID, Integer>> depTops;
+
+    /** Services deployment errors. */
+    private Map<IgniteUuid, Collection<byte[]>> depErrors;
+
+    /**
+     * @param servicesToDeploy Services info to deploy.
+     */
+    public void servicesToDeploy(@NotNull Map<IgniteUuid, ServiceInfo> servicesToDeploy) {
+        this.servicesToDeploy = servicesToDeploy;
+    }
+
+    /**
+     * @return Services info to deploy.
+     */
+    @NotNull public Map<IgniteUuid, ServiceInfo> servicesToDeploy() {
+        return servicesToDeploy != null ? servicesToDeploy : Collections.emptyMap();
+    }
+
+    /**
+     * @param servicesToUndeploy Services info to undeploy.
+     */
+    public void servicesToUndeploy(@NotNull Map<IgniteUuid, ServiceInfo> servicesToUndeploy) {
+        this.servicesToUndeploy = servicesToUndeploy;
+    }
+
+    /**
+     * @return Services info to undeploy.
+     */
+    @NotNull public Map<IgniteUuid, ServiceInfo> servicesToUndeploy() {
+        return servicesToUndeploy != null ? servicesToUndeploy : Collections.emptyMap();
+    }
+
+    /**
+     * @param deactivate Whenever it's necessary to deactivate service processor.
+     */
+    public void deactivate(boolean deactivate) {
+        this.deactivate = deactivate;
+    }
+
+    /**
+     * @return Whenever it's necessary to deactivate service processor.
+     */
+    public boolean deactivate() {
+        return deactivate;
+    }
+
+    /**
+     * @return Deployment topologies.
+     */
+    @NotNull public Map<IgniteUuid, Map<UUID, Integer>> deploymentTopologies() {
+        return depTops != null ? depTops : Collections.emptyMap();
+    }
+
+    /**
+     * @param depTops Deployment topologies.
+     */
+    public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> depTops) {
+        this.depTops = depTops;
+    }
+
+    /**
+     * @return Deployment errors.
+     */
+    @NotNull public Map<IgniteUuid, Collection<byte[]>> deploymentErrors() {
+        return depErrors != null ? depErrors : Collections.emptyMap();
+    }
+
+    /**
+     * @param depErrors Deployment errors.
+     */
+    public void deploymentErrors(@NotNull Map<IgniteUuid, Collection<byte[]>> depErrors) {
+        this.depErrors = depErrors;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java
new file mode 100644 (file)
index 0000000..e2a3add
--- /dev/null
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT;
+import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static org.apache.ignite.internal.GridTopic.TOPIC_SERVICES;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+
+/**
+ * Service deployment manager.
+ *
+ * @see ServiceDeploymentTask
+ * @see ServiceDeploymentActions
+ */
+public class ServiceDeploymentManager {
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Services discovery messages listener. */
+    private final DiscoveryEventListener discoLsnr = new ServiceDiscoveryListener();
+
+    /** Services communication messages listener. */
+    private final GridMessageListener commLsnr = new ServiceCommunicationListener();
+
+    /** Services deployments tasks. */
+    private final Map<ServiceDeploymentProcessId, ServiceDeploymentTask> tasks = new ConcurrentHashMap<>();
+
+    /** Discovery events received while cluster state transition was in progress. */
+    private final List<PendingEventHolder> pendingEvts = new ArrayList<>();
+
+    /** Topology version of latest deployment task's event. */
+    private final AtomicReference<AffinityTopologyVersion> readyTopVer =
+        new AtomicReference<>(AffinityTopologyVersion.NONE);
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Deployment worker. */
+    private final ServicesDeploymentWorker depWorker;
+
+    /** Default dump operation limit. */
+    private final long dfltDumpTimeoutLimit;
+
+    /**
+     * @param ctx Grid kernal context.
+     */
+    ServiceDeploymentManager(@NotNull GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+
+        ctx.event().addDiscoveryEventListener(discoLsnr,
+            EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT);
+
+        ctx.io().addMessageListener(TOPIC_SERVICES, commLsnr);
+
+        depWorker = new ServicesDeploymentWorker();
+
+        long limit = getLong(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT, 0);
+
+        dfltDumpTimeoutLimit = limit <= 0 ? 30 * 60_000 : limit;
+    }
+
+    /**
+     * Starts processing of services deployments tasks.
+     */
+    void startProcessing() {
+        assert depWorker.runner() == null : "Method shouldn't be called twice during lifecycle;";
+
+        new IgniteThread(ctx.igniteInstanceName(), "services-deployment-worker", depWorker).start();
+    }
+
+    /**
+     * Stops processing of services deployments tasks.
+     *
+     * @param stopErr Cause error of deployment manager stop.
+     */
+    void stopProcessing(IgniteCheckedException stopErr) {
+        busyLock.block(); // Will not release it.
+
+        ctx.event().removeDiscoveryEventListener(discoLsnr);
+
+        ctx.io().removeMessageListener(commLsnr);
+
+        U.cancel(depWorker);
+
+        U.join(depWorker, log);
+
+        depWorker.tasksQueue.clear();
+
+        pendingEvts.clear();
+
+        tasks.values().forEach(t -> t.completeError(stopErr));
+
+        tasks.clear();
+    }
+
+    /**
+     * @return Ready topology version.
+     */
+    public AffinityTopologyVersion readyTopologyVersion() {
+        return readyTopVer.get();
+    }
+
+    /**
+     * Special handler for local discovery events for which the regular events are not generated, e.g. local join and
+     * client reconnect events.
+     *
+     * @param evt Discovery event.
+     * @param discoCache Discovery cache.
+     * @param depActions Service deployment actions.
+     */
+    void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache, ServiceDeploymentActions depActions) {
+        checkClusterStateAndAddTask(evt, discoCache, depActions);
+    }
+
+    /**
+     * Invokes {@link GridWorker#blockingSectionBegin()} for service deployment worker.
+     * <p/>
+     * Should be called from service deployment worker thread.
+     */
+    void deployerBlockingSectionBegin() {
+        assert depWorker != null && Thread.currentThread() == depWorker.runner();
+
+        depWorker.blockingSectionBegin();
+    }
+
+    /**
+     * Invokes {@link GridWorker#blockingSectionEnd()} for service deployment worker.
+     * <p/>
+     * Should be called from service deployment worker thread.
+     */
+    void deployerBlockingSectionEnd() {
+        assert depWorker != null && Thread.currentThread() == depWorker.runner();
+
+        depWorker.blockingSectionEnd();
+    }
+
+    /**
+     * Checks cluster state and handles given event.
+     * <pre>
+     * - if cluster is active, then adds event in deployment queue;
+     * - if cluster state in transition, them adds to pending events;
+     * - if cluster is inactive, then ignore event;
+     * </pre>
+     * <b>Should be called from discovery thread.</b>
+     *
+     * @param evt Discovery event.
+     * @param discoCache Discovery cache.
+     * @param depActions Services deployment actions.
+     */
+    private void checkClusterStateAndAddTask(@NotNull DiscoveryEvent evt, @NotNull DiscoCache discoCache,
+        @Nullable ServiceDeploymentActions depActions) {
+        if (discoCache.state().transition())
+            pendingEvts.add(new PendingEventHolder(evt, discoCache.version(), depActions));
+        else if (discoCache.state().active())
+            addTask(evt, discoCache.version(), depActions);
+        else if (log.isDebugEnabled())
+            log.debug("Ignore event, cluster is inactive, evt=" + evt);
+    }
+
+    /**
+     * Adds deployment task with given deployment process id.
+     * </p>
+     * <b>Should be called from discovery thread.</b>
+     *
+     * @param evt Discovery event.
+     * @param topVer Topology version.
+     * @param depActions Services deployment actions.
+     */
+    private void addTask(@NotNull DiscoveryEvent evt, @NotNull AffinityTopologyVersion topVer,
+        @Nullable ServiceDeploymentActions depActions) {
+        final ServiceDeploymentProcessId depId = deploymentId(evt, topVer);
+
+        ServiceDeploymentTask task = tasks.computeIfAbsent(depId,
+            t -> new ServiceDeploymentTask(ctx, depId));
+
+        if (!task.onEnqueued()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Service deployment process hasn't been started for discovery event, because of " +
+                    "a task with the same deployment process id is already added (possible cause is message's" +
+                    " double delivering), evt=" + evt);
+            }
+
+            return;
+        }
+
+        assert task.event() == null && task.topologyVersion() == null;
+
+        task.onEvent(evt, topVer, depActions);
+
+        depWorker.tasksQueue.add(task);
+    }
+
+    /**
+     * Creates service deployment process id.
+     *
+     * @param evt Discovery event.
+     * @param topVer Topology version.
+     * @return Services deployment process id.
+     */
+    private ServiceDeploymentProcessId deploymentId(@NotNull DiscoveryEvent evt,
+        @NotNull AffinityTopologyVersion topVer) {
+        return evt instanceof DiscoveryCustomEvent ?
+            new ServiceDeploymentProcessId(((DiscoveryCustomEvent)evt).customMessage().id()) :
+            new ServiceDeploymentProcessId(topVer);
+    }
+
+    /**
+     * Clones some instances of {@link DiscoveryCustomEvent} to capture necessary data, to avoid custom messages's
+     * nullifying by {@link GridDhtPartitionsExchangeFuture#onDone}.
+     *
+     * @param evt Discovery event.
+     * @return Discovery event to process.
+     */
+    private DiscoveryCustomEvent copyIfNeeded(@NotNull DiscoveryCustomEvent evt) {
+        DiscoveryCustomMessage msg = evt.customMessage();
+
+        assert msg != null : "DiscoveryCustomMessage has been nullified concurrently, evt=" + evt;
+
+        if (msg instanceof ServiceChangeBatchRequest)
+            return evt;
+
+        DiscoveryCustomEvent cp = new DiscoveryCustomEvent();
+
+        cp.node(evt.node());
+        cp.customMessage(msg);
+        cp.eventNode(evt.eventNode());
+        cp.affinityTopologyVersion(evt.affinityTopologyVersion());
+
+        return cp;
+    }
+
+    /**
+     * Services discovery messages high priority listener.
+     * <p/>
+     * The listener should be notified earlier then PME's listener because of a custom message of {@link
+     * DiscoveryCustomEvent} may be nullified in PME before the listener will be able to capture it.
+     */
+    private class ServiceDiscoveryListener implements DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(final DiscoveryEvent evt, final DiscoCache discoCache) {
+            if (!enterBusy())
+                return;
+
+            final UUID snd = evt.eventNode().id();
+            final int evtType = evt.type();
+
+            assert snd != null : "Event's node id shouldn't be null.";
+            assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED
+                || evtType == EVT_DISCOVERY_CUSTOM_EVT : "Unexpected event was received, evt=" + evt;
+
+            try {
+                if (evtType == EVT_DISCOVERY_CUSTOM_EVT) {
+                    DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                    if (msg instanceof ChangeGlobalStateFinishMessage) {
+                        ChangeGlobalStateFinishMessage msg0 = (ChangeGlobalStateFinishMessage)msg;
+
+                        if (msg0.clusterActive())
+                            pendingEvts.forEach(t -> addTask(t.evt, t.topVer, t.depActions));
+                        else if (log.isDebugEnabled())
+                            pendingEvts.forEach(t -> log.debug("Ignore event, cluster is inactive: " + t.evt));
+
+                        pendingEvts.clear();
+                    }
+                    else {
+                        if (msg instanceof ServiceClusterDeploymentResultBatch) {
+                            ServiceClusterDeploymentResultBatch msg0 = (ServiceClusterDeploymentResultBatch)msg;
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Received services full deployments message : " +
+                                    "[locId=" + ctx.localNodeId() + ", snd=" + snd + ", msg=" + msg0 + ']');
+                            }
+
+                            ServiceDeploymentProcessId depId = msg0.deploymentId();
+
+                            assert depId != null;
+
+                            ServiceDeploymentTask task = tasks.get(depId);
+
+                            if (task != null) // May be null in case of double delivering
+                                task.onReceiveFullDeploymentsMessage(msg0);
+                        }
+                        else if (msg instanceof CacheAffinityChangeMessage)
+                            addTask(copyIfNeeded((DiscoveryCustomEvent)evt), discoCache.version(), null);
+                        else {
+                            ServiceDeploymentActions depActions = null;
+
+                            if (msg instanceof ChangeGlobalStateMessage)
+                                depActions = ((ChangeGlobalStateMessage)msg).servicesDeploymentActions();
+                            else if (msg instanceof ServiceChangeBatchRequest) {
+                                depActions = ((ServiceChangeBatchRequest)msg)
+                                    .servicesDeploymentActions();
+                            }
+                            else if (msg instanceof DynamicCacheChangeBatch)
+                                depActions = ((DynamicCacheChangeBatch)msg).servicesDeploymentActions();
+
+                            if (depActions != null)
+                                addTask(copyIfNeeded((DiscoveryCustomEvent)evt), discoCache.version(), depActions);
+                        }
+                    }
+                }
+                else {
+                    if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED)
+                        tasks.values().forEach(t -> t.onNodeLeft(snd));
+
+                    checkClusterStateAndAddTask(evt, discoCache, null);
+                }
+            }
+            finally {
+                leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
+    }
+
+    /**
+     * Pending event's holder.
+     */
+    private static class PendingEventHolder {
+        /** Discovery event. */
+        private DiscoveryEvent evt;
+
+        /** Topology version. */
+        private AffinityTopologyVersion topVer;
+
+        /** Services deployemnt actions. */
+        private ServiceDeploymentActions depActions;
+
+        /**
+         * @param evt Discovery event.
+         * @param topVer Topology version.
+         * @param depActions Services deployment actions.
+         */
+        private PendingEventHolder(DiscoveryEvent evt,
+            AffinityTopologyVersion topVer, ServiceDeploymentActions depActions) {
+            this.evt = evt;
+            this.topVer = topVer;
+            this.depActions = depActions;
+        }
+    }
+
+    /**
+     * Services messages communication listener.
+     */
+    private class ServiceCommunicationListener implements GridMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!enterBusy())
+                return;
+
+            try {
+                if (msg instanceof ServiceSingleNodeDeploymentResultBatch) {
+                    ServiceSingleNodeDeploymentResultBatch msg0 = (ServiceSingleNodeDeploymentResultBatch)msg;
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received services single deployments message : " +
+                            "[locId=" + ctx.localNodeId() + ", snd=" + nodeId + ", msg=" + msg0 + ']');
+                    }
+
+                    tasks.computeIfAbsent(msg0.deploymentId(),
+                        t -> new ServiceDeploymentTask(ctx, msg0.deploymentId()))
+                        .onReceiveSingleDeploymentsMessage(nodeId, msg0);
+                }
+            }
+            finally {
+                leaveBusy();
+            }
+        }
+    }
+
+    /**
+     * Services deployment worker.
+     */
+    private class ServicesDeploymentWorker extends GridWorker {
+        /** Queue to process. */
+        private final LinkedBlockingQueue<ServiceDeploymentTask> tasksQueue = new LinkedBlockingQueue<>();
+
+        /** {@inheritDoc} */
+        private ServicesDeploymentWorker() {
+            super(ctx.igniteInstanceName(), "services-deployment-worker",
+                ServiceDeploymentManager.this.log, ctx.workersRegistry());
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            Throwable err = null;
+
+            try {
+                ServiceDeploymentTask task;
+
+                while (!isCancelled()) {
+                    onIdle();
+
+                    blockingSectionBegin();
+
+                    try {
+                        task = tasksQueue.take();
+                    }
+                    finally {
+                        blockingSectionEnd();
+                    }
+
+                    if (isCancelled())
+                        Thread.currentThread().interrupt();
+
+                    task.init();
+
+                    final long dumpTimeout = 2 * ctx.config().getNetworkTimeout();
+
+                    long dumpCnt = 0;
+                    long nextDumpTime = 0;
+
+                    while (true) {
+                        try {
+                            blockingSectionBegin();
+
+                            try {
+                                task.waitForComplete(dumpTimeout);
+                            }
+                            finally {
+                                blockingSectionEnd();
+                            }
+
+                            taskPostProcessing(task);
+
+                            break;
+                        }
+                        catch (IgniteFutureTimeoutCheckedException ignored) {
+                            if (isCancelled())
+                                return;
+
+                            if (nextDumpTime <= U.currentTimeMillis()) {
+                                log.warning("Failed to wait service deployment process or timeout had been" +
+                                    " reached, timeout=" + dumpTimeout + ", task=" + task);
+
+                                long nextTimeout = dumpTimeout * (2 + dumpCnt++);
+
+                                nextDumpTime = U.currentTimeMillis() + Math.min(nextTimeout, dfltDumpTimeoutLimit);
+                            }
+                        }
+                        catch (ClusterTopologyServerNotFoundException e) {
+                            U.error(log, e);
+
+                            taskPostProcessing(task);
+
+                            break;
+                        }
+                    }
+                }
+            }
+            catch (InterruptedException | IgniteInterruptedCheckedException e) {
+                Thread.currentThread().interrupt();
+
+                if (!isCancelled())
+                    err = e;
+            }
+            catch (Throwable t) {
+                err = t;
+            }
+            finally {
+                if (err == null && !isCancelled())
+                    err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly.");
+
+                if (err instanceof OutOfMemoryError)
+                    ctx.failure().process(new FailureContext(CRITICAL_ERROR, err));
+                else if (err != null)
+                    ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+            }
+        }
+
+        /**
+         * Does additional actions after task's completion.
+         */
+        private void taskPostProcessing(ServiceDeploymentTask task) {
+            AffinityTopologyVersion readyVer = readyTopVer.get();
+
+            readyTopVer.compareAndSet(readyVer, task.topologyVersion());
+
+            tasks.remove(task.deploymentId());
+        }
+    }
+
+    /**
+     * Enters busy state.
+     *
+     * @return {@code true} if entered to busy state.
+     */
+    private boolean enterBusy() {
+        return busyLock.enterBusy();
+    }
+
+    /**
+     * Leaves busy state.
+     */
+    private void leaveBusy() {
+        busyLock.leaveBusy();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java
new file mode 100644 (file)
index 0000000..711c302
--- /dev/null
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Service deployment process' identifier.
+ */
+public class ServiceDeploymentProcessId implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology version. */
+    @Nullable private AffinityTopologyVersion topVer;
+
+    /** Request's id. */
+    @Nullable private IgniteUuid reqId;
+
+    /**
+     * Empty constructor for marshalling purposes.
+     */
+    public ServiceDeploymentProcessId() {
+    }
+
+    /**
+     * @param topVer Topology version.
+     */
+    ServiceDeploymentProcessId(@NotNull AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+    }
+
+    /**
+     * @param reqId Request's id.
+     */
+    ServiceDeploymentProcessId(@NotNull IgniteUuid reqId) {
+        this.reqId = reqId;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Requests id.
+     */
+    public IgniteUuid requestId() {
+        return reqId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeIgniteUuid("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                reqId = reader.readIgniteUuid("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(ServiceDeploymentProcessId.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 167;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ServiceDeploymentProcessId id = (ServiceDeploymentProcessId)o;
+
+        return F.eq(topVer, id.topVer) && F.eq(reqId, id.reqId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(topVer, reqId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceDeploymentProcessId.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java
new file mode 100644 (file)
index 0000000..d41e5aa
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Service deployment request.
+ */
+public class ServiceDeploymentRequest extends ServiceChangeAbstractRequest {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Service configuration. */
+    private final ServiceConfiguration cfg;
+
+    /**
+     * @param srvcId Service id.
+     * @param cfg Service configuration.
+     */
+    public ServiceDeploymentRequest(@NotNull IgniteUuid srvcId, @NotNull ServiceConfiguration cfg) {
+        super(srvcId);
+
+        this.cfg = cfg;
+    }
+
+    /**
+     * @return Service configuration.
+     */
+    public ServiceConfiguration configuration() {
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceDeploymentRequest.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
new file mode 100644 (file)
index 0000000..ea0114b
--- /dev/null
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_SERVICES;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SERVICE_POOL;
+
+/**
+ * Services deployment task.
+ *
+ * @see ServiceDeploymentActions
+ * @see ServiceSingleNodeDeploymentResultBatch
+ * @see ServiceClusterDeploymentResultBatch
+ */
+class ServiceDeploymentTask {
+    /** Task's completion future. */
+    private final GridFutureAdapter<?> completeStateFut = new GridFutureAdapter<>();
+
+    /** Task's completion of initialization future. */
+    private final GridFutureAdapter<?> initTaskFut = new GridFutureAdapter<>();
+
+    /** Task's completion of remaining nodes ids initialization future. */
+    private final GridFutureAdapter<?> initCrdFut = new GridFutureAdapter<>();
+
+    /** Coordinator initialization actions mutex. */
+    private final Object initCrdMux = new Object();
+
+    /** Remaining nodes to received services single deployments message. */
+    @GridToStringInclude
+    private final Set<UUID> remaining = new HashSet<>();
+
+    /** Added in deployment queue flag. */
+    private final AtomicBoolean addedInQueue = new AtomicBoolean(false);
+
+    /** Single deployments messages to process. */
+    @GridToStringInclude
+    private final Map<UUID, ServiceSingleNodeDeploymentResultBatch> singleDepsMsgs = new HashMap<>();
+
+    /** Expected services assignments. */
+    @GridToStringExclude
+    private final Map<IgniteUuid, Map<UUID, Integer>> expDeps = new HashMap<>();
+
+    /** Deployment errors. */
+    @GridToStringExclude
+    private final Map<IgniteUuid, Collection<Throwable>> depErrors = new HashMap<>();
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Service processor. */
+    private final IgniteServiceProcessor srvcProc;
+
+    /** Deployment process id. */
+    @GridToStringInclude
+    private final ServiceDeploymentProcessId depId;
+
+    /** Coordinator node id. */
+    @GridToStringExclude
+    private volatile UUID crdId;
+
+    /** Cause discovery event. */
+    @GridToStringInclude
+    private volatile DiscoveryEvent evt;
+
+    /** Topology version. */
+    @GridToStringInclude
+    private volatile AffinityTopologyVersion evtTopVer;
+
+    /** Services deployment actions. */
+    private volatile ServiceDeploymentActions depActions;
+
+    /**
+     * @param ctx Kernal context.
+     * @param depId Service deployment process id.
+     */
+    protected ServiceDeploymentTask(GridKernalContext ctx, ServiceDeploymentProcessId depId) {
+        assert ctx.service() instanceof IgniteServiceProcessor;
+
+        this.depId = depId;
+        this.ctx = ctx;
+
+        srvcProc = (IgniteServiceProcessor)ctx.service();
+        log = ctx.log(getClass());
+    }
+
+    /**
+     * Handles discovery event receiving.
+     *
+     * @param evt Discovery event.
+     * @param evtTopVer Topology version.
+     * @param depActions Services deployment actions.
+     */
+    protected void onEvent(@NotNull DiscoveryEvent evt, @NotNull AffinityTopologyVersion evtTopVer,
+        @Nullable ServiceDeploymentActions depActions) {
+        assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED
+            || evt.type() == EVT_DISCOVERY_CUSTOM_EVT : "Unexpected event type, evt=" + evt;
+
+        this.evt = evt;
+        this.evtTopVer = evtTopVer;
+        this.depActions = depActions;
+    }
+
+    /**
+     * Initializes deployment task.
+     *
+     * @throws IgniteCheckedException In case of an error.
+     */
+    protected void init() throws IgniteCheckedException {
+        if (isCompleted() || initTaskFut.isDone())
+            return;
+
+        assert evt != null && evtTopVer != null : "Illegal state to perform task's initialization :" + this;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Started services deployment task init: [depId=" + depId +
+                ", locId=" + ctx.localNodeId() + ", evt=" + evt + ']');
+        }
+
+        try {
+            if (depActions != null && depActions.deactivate()) {
+                srvcProc.onDeActivate(ctx);
+
+                completeSuccess();
+
+                return;
+            }
+
+            if (depActions == null) {
+                Map<IgniteUuid, ServiceInfo> toDeploy = new HashMap<>();
+
+                final int evtType = evt.type();
+
+                if (evtType == EVT_DISCOVERY_CUSTOM_EVT) {
+                    DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                    if (msg instanceof CacheAffinityChangeMessage) {
+                        CacheAffinityChangeMessage msg0 = (CacheAffinityChangeMessage)msg;
+
+                        Map<IgniteUuid, ServiceInfo> services = srvcProc.deployedServices();
+
+                        if (!services.isEmpty()) {
+                            Map<Integer, Map<Integer, List<UUID>>> change = msg0.assignmentChange();
+
+                            if (change != null) {
+                                Set<String> names = new HashSet<>();
+
+                                ctx.cache().cacheDescriptors().forEach((name, desc) -> {
+                                    if (change.containsKey(desc.groupId()))
+                                        names.add(name);
+                                });
+
+                                services.forEach((srvcId, desc) -> {
+                                    if (names.contains(desc.cacheName()))
+                                        toDeploy.put(srvcId, desc);
+                                });
+                            }
+                        }
+                    }
+                }
+                else {
+                    assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED;
+
+                    final ClusterNode eventNode = evt.eventNode();
+
+                    final Map<IgniteUuid, ServiceInfo> deployedServices = srvcProc.deployedServices();
+
+                    if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) {
+                        deployedServices.forEach((srvcId, desc) -> {
+                            if (desc.topologySnapshot().containsKey(eventNode.id()) ||
+                                (desc.cacheName() != null && !eventNode.isClient())) // If affinity service
+                                toDeploy.put(srvcId, desc);
+                        });
+                    }
+                    else {
+                        toDeploy.putAll(deployedServices);
+
+                        toDeploy.putAll(srvcProc.servicesReceivedFromJoin(eventNode.id()));
+                    }
+                }
+
+                if (toDeploy.isEmpty()) {
+                    completeSuccess();
+
+                    if (log.isDebugEnabled())
+                        log.debug("No services deployment deployment action required.");
+
+                    return;
+                }
+
+                depActions = new ServiceDeploymentActions();
+
+                depActions.servicesToDeploy(toDeploy);
+            }
+
+            ClusterNode crd = srvcProc.coordinator();
+
+            if (crd == null) {
+                onAllServersLeft();
+
+                return;
+            }
+
+            crdId = crd.id();
+
+            if (crd.isLocal())
+                initCoordinator(evtTopVer);
+
+            processDeploymentActions(depActions);
+        }
+        catch (Exception e) {
+            log.error("Error occurred while initializing deployment task, err=" + e.getMessage(), e);
+
+            completeError(e);
+
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            if (!initTaskFut.isDone())
+                initTaskFut.onDone();
+
+            if (log.isDebugEnabled()) {
+                log.debug("Finished services deployment future init: [depId=" + deploymentId() +
+                    ", locId=" + ctx.localNodeId() + ']');
+            }
+        }
+    }
+
+    /**
+     * @param depActions Services deployment actions.
+     */
+    @SuppressWarnings("ErrorNotRethrown")
+    private void processDeploymentActions(@NotNull ServiceDeploymentActions depActions) {
+        srvcProc.updateDeployedServices(depActions);
+
+        depActions.servicesToUndeploy().forEach((srvcId, desc) -> {
+            srvcProc.deployment().deployerBlockingSectionBegin();
+
+            try {
+                srvcProc.undeploy(srvcId);
+            }
+            finally {
+                srvcProc.deployment().deployerBlockingSectionEnd();
+            }
+        });
+
+        if (!depActions.servicesToDeploy().isEmpty()) {
+            final Collection<UUID> evtTopNodes = F.nodeIds(ctx.discovery().nodes(evtTopVer));
+
+            depActions.servicesToDeploy().forEach((srvcId, desc) -> {
+                try {
+                    ServiceConfiguration cfg = desc.configuration();
+
+                    TreeMap<UUID, Integer> oldTop = filterDeadNodes(evtTopNodes, desc.topologySnapshot());
+
+                    Map<UUID, Integer> top = reassign(srvcId, cfg, evtTopVer, oldTop);
+
+                    expDeps.put(srvcId, top);
+
+                    Integer expCnt = top.getOrDefault(ctx.localNodeId(), 0);
+
+                    if (expCnt > srvcProc.localInstancesCount(srvcId)) {
+                        srvcProc.deployment().deployerBlockingSectionBegin();
+
+                        try {
+                            srvcProc.redeploy(srvcId, cfg, top);
+                        }
+                        finally {
+                            srvcProc.deployment().deployerBlockingSectionEnd();
+                        }
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    depErrors.computeIfAbsent(srvcId, c -> new ArrayList<>()).add(e);
+                }
+            });
+        }
+
+        createAndSendSingleDeploymentsMessage(depId, depErrors);
+    }
+
+    /**
+     * Prepares the coordinator to manage deployment process.
+     *
+     * @param topVer Topology version to initialize {@link #remaining} collection.
+     */
+    private void initCoordinator(AffinityTopologyVersion topVer) {
+        synchronized (initCrdMux) {
+            if (initCrdFut.isDone())
+                return;
+
+            try {
+                for (ClusterNode node : ctx.discovery().nodes(topVer)) {
+                    if (ctx.discovery().alive(node) && !singleDepsMsgs.containsKey(node.id()))
+                        remaining.add(node.id());
+                }
+            }
+            catch (Exception e) {
+                log.error("Error occurred while initializing remaining collection.", e);
+
+                initCrdFut.onDone(e);
+            }
+            finally {
+                if (!initCrdFut.isDone())
+                    initCrdFut.onDone();
+            }
+        }
+    }
+
+    /**
+     * @param depId Deployment process id.
+     * @param errors Deployment errors.
+     */
+    private void createAndSendSingleDeploymentsMessage(ServiceDeploymentProcessId depId,
+        final Map<IgniteUuid, Collection<Throwable>> errors) {
+        assert crdId != null : "Coordinator should be defined at this point, locId=" + ctx.localNodeId();
+
+        try {
+            Set<IgniteUuid> depServicesIds = new HashSet<>();
+
+            if (evt.type() == EVT_NODE_JOINED) {
+                UUID evtNodeId = evt.eventNode().id();
+
+                expDeps.forEach((srvcId, top) -> {
+                    if (top.containsKey(evtNodeId))
+                        depServicesIds.add(srvcId);
+                });
+            }
+            else
+                depServicesIds.addAll(expDeps.keySet());
+
+            Map<IgniteUuid, ServiceSingleNodeDeploymentResult> results = new HashMap<>();
+
+            for (IgniteUuid srvcId : depServicesIds) {
+                ServiceSingleNodeDeploymentResult depRes = new ServiceSingleNodeDeploymentResult(
+                    srvcProc.localInstancesCount(srvcId));
+
+                attachDeploymentErrors(depRes, errors.get(srvcId));
+
+                results.put(srvcId, depRes);
+            }
+
+            errors.forEach((srvcId, err) -> {
+                if (results.containsKey(srvcId))
+                    return;
+
+                ServiceSingleNodeDeploymentResult depRes = new ServiceSingleNodeDeploymentResult(
+                    srvcProc.localInstancesCount(srvcId));
+
+                attachDeploymentErrors(depRes, err);
+
+                results.put(srvcId, depRes);
+            });
+
+            ServiceSingleNodeDeploymentResultBatch msg = new ServiceSingleNodeDeploymentResultBatch(depId, results);
+
+            if (ctx.localNodeId().equals(crdId))
+                onReceiveSingleDeploymentsMessage(ctx.localNodeId(), msg);
+            else
+                ctx.io().sendToGridTopic(crdId, TOPIC_SERVICES, msg, SERVICE_POOL);
+
+            if (log.isDebugEnabled())
+                log.debug("Send services single deployments message, msg=" + msg);
+        }
+        catch (IgniteCheckedException e) {
+            log.error("Failed to send services single deployments message to coordinator over communication spi.", e);
+        }
+    }
+
+    /**
+     * Handles received single node services map message.
+     *
+     * @param snd Sender node id.
+     * @param msg Single services map message.
+     */
+    protected void onReceiveSingleDeploymentsMessage(UUID snd, ServiceSingleNodeDeploymentResultBatch msg) {
+        assert depId.equals(msg.deploymentId()) : "Wrong message's deployment process id, msg=" + msg;
+
+        initCrdFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> {
+            if (isCompleted())
+                return;
+
+            synchronized (initCrdMux) {
+                if (remaining.remove(snd)) {
+                    singleDepsMsgs.put(snd, msg);
+
+                    if (remaining.isEmpty())
+                        onAllReceived();
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Unexpected service single deployments received, msg=" + msg);
+            }
+        });
+    }
+
+    /**
+     * Handles received full services map message.
+     *
+     * @param msg Full services map message.
+     */
+    protected void onReceiveFullDeploymentsMessage(ServiceClusterDeploymentResultBatch msg) {
+        assert depId.equals(msg.deploymentId()) : "Wrong message's deployment process id, msg=" + msg;
+
+        initTaskFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> {
+            if (isCompleted())
+                return;
+
+            ctx.closure().runLocalSafe(() -> {
+                try {
+                    ServiceDeploymentActions depResults = msg.servicesDeploymentActions();
+
+                    assert depResults != null : "Services deployment actions should be attached.";
+
+                    final Map<IgniteUuid, Map<UUID, Integer>> fullTops = depResults.deploymentTopologies();
+                    final Map<IgniteUuid, Collection<byte[]>> fullErrors = depResults.deploymentErrors();
+
+                    depActions.deploymentTopologies(fullTops);
+                    depActions.deploymentErrors(fullErrors);
+
+                    srvcProc.updateServicesTopologies(fullTops);
+
+                    final Map<IgniteUuid, ServiceInfo> services = srvcProc.deployedServices();
+
+                    fullTops.forEach((srvcId, top) -> {
+                        Integer expCnt = top.getOrDefault(ctx.localNodeId(), 0);
+
+                        if (expCnt < srvcProc.localInstancesCount(srvcId)) { // Undeploy exceed instances
+                            ServiceInfo desc = services.get(srvcId);
+
+                            assert desc != null;
+
+                            ServiceConfiguration cfg = desc.configuration();
+
+                            try {
+                                srvcProc.redeploy(srvcId, cfg, top);
+                            }
+                            catch (IgniteCheckedException e) {
+                                log.error("Error occured during cancel exceed service instances: " +
+                                    "[srvcId=" + srvcId + ", name=" + desc.name() + ']', e);
+                            }
+                        }
+                    });
+
+                    completeSuccess();
+                }
+                catch (Throwable t) {
+                    log.error("Failed to process services full deployments message, msg=" + msg, t);
+
+                    completeError(t);
+                }
+            });
+        });
+    }
+
+    /**
+     * Completes initiating futures.
+     *
+     * @param err Error to complete initiating.
+     */
+    private void completeInitiatingFuture(final Throwable err) {
+        if (depActions == null)
+            return;
+
+        depActions.servicesToDeploy().forEach((srvcId, desc) -> {
+            if (err != null) {
+                srvcProc.completeInitiatingFuture(true, srvcId, err);
+
+                return;
+            }
+
+            Collection<byte[]> errors = depActions.deploymentErrors().get(srvcId);
+
+            if (errors == null) {
+                srvcProc.completeInitiatingFuture(true, srvcId, null);
+
+                return;
+            }
+
+            Throwable depErr = null;
+
+            for (byte[] error : errors) {
+                try {
+                    Throwable t = U.unmarshal(ctx, error, null);
+
+                    if (depErr == null)
+                        depErr = t;
+                    else
+                        depErr.addSuppressed(t);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to unmarshal deployment error.", e);
+                }
+            }
+
+            srvcProc.completeInitiatingFuture(true, srvcId, depErr);
+        });
+
+        for (IgniteUuid reqSrvcId : depActions.servicesToUndeploy().keySet())
+            srvcProc.completeInitiatingFuture(false, reqSrvcId, err);
+    }
+
+    /**
+     * Creates services full deployments message and send it over discovery.
+     */
+    private void onAllReceived() {
+        assert !isCompleted();
+
+        Collection<ServiceClusterDeploymentResult> fullResults = buildFullDeploymentsResults(singleDepsMsgs);
+
+        try {
+            ServiceClusterDeploymentResultBatch msg = new ServiceClusterDeploymentResultBatch(depId, fullResults);
+
+            ctx.discovery().sendCustomEvent(msg);
+        }
+        catch (IgniteCheckedException e) {
+            log.error("Failed to send services full deployments message across the ring.", e);
+        }
+    }
+
+    /**
+     * Reassigns service to nodes.
+     *
+     * @param cfg Service configuration.
+     * @param topVer Topology version.
+     * @param oldTop Previous topology snapshot.
+     * @throws IgniteCheckedException In case of an error.
+     */
+    private Map<UUID, Integer> reassign(IgniteUuid srvcId, ServiceConfiguration cfg,
+        AffinityTopologyVersion topVer, TreeMap<UUID, Integer> oldTop) throws IgniteCheckedException {
+        try {
+            Map<UUID, Integer> top = srvcProc.reassign(srvcId, cfg, topVer, oldTop);
+
+            if (top.isEmpty())
+                throw new IgniteCheckedException("Failed to determine suitable nodes to deploy service.");
+
+            if (log.isDebugEnabled())
+                log.debug("Calculated service assignment : [srvcId=" + srvcId + ", srvcTop=" + top + ']');
+
+            return top;
+        }
+        catch (Throwable e) {
+            throw new IgniteCheckedException("Failed to calculate assignments for service, cfg=" + cfg, e);
+        }
+    }
+
+    /**
+     * Filters dead nodes from given service topology snapshot using given ids.
+     *
+     * @param evtTopNodes Ids being used to filter.
+     * @param top Service topology snapshot.
+     * @return Filtered service topology snapshot.
+     */
+    private TreeMap<UUID, Integer> filterDeadNodes(Collection<UUID> evtTopNodes, Map<UUID, Integer> top) {
+        TreeMap<UUID, Integer> filtered = new TreeMap<>();
+
+        if (F.isEmpty(top))
+            return filtered;
+
+        top.forEach((nodeId, cnt) -> {
+            // We can't just use 'ctx.discovery().alive(UUID)', because during the deployment process discovery
+            // topology may be changed and results may be different on some set of nodes.
+            if (evtTopNodes.contains(nodeId))
+                filtered.put(nodeId, cnt);
+        });
+
+        return filtered;
+    }
+
+    /**
+     * Processes single deployments messages to build full deployment results.
+     *
+     * @param singleDepsMsgs Services single deployments messages.
+     * @return Services full deployments results.
+     */
+    private Collection<ServiceClusterDeploymentResult> buildFullDeploymentsResults(
+        Map<UUID, ServiceSingleNodeDeploymentResultBatch> singleDepsMsgs) {
+        final Map<IgniteUuid, Map<UUID, ServiceSingleNodeDeploymentResult>> singleResults = new HashMap<>();
+
+        singleDepsMsgs.forEach((nodeId, msg) -> msg.results().forEach((srvcId, res) -> {
+            Map<UUID, ServiceSingleNodeDeploymentResult> depResults = singleResults
+                .computeIfAbsent(srvcId, r -> new HashMap<>());
+
+            int cnt = res.count();
+
+            if (cnt != 0) {
+                Map<UUID, Integer> expTop = expDeps.get(srvcId);
+
+                if (expTop != null) {
+                    Integer expCnt = expTop.get(nodeId);
+
+                    cnt = expCnt == null ? 0 : Math.min(cnt, expCnt);
+                }
+            }
+
+            if (cnt == 0 && res.errors().isEmpty())
+                return;
+
+            ServiceSingleNodeDeploymentResult singleDepRes = new ServiceSingleNodeDeploymentResult(cnt);
+
+            if (!res.errors().isEmpty())
+                singleDepRes.errors(res.errors());
+
+            depResults.put(nodeId, singleDepRes);
+        }));
+
+        final Collection<ServiceClusterDeploymentResult> fullResults = new ArrayList<>();
+
+        singleResults.forEach((srvcId, dep) -> {
+            ServiceClusterDeploymentResult res = new ServiceClusterDeploymentResult(srvcId, dep);
+
+            fullResults.add(res);
+        });
+
+        return fullResults;
+    }
+
+    /**
+     * @param depRes Service single deployments results.
+     * @param errors Deployment errors.
+     */
+    private void attachDeploymentErrors(@NotNull ServiceSingleNodeDeploymentResult depRes,
+        @Nullable Collection<Throwable> errors) {
+        if (F.isEmpty(errors))
+            return;
+
+        Collection<byte[]> errorsBytes = new ArrayList<>();
+
+        for (Throwable th : errors) {
+            try {
+                byte[] arr = U.marshal(ctx, th);
+
+                errorsBytes.add(arr);
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to marshal deployment error, err=" + th, e);
+            }
+        }
+
+        depRes.errors(errorsBytes);
+    }
+
+    /**
+     * Handles a node leaves topology.
+     *
+     * @param nodeId Left node id.
+     */
+    protected void onNodeLeft(UUID nodeId) {
+        initTaskFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> {
+            if (isCompleted())
+                return;
+
+            final boolean crdChanged = nodeId.equals(crdId);
+
+            if (crdChanged) {
+                ClusterNode crd = srvcProc.coordinator();
+
+                if (crd != null) {
+                    crdId = crd.id();
+
+                    if (crd.isLocal())
+                        initCoordinator(evtTopVer);
+
+                    createAndSendSingleDeploymentsMessage(depId, depErrors);
+                }
+                else
+                    onAllServersLeft();
+            }
+            else if (ctx.localNodeId().equals(crdId)) {
+                synchronized (initCrdMux) {
+                    boolean rmvd = remaining.remove(nodeId);
+
+                    if (rmvd && remaining.isEmpty()) {
+                        singleDepsMsgs.remove(nodeId);
+
+                        onAllReceived();
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Handles case when all server nodes have left the grid.
+     */
+    private void onAllServersLeft() {
+        assert ctx.clientNode();
+
+        completeError(new ClusterTopologyServerNotFoundException("Failed to resolve coordinator to continue services " +
+            "deployment process: [locId=" + ctx.localNodeId() + "client=" + ctx.clientNode() + "evt=" + evt + ']'));
+    }
+
+    /**
+     * @return Cause discovery event.
+     */
+    public DiscoveryEvent event() {
+        return evt;
+    }
+
+    /**
+     * Returns cause of deployment process topology version.
+     *
+     * @return Cause of deployment process topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return evtTopVer;
+    }
+
+    /**
+     * Returns services deployment process id of the task.
+     *
+     * @return Services deployment process id.
+     */
+    public ServiceDeploymentProcessId deploymentId() {
+        return depId;
+    }
+
+    /**
+     * Completes the task.
+     */
+    public void completeSuccess() {
+        if (!completeStateFut.isDone()) {
+            completeInitiatingFuture(null);
+
+            completeStateFut.onDone();
+        }
+
+        if (!initTaskFut.isDone())
+            initTaskFut.onDone();
+
+        if (!initCrdFut.isDone())
+            initCrdFut.onDone();
+    }
+
+    /**
+     * @param err Error to complete with.
+     */
+    public void completeError(Throwable err) {
+        if (!completeStateFut.isDone()) {
+            completeInitiatingFuture(err);
+
+            completeStateFut.onDone(err);
+        }
+
+        if (!initTaskFut.isDone())
+            initTaskFut.onDone(err);
+
+        if (!initCrdFut.isDone())
+            initCrdFut.onDone(err);
+    }
+
+    /**
+     * Returns if the task completed.
+     *
+     * @return {@code true} if the task completed, otherwise {@code false}.
+     */
+    protected boolean isCompleted() {
+        return completeStateFut.isDone();
+    }
+
+    /**
+     * Synchronously waits for completion of the task for up to the given timeout.
+     *
+     * @param timeout The maximum time to wait in milliseconds.
+     * @throws IgniteCheckedException In case of an error.
+     */
+    protected void waitForComplete(long timeout) throws IgniteCheckedException {
+        completeStateFut.get(timeout);
+    }
+
+    /**
+     * Handles when this task is being added in deployment queue.
+     * <p/>
+     * Introduced to avoid overhead on calling of {@link Collection#contains(Object)}}.
+     *
+     * @return {@code true} if task is has not been added previously, otherwise {@code false}.
+     */
+    protected boolean onEnqueued() {
+        return addedInQueue.compareAndSet(false, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ServiceDeploymentTask task = (ServiceDeploymentTask)o;
+
+        return depId.equals(task.depId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return depId.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceDeploymentTask.class, this,
+            "locNodeId", (ctx != null ? ctx.localNodeId() : "unknown"),
+            "crdId", crdId);
+    }
+}
\ No newline at end of file
index 4db44cb..2e14b94 100644 (file)
@@ -29,7 +29,11 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Service descriptor.
+ *
+ * @deprecated This implementation is based on {@code GridServiceDeployment} which has been deprecated because of
+ * services internals use messages for deployment management instead of the utility cache, since Ignite 2.8.
  */
+@Deprecated
 public class ServiceDescriptorImpl implements ServiceDescriptor {
     /** */
     private static final long serialVersionUID = 0L;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
new file mode 100644 (file)
index 0000000..5a3aefc
--- /dev/null
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceDescriptor;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Service's information container.
+ */
+public class ServiceInfo implements ServiceDescriptor {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Origin node ID. */
+    private final UUID originNodeId;
+
+    /** Service id. */
+    private final IgniteUuid srvcId;
+
+    /** Service configuration. */
+    private final ServiceConfiguration cfg;
+
+    /** Statically configured flag. */
+    private final boolean staticCfg;
+
+    /** Topology snapshot. */
+    @GridToStringInclude
+    private volatile Map<UUID, Integer> top = Collections.emptyMap();
+
+    /**
+     * @param originNodeId Initiating node id.
+     * @param srvcId Service id.
+     * @param cfg Service configuration.
+     */
+    public ServiceInfo(@NotNull UUID originNodeId, @NotNull IgniteUuid srvcId, @NotNull ServiceConfiguration cfg) {
+        this(originNodeId, srvcId, cfg, false);
+    }
+
+    /**
+     * @param originNodeId Initiating node id.
+     * @param srvcId Service id.
+     * @param cfg Service configuration.
+     * @param staticCfg Statically configured flag.
+     */
+    public ServiceInfo(@NotNull UUID originNodeId, @NotNull IgniteUuid srvcId, @NotNull ServiceConfiguration cfg,
+        boolean staticCfg) {
+        this.originNodeId = originNodeId;
+        this.srvcId = srvcId;
+        this.cfg = cfg;
+        this.staticCfg = staticCfg;
+    }
+
+    /**
+     * Sets service's new topology snapshot.
+     *
+     * @param top Topology snapshot.
+     */
+    public void topologySnapshot(@NotNull Map<UUID, Integer> top) {
+        this.top = top;
+    }
+
+    /**
+     * Returns service's configuration.
+     *
+     * @return Service configuration.
+     */
+    public ServiceConfiguration configuration() {
+        return cfg;
+    }
+
+    /**
+     * @return {@code true} if statically configured.
+     */
+    public boolean staticallyConfigured() {
+        return staticCfg;
+    }
+
+    /**
+     * Rerurns services id.
+     *
+     * @return Service id.
+     */
+    public IgniteUuid serviceId() {
+        return srvcId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return cfg.getName();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public Class<? extends Service> serviceClass() {
+        if (cfg instanceof LazyServiceConfiguration) {
+            String clsName = ((LazyServiceConfiguration)cfg).serviceClassName();
+
+            try {
+                return (Class<? extends Service>)Class.forName(clsName);
+            }
+            catch (ClassNotFoundException e) {
+                throw new IgniteException("Failed to find service class: " + clsName, e);
+            }
+        }
+        else
+            return cfg.getService().getClass();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int totalCount() {
+        return cfg.getTotalCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int maxPerNodeCount() {
+        return cfg.getMaxPerNodeCount();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String cacheName() {
+        return cfg.getCacheName();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Nullable @Override public <K> K affinityKey() {
+        return (K)cfg.getAffinityKey();
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID originNodeId() {
+        return originNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<UUID, Integer> topologySnapshot() {
+        return Collections.unmodifiableMap(top);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceInfo.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
new file mode 100644 (file)
index 0000000..2fea452
--- /dev/null
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceDescriptor;
+
+/**
+ * Adapter for different service processor implementations.
+ */
+public abstract class ServiceProcessorAdapter extends GridProcessorAdapter {
+    /**
+     * @param ctx Kernal context.
+     */
+    protected ServiceProcessorAdapter(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /**
+     * @param prj Grid projection.
+     * @param name Service name.
+     * @param srvc Service.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service srvc);
+
+    /**
+     * @param name Service name.
+     * @param srvc Service instance.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service srvc);
+
+    /**
+     * @param name Service name.
+     * @param srvc Service.
+     * @param totalCnt Total count.
+     * @param maxPerNodeCnt Max per-node count.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service srvc, int totalCnt,
+        int maxPerNodeCnt);
+
+    /**
+     * @param name Service name.
+     * @param srvc Service.
+     * @param cacheName Cache name.
+     * @param affKey Affinity key.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service srvc, String cacheName,
+        Object affKey);
+
+    /**
+     * @param prj Grid projection.
+     * @param cfgs Service configurations.
+     * @return Future for deployment.
+     */
+    public abstract IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs);
+
+    /**
+     * @param name Service name.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> cancel(String name);
+
+    /**
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> cancelAll();
+
+    /**
+     * @param servicesNames Name of services to deploy.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> cancelAll(Collection<String> servicesNames);
+
+    /**
+     * @return Collection of service descriptors.
+     */
+    public abstract Collection<ServiceDescriptor> serviceDescriptors();
+
+    /**
+     * @param name Service name.
+     * @param <T> Service type.
+     * @return Service by specified service name.
+     */
+    public abstract <T> T service(String name);
+
+    /**
+     * @param prj Grid projection.
+     * @param name Service name.
+     * @param srvcCls Service class.
+     * @param sticky Whether multi-node request should be done.
+     * @param timeout If greater than 0 limits service acquire time. Cannot be negative.
+     * @param <T> Service interface type.
+     * @return The proxy of a service by its name and class.
+     * @throws IgniteException If failed to create proxy.
+     */
+    public abstract <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> srvcCls, boolean sticky,
+        long timeout) throws IgniteException;
+
+    /**
+     * @param name Service name.
+     * @param <T> Service type.
+     * @return Services by specified service name.
+     */
+    public abstract <T> Collection<T> services(String name);
+
+    /**
+     * @param name Service name.
+     * @return Service by specified service name.
+     */
+    public abstract ServiceContextImpl serviceContext(String name);
+
+    /**
+     * @param name Service name.
+     * @param timeout If greater than 0 limits task execution time. Cannot be negative.
+     * @return Service topology.
+     * @throws IgniteCheckedException On error.
+     */
+    public abstract Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException;
+
+    /**
+     * Callback for local join events for which the regular events are not generated.
+     * <p/>
+     * Local join event is expected in cases of joining to topology or client reconnect.
+     *
+     * @param evt Discovery event.
+     * @param discoCache Discovery cache.
+     */
+    public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) {
+        // No-op.
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java
new file mode 100644 (file)
index 0000000..7f9fc83
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Initial data container to be sent to newly joining node for initialization of {@link IgniteServiceProcessor}.
+ */
+class ServiceProcessorCommonDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Clusters registered services descriptors. */
+    private final ArrayList<ServiceInfo> registeredServices;
+
+    /**
+     * @param registeredServices Clusters registered services descriptors.
+     */
+    public ServiceProcessorCommonDiscoveryData(@NotNull ArrayList<ServiceInfo> registeredServices) {
+        this.registeredServices = registeredServices;
+    }
+
+    /**
+     * Returns clusters registered services descriptors.
+     *
+     * @return Registered services descriptors.
+     */
+    public ArrayList<ServiceInfo> registeredServices() {
+        return registeredServices;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceProcessorCommonDiscoveryData.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java
new file mode 100644 (file)
index 0000000..5a8473d
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Initial data of {@link IgniteServiceProcessor} to send in cluster on joining node.
+ */
+public class ServiceProcessorJoinNodeDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Static services configurations info. */
+    public final ArrayList<ServiceInfo> staticServicesInfo;
+
+    /**
+     * @param staticServicesInfo Static services configurations info.
+     */
+    public ServiceProcessorJoinNodeDiscoveryData(@NotNull ArrayList<ServiceInfo> staticServicesInfo) {
+        this.staticServicesInfo = staticServicesInfo;
+    }
+
+    /**
+     * @return Static services configurations info.
+     */
+    public ArrayList<ServiceInfo> services() {
+        return staticServicesInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceProcessorJoinNodeDiscoveryData.class, this);
+    }
+}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java
new file mode 100644 (file)
index 0000000..1f003d9
--- /dev/null
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType.BYTE_ARR;
+
+/**
+ * Service single node deployment result.
+ * <p/>
+ * Contains count of deployed service instances on single node and deployment errors if exist.
+ */
+public class ServiceSingleNodeDeploymentResult implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Count of service's instances. */
+    private int cnt;
+
+    /** Serialized exceptions. */
+    private Collection<byte[]> errors;
+
+    /**
+     * Empty constructor for marshalling purposes.
+     */
+    public ServiceSingleNodeDeploymentResult() {
+    }
+
+    /**
+     * @param cnt Count of service's instances.
+     */
+    public ServiceSingleNodeDeploymentResult(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Count of service's instances.
+     */
+    public int count() {
+        return cnt;
+    }
+
+    /**
+     * @param cnt Count of service's instances.
+     */
+    public void count(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Serialized exceptions.
+     */
+    @NotNull public Collection<byte[]> errors() {
+        return errors != null ? errors : Collections.emptyList();
+    }
+
+    /**
+     * @param errors Serialized exceptions.
+     */
+    public void errors(Collection<byte[]> errors) {
+        this.errors = errors;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("cnt", cnt))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeCollection("errors", errors, BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cnt = reader.readInt("cnt");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                errors = reader.readCollection("errors", BYTE_ARR);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(ServiceSingleNodeDeploymentResult.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 169;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceSingleNodeDeploymentResult.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResultBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResultBatch.java
new file mode 100644 (file)
index 0000000..320c5a9
--- /dev/null
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType.IGNITE_UUID;
+import static org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType.MSG;
+
+/**
+ * Batch of service single node deployment result.
+ * <p/>
+ * Contains collection of {@link ServiceSingleNodeDeploymentResult} mapped services ids.
+ */
+public class ServiceSingleNodeDeploymentResultBatch implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Deployment process id. */
+    @GridToStringInclude
+    private ServiceDeploymentProcessId depId;
+
+    /** Services deployments results. */
+    @GridToStringInclude
+    private Map<IgniteUuid, ServiceSingleNodeDeploymentResult> results;
+
+    /**
+     * Empty constructor for marshalling purposes.
+     */
+    public ServiceSingleNodeDeploymentResultBatch() {
+    }
+
+    /**
+     * @param depId Deployment process id.
+     * @param results Services deployments results.
+     */
+    public ServiceSingleNodeDeploymentResultBatch(@NotNull ServiceDeploymentProcessId depId,
+        @NotNull Map<IgniteUuid, ServiceSingleNodeDeploymentResult> results) {
+        this.depId = depId;
+        this.results = results;
+    }
+
+    /**
+     * @return Services deployments results.
+     */
+    public Map<IgniteUuid, ServiceSingleNodeDeploymentResult> results() {
+        return results;
+    }
+
+    /**
+     * @return Deployment process id.
+     */
+    public ServiceDeploymentProcessId deploymentId() {
+        return depId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("depId", depId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeMap("results", results, IGNITE_UUID, MSG))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                depId = reader.readMessage("depId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                results = reader.readMap("results", IGNITE_UUID, MSG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(ServiceSingleNodeDeploymentResultBatch.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 168;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceSingleNodeDeploymentResultBatch.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceUndeploymentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceUndeploymentRequest.java
new file mode 100644 (file)
index 0000000..5482362
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Service undeployment request.
+ */
+public class ServiceUndeploymentRequest extends ServiceChangeAbstractRequest {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param srvcId Service id.
+     */
+    public ServiceUndeploymentRequest(@NotNull IgniteUuid srvcId) {
+        super(srvcId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceUndeploymentRequest.class, this);
+    }
+}
index 37a794b..a517197 100644 (file)
@@ -181,6 +181,8 @@ public class ServiceConfiguration implements Serializable {
      * Gets cache name used for key-to-node affinity calculation.
      * <p>
      * This parameter is optional and is set only when deploying service based on key-affinity.
+     * <p/>
+     * <b>NOTE:</b> If the cache is destroyed, the service will be undeployed automatically.
      *
      * @return Cache name, possibly {@code null}.
      */
index e61d30a..0edb497 100644 (file)
@@ -150,6 +150,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -163,6 +164,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
 import static org.apache.ignite.spi.IgnitePortProtocol.TCP;
 import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.AUTH_FAILED;
 import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.CHECK_FAILED;
@@ -4044,6 +4046,48 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                final Boolean locSrvcProcModeAttr = locNode.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
+                // Can be null only in module tests of discovery spi (without node startup).
+                final Boolean locSrvcProcMode = locSrvcProcModeAttr != null ? locSrvcProcModeAttr : false;
+
+                final Boolean rmtSrvcProcModeAttr = node.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
+                final boolean rmtSrvcProcMode = rmtSrvcProcModeAttr != null ? rmtSrvcProcModeAttr : false;
+
+                if (!F.eq(locSrvcProcMode, rmtSrvcProcMode)) {
+                    utilityPool.execute(
+                        new Runnable() {
+                            @Override public void run() {
+                                String errMsg = "Local node's " + IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED +
+                                    " property value differs from remote node's value " +
+                                    "(to make sure all nodes in topology have identical service processor mode, " +
+                                    "configure system property explicitly) " +
+                                    "[locSrvcProcMode=" + locSrvcProcMode +
+                                    ", rmtSrvcProcMode=" + rmtSrvcProcMode +
+                                    ", locNodeAddrs=" + U.addressesAsString(locNode) +
+                                    ", rmtNodeAddrs=" + U.addressesAsString(node) +
+                                    ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
+
+                                String sndMsg = "Local node's " + IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED +
+                                    " property value differs from remote node's value " +
+                                    "(to make sure all nodes in topology have identical service processor mode, " +
+                                    "configure system property explicitly) " +
+                                    "[locSrvcProcMode=" + rmtSrvcProcMode +
+                                    ", rmtSrvcProcMode=" + locSrvcProcMode +
+                                    ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
+                                    ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
+                                    ", rmtNodeId=" + locNode.id() + ']';
+
+                                nodeCheckError(
+                                    node,
+                                    errMsg,
+                                    sndMsg);
+                            }
+                        });
+
+                    // Ignore join request.
+                    return;
+                }
+
                 // Handle join.
                 node.internalOrder(ring.nextNodeOrder());
 
index 000581a..2c41c8e 100644 (file)
@@ -480,7 +480,7 @@ public class GridDeploymentSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public boolean register(ClassLoader ldr, Class rsrc) throws IgniteSpiException {
-            if (super.register(ldr, rsrc)) {
+            if (super.register(ldr, rsrc) && ComputeTaskAdapter.class.isAssignableFrom(rsrc)) {
                 deployCnt++;
 
                 return true;
index 98803e0..a42f960 100644 (file)
@@ -30,6 +30,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceContext;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -133,6 +134,8 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
      */
     @Test
     public void testReconnectInDeploying() throws Exception {
+        Assume.assumeTrue(!isEventDrivenServiceProcessorEnabled());
+
         Ignite client = grid(serverCount());
 
         assertTrue(client.cluster().localNode().isClient());
@@ -179,6 +182,52 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
     /**
      * @throws Exception If failed.
      */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    @Test
+    public void testReconnectInDeployingNew() throws Exception {
+        Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
+
+        IgniteEx client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final IgniteServices services = client.services();
+
+        Ignite srv = ignite(0);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    services.deployClusterSingleton("testReconnectInDeploying", new TestServiceImpl());
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        reconnectClientNode(client, srv, () -> {
+            // Check that client waiting operation.
+            GridTestUtils.assertThrows(log, () -> fut.get(200), IgniteFutureTimeoutCheckedException.class, null);
+
+            try {
+                assertNotDone(fut);
+            }
+            catch (Exception e) {
+                fail("Unexpected exception has been thrown, err=" + e.getMessage());
+            }
+        });
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     @Test
     public void testReconnectInProgress() throws Exception {
         Ignite client = grid(serverCount());
index 8db1551..cc88a6a 100644 (file)
@@ -35,6 +35,7 @@ import org.junit.runners.JUnit4;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SECURITY_COMPATIBILITY_MODE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
 import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
 import static org.apache.ignite.configuration.DeploymentMode.SHARED;
 
@@ -246,6 +247,17 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     @Test
+    public void testServiceProcessorModeProperty() throws Exception {
+        doTestCompatibilityEnabled(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, true, false, true);
+        doTestCompatibilityEnabled(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, false, true, true);
+        doTestCompatibilityEnabled(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, true, true, false);
+        doTestCompatibilityEnabled(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, false, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
     public void testSecurityCompatibilityEnabled() throws Exception {
         TestReconnectPluginProvider.enabled = true;
         TestReconnectProcessor.enabled = true;
index 7e66584..b26e4d4 100644 (file)
@@ -58,8 +58,10 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.processors.datastructures.GridCacheInternalKeyImpl;
+import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.PA;
@@ -206,9 +208,12 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         }, 3000);
 
         for (int i = 0; i < gridCount(); i++) {
-            GridContinuousProcessor proc = grid(i).context().continuous();
+            GridKernalContext ctx = grid(i).context();
+            GridContinuousProcessor proc = ctx.continuous();
 
-            assertEquals(String.valueOf(i), 1, ((Map)U.field(proc, "locInfos")).size());
+            final int locInfosCnt = ctx.service() instanceof GridServiceProcessor ? 1 : 0;
+
+            assertEquals(String.valueOf(i), locInfosCnt, ((Map)U.field(proc, "locInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());
index e4157e2..47bdf61 100644 (file)
@@ -132,11 +132,7 @@ public class GridServiceContinuousQueryRedeployTest extends GridCommonAbstractTe
         svcCfg.setName(SERVICE_NAME);
         svcCfg.setTotalCount(1);
         svcCfg.setMaxPerNodeCount(1);
-        svcCfg.setNodeFilter(new IgnitePredicate<ClusterNode>() {
-            @Override public boolean apply(ClusterNode node) {
-                return !node.isClient();
-            }
-        });
+        svcCfg.setNodeFilter((IgnitePredicate<ClusterNode>)node -> !node.isClient());
 
         ignite.services().deploy(svcCfg);
     }
index f2563d2..5a09aa2 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceDeploymentException;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -40,7 +41,7 @@ public class GridServiceDeploymentCompoundFutureSelfTest extends GridCommonAbstr
      */
     @Test
     public void testWaitForCompletionOnFailingFuture() throws Exception {
-        GridServiceDeploymentCompoundFuture compFut = new GridServiceDeploymentCompoundFuture();
+        GridServiceDeploymentCompoundFuture<IgniteUuid> compFut = new GridServiceDeploymentCompoundFuture<>();
 
         int failingFutsNum = 2;
 
@@ -51,7 +52,7 @@ public class GridServiceDeploymentCompoundFutureSelfTest extends GridCommonAbstr
         for (int i = 0; i < failingFutsNum; i++) {
             ServiceConfiguration failingCfg = config("Failed-" + i);
 
-            GridServiceDeploymentFuture failingFut = new GridServiceDeploymentFuture(failingCfg);
+            GridServiceDeploymentFuture<IgniteUuid> failingFut = new GridServiceDeploymentFuture<>(failingCfg, IgniteUuid.randomUuid());
 
             failingFuts.add(failingFut);
 
@@ -61,7 +62,7 @@ public class GridServiceDeploymentCompoundFutureSelfTest extends GridCommonAbstr
         List<GridFutureAdapter<Object>> futs = new ArrayList<>(completingFutsNum);
 
         for (int i = 0; i < completingFutsNum; i++) {
-            GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(config(String.valueOf(i)));
+            GridServiceDeploymentFuture<IgniteUuid> fut = new GridServiceDeploymentFuture<>(config(String.valueOf(i)), IgniteUuid.randomUuid());
 
             futs.add(fut);
 
index d5df828..60f632a 100644 (file)
 package org.apache.ignite.internal.processors.service;
 
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceContext;
-import org.apache.ignite.testframework.GridStringLogger;
+import org.apache.ignite.services.ServiceDeploymentException;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assume;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -31,26 +33,37 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class GridServiceDeploymentExceptionPropagationTest extends GridCommonAbstractTest {
     /** */
-    @SuppressWarnings("unused")
+    @BeforeClass
+    public static void check() {
+        Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
+    }
+
+    /** */
     @Test
     public void testExceptionPropagation() throws Exception {
-        try (Ignite srv = startGrid("server")) {
-
-            GridStringLogger log = new GridStringLogger();
-
-            try (Ignite client = startGrid("client", getConfiguration("client").setGridLogger(log).setClientMode(true))) {
+        try (IgniteEx srv = startGrid("server")) {
+            try (Ignite client = startGrid("client", getConfiguration("client").setClientMode(true))) {
+                final String srvcName = "my-service";
 
                 try {
-                    client.services().deployClusterSingleton("my-service", new ServiceImpl());
-                }
-                catch (IgniteException ignored) {
-                    assertTrue(log.toString().contains("ServiceImpl init exception"));
+                    client.services().deployClusterSingleton(srvcName, new ServiceImpl());
 
-                    return; // Exception is what we expect.
+                    fail("Deployment exception has been expected.");
                 }
+                catch (ServiceDeploymentException ex) {
+                    String errMsg = ex.getSuppressed()[0].getMessage();
+
+                    // Check that message contains cause node id
+                    assertTrue(errMsg.contains(srv.cluster().localNode().id().toString()));
+
+                    // Check that message contains service name
+                    assertTrue(errMsg.contains(srvcName));
 
-                // Fail explicitly if we've managed to get here though we shouldn't have.
-                fail("https://issues.apache.org/jira/browse/IGNITE-3392");
+                    Throwable cause = ex.getSuppressed()[0].getCause();
+
+                    // Check that error's cause contains users message
+                    assertTrue(cause.getMessage().contains("ServiceImpl init exception"));
+                }
             }
         }
     }
index 9197d93..f25a1e4 100644 (file)
@@ -30,7 +30,9 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
 import org.apache.ignite.internal.util.typedef.CA;
@@ -139,7 +141,7 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
     /**
      * @return Random grid.
      */
-    protected Ignite randomGrid() {
+    protected IgniteEx randomGrid() {
         return grid(RAND.nextInt(nodeCount()));
     }
 
@@ -249,11 +251,11 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
 
         info("Deployed service: " + name);
 
-        fut1.get();
+        try {
+            fut1.get();
 
-        info("Finished waiting for service future: " + name);
+            info("Finished waiting for service future: " + name);
 
-        try {
             fut2.get();
 
             fail("Failed to receive mismatching configuration exception.");
@@ -764,6 +766,19 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
 
     /**
      * @param svcName Service name.
+     * @param ignite Ignite instance.
+     * @param cnt Expected count.
+     */
+    protected void checkCount(String svcName, IgniteEx ignite, int cnt) throws IgniteInterruptedCheckedException {
+        AffinityTopologyVersion topVer = ignite.context().discovery().topologyVersionEx();
+
+        waitForServicesReadyTopology(ignite, topVer);
+
+        assertEquals(cnt, actualCount(svcName, ignite.services().serviceDescriptors()));
+    }
+
+    /**
+     * @param svcName Service name.
      * @param descs Descriptors.
      * @param cnt Expected count.
      */
@@ -825,6 +840,32 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
     }
 
     /**
+     * @param ignite Ignite instance.
+     * @param srvcName Affinity service name.
+     */
+    protected void checkAffinityServiceDeployment(Ignite ignite, String srvcName) {
+        ServiceDescriptor desc = null;
+
+        for (ServiceDescriptor d : ignite.services().serviceDescriptors()) {
+            if (d.name().equals(srvcName)) {
+                desc = d;
+
+                break;
+            }
+        }
+
+        assertNotNull(desc);
+
+        assertEquals(1, desc.topologySnapshot().size());
+
+        ClusterNode n = ignite.affinity(desc.cacheName()).mapKeyToNode(desc.affinityKey());
+
+        assertNotNull(n);
+
+        assertTrue(desc.topologySnapshot().containsKey(n.id()));
+    }
+
+    /**
      * Affinity service.
      */
     protected static class AffinityService implements Service {
@@ -853,11 +894,6 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs
         /** {@inheritDoc} */
         @Override public void init(ServiceContext ctx) throws Exception {
             X.println("Initializing affinity service for key: " + affKey);
-
-            ClusterNode n = g.affinity(CACHE_NAME).mapKeyToNode(affKey);
-
-            assertNotNull(n);
-            assertTrue(n.isLocal());
         }
 
         /** {@inheritDoc} */
index b77144c..625b287 100644 (file)
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.lang.IgniteFuture;
@@ -37,6 +38,7 @@ import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceDeploymentException;
 import org.apache.ignite.services.ServiceDescriptor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assume;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -132,16 +134,7 @@ public class GridServiceProcessorBatchDeploySelfTest extends GridCommonAbstractT
 
             CountDownLatch latch = new CountDownLatch(numServices);
 
-            IgnitePredicate<ClusterNode> depPred = client.cluster().forServers()
-                .forPredicate(new IgnitePredicate<ClusterNode>() {
-                    @Override public boolean apply(ClusterNode node) {
-                        String gridName = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);
-
-                        assert gridName != null;
-
-                        return gridName.startsWith(getTestIgniteInstanceName());
-                    }
-                }).predicate();
+            IgnitePredicate<ClusterNode> depPred = new TestPredicate(getTestIgniteInstanceName());
 
             List<ServiceConfiguration> cfgs = getConfigs(depPred, numServices);
 
@@ -185,16 +178,7 @@ public class GridServiceProcessorBatchDeploySelfTest extends GridCommonAbstractT
 
             CountDownLatch latch = new CountDownLatch(numServices);
 
-            IgnitePredicate<ClusterNode> depPred = client.cluster().forServers()
-                .forPredicate(new IgnitePredicate<ClusterNode>() {
-                    @Override public boolean apply(ClusterNode node) {
-                        String gridName = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);
-
-                        assert gridName != null;
-
-                        return gridName.startsWith(getTestIgniteInstanceName());
-                    }
-                }).predicate();
+            IgnitePredicate<ClusterNode> depPred = new TestPredicate(getTestIgniteInstanceName());
 
             List<ServiceConfiguration> cfgs = getConfigs(depPred, numServices);
 
@@ -430,10 +414,12 @@ public class GridServiceProcessorBatchDeploySelfTest extends GridCommonAbstractT
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10021")
     @Test
     public void testCancelAllTopologyChange() throws Exception {
-        Ignite client = grid(CLIENT_NODE_NAME);
+        IgniteEx client = grid(CLIENT_NODE_NAME);
+
+        Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-10021",
+            client.context().service() instanceof GridServiceProcessor);
 
         int numServices = 500;
 
@@ -661,4 +647,28 @@ public class GridServiceProcessorBatchDeploySelfTest extends GridCommonAbstractT
             }
         });
     }
+
+    /**
+     * Test predicate.
+     */
+    private static class TestPredicate implements IgnitePredicate<ClusterNode> {
+        /** */
+        private final String namePrefix;
+
+        /**
+         * @param namePrefix Prefix to match instances name.
+         */
+        public TestPredicate(String namePrefix) {
+            this.namePrefix = namePrefix;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            String gridName = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);
+
+            assert gridName != null;
+
+            return !node.isClient() && gridName.startsWith(namePrefix);
+        }
+    }
 }
index d328db2..b1e557f 100644 (file)
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.processors.service;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -192,22 +192,28 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
      */
     @Test
     public void testAffinityUpdateTopology() throws Exception {
-        Ignite g = randomGrid();
+        IgniteEx g = randomGrid();
 
-        checkCount(AFFINITY, g.services().serviceDescriptors(), 1);
+        checkCount(AFFINITY, g, 1);
+
+        checkAffinityServiceDeployment(g, AFFINITY);
 
         int nodeCnt = 2;
 
         startExtraNodes(nodeCnt);
 
         try {
-            checkCount(AFFINITY, g.services().serviceDescriptors(), 1);
+            checkCount(AFFINITY, g, 1);
+
+            checkAffinityServiceDeployment(g, AFFINITY);
         }
         finally {
             stopExtraNodes(nodeCnt);
         }
 
-        checkCount(AFFINITY, g.services().serviceDescriptors(), 1);
+        checkCount(AFFINITY, g, 1);
+
+        checkAffinityServiceDeployment(g, AFFINITY);
     }
 
     /**
@@ -215,13 +221,13 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
      */
     @Test
     public void testDeployLimits() throws Exception {
-        final Ignite g = randomGrid();
+        final IgniteEx g = randomGrid();
 
         final String name = NODE_SINGLE_WITH_LIMIT;
 
         waitForDeployment(name, nodeCount());
 
-        checkCount(name, g.services().serviceDescriptors(), nodeCount());
+        checkCount(name, g, nodeCount());
 
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -236,7 +242,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
 
             waitForDeployment(name, nodeCount() + 1);
 
-            checkCount(name, g.services().serviceDescriptors(), nodeCount() + 1);
+            checkCount(name, g, nodeCount() + 1);
         }
         finally {
             stopExtraNodes(extraNodes);
@@ -247,7 +253,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
         // Service can be redeployed when nodes is stopping one-by-one.
         assertEquals(0, DummyService.started(name) - DummyService.cancelled(name));
 
-        checkCount(name, g.services().serviceDescriptors(), nodeCount());
+        checkCount(name, g, nodeCount());
     }
 
     /**
@@ -255,7 +261,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
      * @throws Exception If failed.
      */
     private void checkSingletonUpdateTopology(String name) throws Exception {
-        Ignite g = randomGrid();
+        IgniteEx g = randomGrid();
 
         startExtraNodes(2, 2);
 
@@ -265,7 +271,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
 
             info(">>> Passed checks.");
 
-            checkCount(name, g.services().serviceDescriptors(), 1);
+            checkCount(name, g, 1);
         }
         finally {
             stopExtraNodes(4);
@@ -277,7 +283,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
      * @throws Exception If failed.
      */
     private void checkDeployOnEachNodeUpdateTopology(String name) throws Exception {
-        Ignite g = randomGrid();
+        IgniteEx g = randomGrid();
 
         int newNodes = 4;
 
@@ -297,7 +303,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
             // not start and cancel events individually.
             assertEquals(name, newNodes,  DummyService.started(name) - DummyService.cancelled(name));
 
-            checkCount(name, g.services().serviceDescriptors(), nodeCount() + newNodes);
+            checkCount(name, g, nodeCount() + newNodes);
         }
         finally {
             stopExtraNodes(newNodes);
@@ -305,7 +311,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
 
         waitForDeployment(name, nodeCount());
 
-        checkCount(name, g.services().serviceDescriptors(), nodeCount());
+        checkCount(name, g, nodeCount());
     }
 
     /**
@@ -313,7 +319,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
      * @throws Exception If failed.
      */
     private void checkDeployOnEachNodeButClientUpdateTopology(String name) throws Exception {
-        Ignite g = randomGrid();
+        IgniteEx g = randomGrid();
 
         int servers = 2;
 
@@ -335,7 +341,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
             // not start and cancel events individually.
             assertEquals(name, servers,  DummyService.started(name) - DummyService.cancelled(name));
 
-            checkCount(name, g.services().serviceDescriptors(), nodeCount() + servers);
+            checkCount(name, g, nodeCount() + servers);
         }
         finally {
             stopExtraNodes(servers + clients);
@@ -343,6 +349,6 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
 
         waitForDeployment(name, nodeCount());
 
-        checkCount(name, g.services().serviceDescriptors(), nodeCount());
+        checkCount(name, g, nodeCount());
     }
 }
index ed331fa..c6bb223 100644 (file)
@@ -22,6 +22,7 @@ import junit.framework.TestCase;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
@@ -46,7 +47,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
     public void testSingletonUpdateTopology() throws Exception {
         String name = "serviceSingletonUpdateTopology";
 
-        Ignite g = randomGrid();
+        IgniteEx g = randomGrid();
 
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -77,7 +78,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
 
             info(">>> Passed checks.");
 
-            checkCount(name, g.services().serviceDescriptors(), 1);
+            checkCount(name, g, 1);
         }
         finally {
             stopExtraNodes(nodeCnt);
@@ -89,7 +90,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
      */
     @Test
     public void testAffinityDeployUpdateTopology() throws Exception {
-        Ignite g = randomGrid();
+        IgniteEx g = randomGrid();
 
         final Integer affKey = 1;
 
@@ -116,7 +117,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
         startExtraNodes(nodeCnt);
 
         try {
-            checkCount(name, g.services().serviceDescriptors(), 1);
+            checkCount(name, g, 1);
         }
         finally {
             stopExtraNodes(nodeCnt);
@@ -134,7 +135,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
         try {
             final String name = "serviceOnEachNodeButClientUpdateTopology";
 
-            Ignite g = randomGrid();
+            IgniteEx g = randomGrid();
 
             CountDownLatch latch = new CountDownLatch(nodeCount());
 
@@ -178,7 +179,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
                 // not start and cancel events individually.
                 assertEquals(name, nodeCount() + servers, DummyService.started(name) - DummyService.cancelled(name));
 
-                checkCount(name, g.services().serviceDescriptors(), nodeCount() + servers);
+                checkCount(name, g, nodeCount() + servers);
             }
             finally {
                 stopExtraNodes(servers + clients);
@@ -200,7 +201,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
         try {
             final String name = "serviceOnEachProjectionNodeUpdateTopology";
 
-            Ignite g = randomGrid();
+            IgniteEx g = randomGrid();
 
             int prestartedSrvcs = 1;
 
@@ -246,7 +247,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
                 // not start and cancel events individually.
                 assertEquals(name, clients + prestartedSrvcs, DummyService.started(name) - DummyService.cancelled(name));
 
-                checkCount(name, g.services().serviceDescriptors(), clients + prestartedSrvcs);
+                checkCount(name, g, clients + prestartedSrvcs);
             }
             finally {
                 stopExtraNodes(servers + clients);
@@ -268,7 +269,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
         try {
             final String name = "serviceOnEachNodeUpdateTopology";
 
-            Ignite g = randomGrid();
+            IgniteEx g = randomGrid();
 
             final int prestartedNodes = nodeCount() + 1;
 
@@ -323,7 +324,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
                 assertEquals(name, prestartedNodes + extraNodes,
                     DummyService.started(name) - DummyService.cancelled(name));
 
-                checkCount(name, g.services().serviceDescriptors(), prestartedNodes + extraNodes);
+                checkCount(name, g, prestartedNodes + extraNodes);
             }
             finally {
                 stopExtraNodes(extraNodes);
@@ -342,7 +343,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
     public void testDeployLimits() throws Exception {
         final String name = "serviceWithLimitsUpdateTopology";
 
-        Ignite g = randomGrid();
+        IgniteEx g = randomGrid();
 
         final int totalInstances = nodeCount() + 1;
 
@@ -394,7 +395,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
             // not start and cancel events individually.
             assertEquals(name, totalInstances, DummyService.started(name) - DummyService.cancelled(name));
 
-            checkCount(name, g.services().serviceDescriptors(), totalInstances);
+            checkCount(name, g, totalInstances);
         }
         finally {
             stopExtraNodes(extraNodes);
index b022622..33db5fa 100644 (file)
@@ -230,7 +230,8 @@ public class GridServiceProcessorProxySelfTest extends GridServiceProcessorAbstr
         ignite.services(ignite.cluster().forLocal()).deployClusterSingleton(name, new MapServiceImpl<String, Integer>());
 
         for (int i = 1; i < nodeCount(); i++) {
-            MapService<Integer, String> svc =  grid(i).services().serviceProxy(name, MapService.class, false);
+            MapService<Integer, String> svc =  grid(i).services()
+                .serviceProxy(name, MapService.class, false, 1_000L);
 
             // Make sure service is a proxy.
             assertFalse(svc instanceof Service);
@@ -445,7 +446,7 @@ public class GridServiceProcessorProxySelfTest extends GridServiceProcessorAbstr
     /**
      *
      */
-    protected class ErrorServiceImpl implements ErrorService {
+    protected static class ErrorServiceImpl implements ErrorService {
         /** {@inheritDoc} */
         @Override public void cancel(ServiceContext ctx) {
             // No-op.
index e2e5a5d..56d8a34 100644 (file)
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.processors.service;
 
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -33,7 +33,6 @@ public class GridServiceProcessorSingleNodeSelfTest extends GridServiceProcessor
         return 1;
     }
 
-
     /**
      * @throws Exception If failed.
      */
@@ -41,17 +40,20 @@ public class GridServiceProcessorSingleNodeSelfTest extends GridServiceProcessor
     public void testNodeSingletonNotDeployedProxy() throws Exception {
         String name = "testNodeSingletonNotDeployedProxy";
 
-        Ignite ignite = randomGrid();
+        IgniteEx ignite = randomGrid();
+
+        try {
+            // Deploy only on remote nodes.
+            ignite.services(ignite.cluster().forRemotes()).deployNodeSingleton(name, new CounterServiceImpl());
 
-        // Deploy only on remote nodes.
-        ignite.services(ignite.cluster().forRemotes()).deployNodeSingleton(name, new CounterServiceImpl());
+            assertFalse("Should not reach here in this mode, because exception should be thrown.",
+                ignite.context().service() instanceof IgniteServiceProcessor);
 
-        info("Deployed service: " + name);
+            info("Deployed service: " + name);
 
-        // Get local proxy.
-        CounterService svc = ignite.services().serviceProxy(name, CounterService.class, false);
+            // Get local proxy.
+            CounterService svc = ignite.services().serviceProxy(name, CounterService.class, false);
 
-        try {
             svc.increment();
 
             fail("Should never reach here.");
index d17b015..ac3aaa6 100644 (file)
@@ -180,7 +180,7 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
     /**
      *
      */
-    public class TestServiceImpl implements Service, TestService {
+    public static class TestServiceImpl implements Service, TestService {
         /** Serial version UID. */
         private static final long serialVersionUID = 0L;
 
index 532728b..096c9be 100644 (file)
@@ -26,7 +26,6 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -42,6 +41,9 @@ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstrac
     /** */
     private static final String SERVICE_NAME = "testService";
 
+    /** */
+    private static final long SERVICE_TOP_WAIT_TIMEOUT = 2_000L;
+
     /** {@inheritDoc} */
     @Override protected int nodeCount() {
         return 1;
@@ -164,17 +166,17 @@ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstrac
     private boolean checkServices(int total, int maxPerNode, int gridIdx, boolean lastTry) throws Exception {
         IgniteEx grid = grid(gridIdx);
 
-        IgniteInternalCache<GridServiceAssignmentsKey, GridServiceAssignments> cache = grid.utilityCache();
+        waitForServicesReadyTopology(grid, grid.context().discovery().topologyVersionEx());
 
-        GridServiceAssignments assignments = cache.get(new GridServiceAssignmentsKey(SERVICE_NAME));
+        Map<UUID, Integer> srvcTop = grid.context().service().serviceTopology(SERVICE_NAME, SERVICE_TOP_WAIT_TIMEOUT);
 
-        Collection<UUID> nodes = F.viewReadOnly(grid.cluster().nodes(), F.node2id());
+        Collection<UUID> nodes = F.viewReadOnly(grid.context().discovery().aliveServerNodes(), F.node2id());
 
-        assertNotNull("Grid assignments object is null", assignments);
+        assertNotNull("Grid assignments object is null", srvcTop);
 
         int sum = 0;
 
-        for (Map.Entry<UUID, Integer> entry : assignments.assigns().entrySet()) {
+        for (Map.Entry<UUID, Integer> entry : srvcTop.entrySet()) {
             UUID nodeId = entry.getKey();
 
             if (!lastTry && !nodes.contains(nodeId))
@@ -193,9 +195,9 @@ public class GridServiceReassignmentSelfTest extends GridServiceProcessorAbstrac
 
         if (total > 0)
             assertTrue("Total number of services limit exceeded [sum=" + sum +
-                ", assigns=" + assignments.assigns() + ']', sum <= total);
+                ", assigns=" + srvcTop + ']', sum <= total);
         else
-            assertEquals("Reassign per node failed.", nodes.size(), assignments.assigns().size());
+            assertEquals("Reassign per node failed.", nodes.size(), srvcTop.size());
 
         if (!lastTry && proxy(grid).get() != 10)
             return false;
index 92a7449..9c9cc73 100644 (file)
@@ -54,7 +54,7 @@ public class GridServiceSerializationSelfTest extends GridCommonAbstractTest {
             server.services(server.cluster().forServers())
                 .deployClusterSingleton("my-service", new MyServiceImpl());
 
-            MyService svc = client.services().serviceProxy("my-service", MyService.class, false);
+            MyService svc = client.services().serviceProxy("my-service", MyService.class, false, 2_000);
 
             svc.hello();
 
index 0ebee6e..f64cef4 100644 (file)
@@ -21,18 +21,14 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.io.Serializable;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.cache.configuration.Factory;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteServices;
-import org.apache.ignite.binary.BinaryObjectException;
-import org.apache.ignite.binary.BinaryReader;
-import org.apache.ignite.binary.BinaryWriter;
-import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceContext;
@@ -67,9 +63,18 @@ public class IgniteServiceConfigVariationsFullApiTest extends IgniteConfigVariat
     private static final Factory[] serviceFactories = new Factory[] {
         Parameters.factory(TestServiceImpl.class),
         Parameters.factory(TestServiceImplExternalizable.class),
-        Parameters.factory(TestServiceImplBinarylizable.class)
     };
 
+    /** */
+    private static boolean isEventDrivenServiceProcessorEnabled;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        isEventDrivenServiceProcessorEnabled = grid(0).context().service() instanceof IgniteServiceProcessor;
+    }
+
     /** {@inheritDoc} */
     @Override protected boolean expectedClient(String testGridName) {
         int i = testsCfg.gridCount();
@@ -93,8 +98,7 @@ public class IgniteServiceConfigVariationsFullApiTest extends IgniteConfigVariat
             @Override public void run(IgniteServices services, String svcName, TestService svc) throws Exception {
                 services.deployNodeSingleton(svcName, (Service)svc);
 
-                // TODO: Waiting for deployment should be removed after IEP-17 completion
-                GridTestUtils.waitForCondition(() -> services.service(svcName) != null, DEPLOYMENT_WAIT_TIMEOUT);
+                waitForServiceDeploymentIfNeeded(services, svcName);
             }
         }));
     }
@@ -110,8 +114,7 @@ public class IgniteServiceConfigVariationsFullApiTest extends IgniteConfigVariat
             @Override public void run(IgniteServices services, String svcName, TestService svc) throws Exception {
                 services.deployClusterSingleton(svcName, (Service)svc);
 
-                // TODO: Waiting for deployment should be removed after IEP-17 completion
-                GridTestUtils.waitForCondition(() -> services.service(svcName) != null, DEPLOYMENT_WAIT_TIMEOUT);
+                waitForServiceDeploymentIfNeeded(services, svcName);
             }
         }));
     }
@@ -145,8 +148,10 @@ public class IgniteServiceConfigVariationsFullApiTest extends IgniteConfigVariat
     @Test
  &nbs