IGNITE-6272 .NET: Multiple services deployment
authorAlexey Popov <tank2.alex@gmail.com>
Fri, 24 Nov 2017 16:27:09 +0000 (19:27 +0300)
committerPavel Tupitsyn <ptupitsyn@apache.org>
Fri, 24 Nov 2017 16:27:09 +0000 (19:27 +0300)
This closes #2813

14 files changed:
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
modules/core/src/test/java/org/apache/ignite/platform/PlatformExceptionTask.java
modules/platforms/dotnet/Apache.Ignite.Core.Tests/ExceptionsTest.cs
modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesAsyncWrapper.cs
modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs
modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs
modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
modules/platforms/dotnet/Apache.Ignite.Core/Services/IServices.cs
modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceConfiguration.cs
modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceDeploymentException.cs

index bebff8e..9d52c4a 100644 (file)
@@ -75,4 +75,11 @@ public class PlatformClusterNodeFilterImpl extends PlatformAbstractPredicate imp
     public void setIgniteInstance(Ignite ignite) {
         ctx = PlatformUtils.platformContext(ignite);
     }
+
+    /**
+     * @return Filter itself
+     */
+    public Object getInternalPredicate() {
+        return pred;
+    }
 }
index 1eb9a2c..8730940 100644 (file)
@@ -44,4 +44,11 @@ public class PlatformDotNetServiceImpl extends PlatformAbstractService implement
     public PlatformDotNetServiceImpl(Object svc, PlatformContext ctx, boolean srvKeepBinary) {
         super(svc, ctx, srvKeepBinary);
     }
+
+    /**
+     * @return Service itself
+     */
+    public Object getInternalService() {
+        return svc;
+    }
 }
index 6f8d9e5..ccb04d4 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformTarget;
+import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilterImpl;
 import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetService;
 import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl;
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
@@ -37,6 +38,7 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceDeploymentException;
 import org.apache.ignite.services.ServiceDescriptor;
 import org.jetbrains.annotations.NotNull;
 
@@ -96,6 +98,12 @@ public class PlatformServices extends PlatformAbstractTarget {
     private static final int OP_CANCEL_ALL_ASYNC = 14;
 
     /** */
+    private static final int OP_DOTNET_DEPLOY_ALL = 15;
+
+    /** */
+    private static final int OP_DOTNET_DEPLOY_ALL_ASYNC = 16;
+
+    /** */
     private static final byte PLATFORM_JAVA = 0;
 
     /** */
@@ -178,6 +186,12 @@ public class PlatformServices extends PlatformAbstractTarget {
                 return TRUE;
             }
 
+            case OP_DOTNET_DEPLOY_ALL_ASYNC: {
+                readAndListenFuture(reader, dotnetDeployAllAsync(reader, services), RESULT_WRITER);
+
+                return TRUE;
+            }
+
             default:
                 return super.processInStreamOutLong(type, reader);
         }
@@ -209,11 +223,10 @@ public class PlatformServices extends PlatformAbstractTarget {
             case OP_DOTNET_DEPLOY: {
                 try {
                     dotnetDeploy(reader, services);
-
-                    PlatformUtils.writeInvocationResult(writer, null, null);
+                    writeDeploymentResult(writer, null);
                 }
                 catch (Exception e) {
-                    PlatformUtils.writeInvocationResult(writer, null, e);
+                    writeDeploymentResult(writer, e);
                 }
 
                 return;
@@ -223,10 +236,23 @@ public class PlatformServices extends PlatformAbstractTarget {
                 try {
                     dotnetDeployMultiple(reader);
 
-                    PlatformUtils.writeInvocationResult(writer, null, null);
+                    writeDeploymentResult(writer, null);
                 }
                 catch (Exception e) {
-                    PlatformUtils.writeInvocationResult(writer, null, e);
+                    writeDeploymentResult(writer, e);
+                }
+
+                return;
+            }
+
+            case OP_DOTNET_DEPLOY_ALL: {
+                try {
+                    dotnetDeployAll(reader, services);
+
+                    writeDeploymentResult(writer, null);
+                }
+                catch (Exception e) {
+                    writeDeploymentResult(writer, e);
                 }
 
                 return;
@@ -421,6 +447,31 @@ public class PlatformServices extends PlatformAbstractTarget {
     }
 
     /**
+     * Deploys a collection of dotnet services.
+     *
+     * @param reader Binary reader.
+     * @param services Services.
+     */
+    private void dotnetDeployAll(BinaryRawReaderEx reader, IgniteServices services) {
+        Collection<ServiceConfiguration> cfgs = dotnetConfigurations(reader);
+
+        services.deployAll(cfgs);
+    }
+
+    /**
+     * Deploys a collection of dotnet services asynchronously.
+     *
+     * @param reader Binary reader.
+     * @param services Services.
+     * @return Future of the operation.
+     */
+    private IgniteFuture<Void> dotnetDeployAllAsync(BinaryRawReaderEx reader, IgniteServices services) {
+        Collection<ServiceConfiguration> cfgs = dotnetConfigurations(reader);
+
+        return services.deployAllAsync(cfgs);
+    }
+
+    /**
      * Read the dotnet service configuration.
      *
      * @param reader Binary reader,
@@ -445,6 +496,24 @@ public class PlatformServices extends PlatformAbstractTarget {
     }
 
     /**
+     * Reads the collection of dotnet service configurations.
+     *
+     * @param reader Binary reader,
+     * @return Service configuration.
+     */
+    @NotNull private Collection<ServiceConfiguration> dotnetConfigurations(BinaryRawReaderEx reader) {
+        int numServices = reader.readInt();
+
+        List<ServiceConfiguration> cfgs = new ArrayList<>(numServices);
+
+        for (int i = 0; i < numServices; i++) {
+            cfgs.add(dotnetConfiguration(reader));
+        }
+
+        return cfgs;
+    }
+
+    /**
      * Proxy holder.
      */
     @SuppressWarnings("unchecked")
@@ -646,7 +715,7 @@ public class PlatformServices extends PlatformAbstractTarget {
     private static class ServiceDeploymentResultWriter implements PlatformFutureUtils.Writer {
         /** <inheritDoc /> */
         @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) {
-            PlatformUtils.writeInvocationResult(writer, obj, err);
+            writeDeploymentResult(writer, err);
         }
 
         /** <inheritDoc /> */
@@ -655,4 +724,49 @@ public class PlatformServices extends PlatformAbstractTarget {
         }
     }
 
+    /**
+     * Writes a service deployment result for dotnet code.
+     *
+     * @param writer Writer.
+     * @param err Error.
+      */
+    private static void writeDeploymentResult(BinaryRawWriterEx writer, Throwable err) {
+        PlatformUtils.writeInvocationResult(writer, null, err);
+
+        Collection<ServiceConfiguration> failedCfgs = null;
+
+        if (err instanceof ServiceDeploymentException)
+            failedCfgs = ((ServiceDeploymentException)err).getFailedConfigurations();
+
+        // write a collection of failed service configurations
+        PlatformUtils.writeNullableCollection(writer, failedCfgs, new PlatformWriterClosure<ServiceConfiguration>() {
+            @Override public void write(BinaryRawWriterEx writer, ServiceConfiguration svcCfg) {
+                writeFailedConfiguration(writer, svcCfg);
+            }
+        });
+    }
+
+    /**
+     * Writes a failed service configuration for dotnet code.
+     *
+     * @param w Writer
+     * @param svcCfg Service configuration
+     */
+    private static void writeFailedConfiguration(BinaryRawWriterEx w, ServiceConfiguration svcCfg) {
+        Object dotnetSvc = null;
+        Object dotnetFilter = null;
+        w.writeString(svcCfg.getName());
+        if (svcCfg.getService() instanceof PlatformDotNetServiceImpl)
+            dotnetSvc = ((PlatformDotNetServiceImpl)svcCfg.getService()).getInternalService();
+
+        w.writeObjectDetached(dotnetSvc);
+        w.writeInt(svcCfg.getTotalCount());
+        w.writeInt(svcCfg.getMaxPerNodeCount());
+        w.writeString(svcCfg.getCacheName());
+        w.writeObjectDetached(svcCfg.getAffinityKey());
+
+        if (svcCfg.getNodeFilter() instanceof PlatformClusterNodeFilterImpl)
+            dotnetFilter = ((PlatformClusterNodeFilterImpl)svcCfg.getNodeFilter()).getInternalPredicate();
+        w.writeObjectDetached(dotnetFilter);
+    }
 }
index c1ab991..0962fc5 100644 (file)
@@ -25,6 +25,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.lang.IgniteFutureCancelledException;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceDeploymentException;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.Nullable;
 
@@ -32,16 +34,20 @@ import javax.cache.CacheException;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessorException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 /**
  * Task to test exception mappings.
  */
+@SuppressWarnings("unused")  // Used by .NET ExceptionsTest.
 public class PlatformExceptionTask extends ComputeTaskAdapter<String, String> {
     /** {@inheritDoc} */
     @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
         @Nullable String arg) {
+        assert arg != null;
+
         switch (arg) {
             case "IllegalArgumentException": throw new IllegalArgumentException(arg);
             case "IllegalStateException": throw new IllegalStateException(arg);
@@ -66,6 +72,8 @@ public class PlatformExceptionTask extends ComputeTaskAdapter<String, String> {
             case "TransactionHeuristicException": throw new TransactionHeuristicException(arg);
             case "TransactionDeadlockException": throw new TransactionDeadlockException(arg);
             case "IgniteFutureCancelledException": throw new IgniteFutureCancelledException(arg);
+            case "ServiceDeploymentException": throw new ServiceDeploymentException(arg,
+                    Collections.singletonList(new ServiceConfiguration().setName("foo")));
         }
 
         return null;
index d84e6dc..f7568ef 100644 (file)
@@ -29,6 +29,7 @@ namespace Apache.Ignite.Core.Tests
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Services;
     using Apache.Ignite.Core.Transactions;
     using NUnit.Framework;
 
@@ -93,6 +94,7 @@ namespace Apache.Ignite.Core.Tests
             CheckException<TransactionHeuristicException>(comp, "TransactionHeuristicException");
             CheckException<TransactionDeadlockException>(comp, "TransactionDeadlockException");
             CheckException<IgniteFutureCancelledException>(comp, "IgniteFutureCancelledException");
+            CheckException<ServiceDeploymentException>(comp, "ServiceDeploymentException");
 
             // Check stopped grid.
             grid.Dispose();
index 470804c..18db5d5 100644 (file)
@@ -121,6 +121,25 @@ namespace Apache.Ignite.Core.Tests.Services
         }
 
         /** <inheritDoc /> */
+        public void DeployAll(IEnumerable<ServiceConfiguration> configurations)
+        {
+            try
+            {
+                _services.DeployAllAsync(configurations).Wait();
+            }
+            catch (AggregateException ex)
+            {
+                throw ex.InnerException;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public Task DeployAllAsync(IEnumerable<ServiceConfiguration> configurations)
+        {
+            return _services.DeployAllAsync(configurations);
+        }
+
+        /** <inheritDoc /> */
         public void Cancel(string name)
         {
             _services.CancelAsync(name).Wait();
index e2b3a09..d3dd9b0 100644 (file)
@@ -21,12 +21,13 @@ namespace Apache.Ignite.Core.Tests.Services
     using System.Collections;
     using System.Collections.Generic;
     using System.Diagnostics.CodeAnalysis;
+    using System.IO;
     using System.Linq;
+    using System.Runtime.Serialization.Formatters.Binary;
     using System.Threading;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Common;
-    //using Apache.Ignite.Core.Impl;
     using Apache.Ignite.Core.Resource;
     using Apache.Ignite.Core.Services;
     using Apache.Ignite.Core.Tests.Compute;
@@ -82,7 +83,7 @@ namespace Apache.Ignite.Core.Tests.Services
         {
             try
             {
-                Services.Cancel(SvcName);
+                Services.CancelAll();
 
                 TestUtils.AssertHandleRegistryIsEmpty(1000, Grid1, Grid2, Grid3);
             }
@@ -123,6 +124,35 @@ namespace Apache.Ignite.Core.Tests.Services
         }
 
         /// <summary>
+        /// Tests several services deployment via DeployAll() method.
+        /// </summary>
+        [Test]
+        public void TestDeployAll([Values(true, false)] bool binarizable)
+        {
+            const int num = 10;
+
+            var cfgs = new List<ServiceConfiguration>();
+            for (var i = 0; i < num; i++)
+            {
+                cfgs.Add(new ServiceConfiguration
+                {
+                    Name = MakeServiceName(i),
+                    MaxPerNodeCount = 3,
+                    TotalCount = 3,
+                    NodeFilter = new NodeFilter {NodeId = Grid1.GetCluster().GetLocalNode().Id},
+                    Service = binarizable ? new TestIgniteServiceBinarizable() : new TestIgniteServiceSerializable()
+                });
+            }
+
+            Services.DeployAll(cfgs);
+
+            for (var i = 0; i < num; i++)
+            {
+                CheckServiceStarted(Grid1, 3, MakeServiceName(i));
+            }
+        }
+
+        /// <summary>
         /// Tests cluster singleton deployment.
         /// </summary>
         [Test]
@@ -455,6 +485,132 @@ namespace Apache.Ignite.Core.Tests.Services
         }
 
         /// <summary>
+        /// Tests ServiceDeploymentException result via DeployAll() method.
+        /// </summary>
+        [Test]
+        public void TestDeployAllException([Values(true, false)] bool binarizable)
+        {
+            const int num = 10;
+            const int firstFailedIdx = 1;
+            const int secondFailedIdx = 9;
+
+            var cfgs = new List<ServiceConfiguration>();
+            for (var i = 0; i < num; i++)
+            {
+                var throwInit = (i == firstFailedIdx || i == secondFailedIdx);
+                cfgs.Add(new ServiceConfiguration
+                {
+                    Name = MakeServiceName(i),
+                    MaxPerNodeCount = 2,
+                    TotalCount = 2,
+                    NodeFilter = new NodeFilter { NodeId = Grid1.GetCluster().GetLocalNode().Id },
+                    Service = binarizable ? new TestIgniteServiceBinarizable { TestProperty = i, ThrowInit = throwInit } 
+                        : new TestIgniteServiceSerializable { TestProperty = i, ThrowInit = throwInit }
+                });
+            } 
+
+            var deploymentException = Assert.Throws<ServiceDeploymentException>(() => Services.DeployAll(cfgs));
+
+            var failedCfgs = deploymentException.FailedConfigurations;
+            Assert.IsNotNull(failedCfgs);
+            Assert.AreEqual(2, failedCfgs.Count);
+
+            var firstFailedSvc = binarizable ? failedCfgs.ElementAt(0).Service as TestIgniteServiceBinarizable : 
+                failedCfgs.ElementAt(0).Service as TestIgniteServiceSerializable;
+            var secondFailedSvc = binarizable ? failedCfgs.ElementAt(1).Service as TestIgniteServiceBinarizable : 
+                failedCfgs.ElementAt(1).Service as TestIgniteServiceSerializable;
+
+            Assert.IsNotNull(firstFailedSvc);
+            Assert.IsNotNull(secondFailedSvc);
+
+            Assert.AreEqual(firstFailedIdx, firstFailedSvc.TestProperty);
+            Assert.AreEqual(secondFailedIdx, secondFailedSvc.TestProperty);
+
+            for (var i = 0; i < num; i++)
+            {
+                if (i != firstFailedIdx && i != secondFailedIdx)
+                {
+                    CheckServiceStarted(Grid1, 2, MakeServiceName(i));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests input errors for DeployAll() method.
+        /// </summary>
+        [Test]
+        public void TestDeployAllInputErrors()
+        {
+            var nullException = Assert.Throws<ArgumentNullException>(() => Services.DeployAll(null));
+            Assert.IsTrue(nullException.Message.Contains("configurations"));
+
+            var argException = Assert.Throws<ArgumentException>(() => Services.DeployAll(new List<ServiceConfiguration>()));
+            Assert.IsTrue(argException.Message.Contains("empty collection"));
+
+            nullException = Assert.Throws<ArgumentNullException>(() => Services.DeployAll(new List<ServiceConfiguration> { null }));
+            Assert.IsTrue(nullException.Message.Contains("configurations[0]"));
+
+            nullException = Assert.Throws<ArgumentNullException>(() => Services.DeployAll(new List<ServiceConfiguration>
+            {
+                new ServiceConfiguration { Name = SvcName }
+            }));
+            Assert.IsTrue(nullException.Message.Contains("configurations[0].Service"));
+
+            argException = Assert.Throws<ArgumentException>(() => Services.DeployAll(new List<ServiceConfiguration>
+            {
+                new ServiceConfiguration { Service = new TestIgniteServiceSerializable() }
+            }));
+            Assert.IsTrue(argException.Message.Contains("configurations[0].Name"));
+
+            argException = Assert.Throws<ArgumentException>(() => Services.DeployAll(new List<ServiceConfiguration>
+            {
+                new ServiceConfiguration { Service = new TestIgniteServiceSerializable(), Name = string.Empty }
+            }));
+            Assert.IsTrue(argException.Message.Contains("configurations[0].Name"));
+        }
+
+        /// <summary>
+        /// Tests [Serializable] usage of ServiceDeploymentException.
+        /// </summary>
+        [Test]
+        public void TestDeploymentExceptionSerializable()
+        {
+            var cfg = new ServiceConfiguration
+            {
+                Name = "foo",
+                CacheName = "cacheName",
+                AffinityKey = 1,
+                MaxPerNodeCount = 2,
+                Service = new TestIgniteServiceSerializable(),
+                NodeFilter = new NodeFilter(),
+                TotalCount = 3
+            };
+
+            var ex = new ServiceDeploymentException("msg", new Exception("in"), new[] {cfg});
+
+            var formatter = new BinaryFormatter();
+            var stream = new MemoryStream();
+            formatter.Serialize(stream, ex);
+            stream.Seek(0, SeekOrigin.Begin);
+
+            var res = (ServiceDeploymentException) formatter.Deserialize(stream);
+
+            Assert.AreEqual(ex.Message, res.Message);
+            Assert.IsNotNull(res.InnerException);
+            Assert.AreEqual("in", res.InnerException.Message);
+
+            var resCfg = res.FailedConfigurations.Single();
+
+            Assert.AreEqual(cfg.Name, resCfg.Name);
+            Assert.AreEqual(cfg.CacheName, resCfg.CacheName);
+            Assert.AreEqual(cfg.AffinityKey, resCfg.AffinityKey);
+            Assert.AreEqual(cfg.MaxPerNodeCount, resCfg.MaxPerNodeCount);
+            Assert.AreEqual(cfg.TotalCount, resCfg.TotalCount);
+            Assert.IsInstanceOf<TestIgniteServiceSerializable>(cfg.Service);
+            Assert.IsInstanceOf<NodeFilter>(cfg.NodeFilter);
+        }
+
+        /// <summary>
         /// Verifies the deployment exception.
         /// </summary>
         private void VerifyDeploymentException(Action<IServices, IService> deploy, bool keepBinary)
@@ -496,6 +652,10 @@ namespace Apache.Ignite.Core.Tests.Services
             Assert.IsTrue(ex.StackTrace.Trim().StartsWith(
                 "at Apache.Ignite.Core.Tests.Services.ServicesTest.TestIgniteServiceSerializable.Init"));
 
+            var failedCfgs = deploymentException.FailedConfigurations;
+            Assert.IsNotNull(failedCfgs);
+            Assert.AreEqual(1, failedCfgs.Count);
+
             var svc0 = Services.GetService<TestIgniteServiceSerializable>(SvcName);
             Assert.IsNull(svc0);
         }
@@ -582,6 +742,8 @@ namespace Apache.Ignite.Core.Tests.Services
             Grid1.GetCompute()
                 .ExecuteJavaTask<object>("org.apache.ignite.platform.PlatformDeployServiceTask", javaSvcName);
 
+            TestUtils.WaitForCondition(() => Services.GetServiceDescriptors().Any(x => x.Name == javaSvcName), 1000);
+
             // Verify decriptor
             var descriptor = Services.GetServiceDescriptors().Single(x => x.Name == javaSvcName);
             Assert.AreEqual(javaSvcName, descriptor.Name);
@@ -707,10 +869,10 @@ namespace Apache.Ignite.Core.Tests.Services
         /// <summary>
         /// Checks that service has started on specified grid.
         /// </summary>
-        private static void CheckServiceStarted(IIgnite grid, int count = 1)
+        private static void CheckServiceStarted(IIgnite grid, int count = 1, string svcName = SvcName)
         {
             Func<ICollection<TestIgniteServiceSerializable>> getServices = () =>
-                grid.GetServices().GetServices<TestIgniteServiceSerializable>(SvcName);
+                grid.GetServices().GetServices<TestIgniteServiceSerializable>(svcName);
 
             Assert.IsTrue(TestUtils.WaitForCondition(() => count == getServices().Count, 5000));
 
@@ -779,6 +941,15 @@ namespace Apache.Ignite.Core.Tests.Services
         protected virtual bool CompactFooter { get { return true; } }
 
         /// <summary>
+        /// Makes Service1-{i} names for services.
+        /// </summary>
+        private static string MakeServiceName(int i)
+        {
+            // Please note that CheckContext() validates Name.StartsWith(SvcName).
+            return string.Format("{0}-{1}", SvcName, i);
+        }
+
+        /// <summary>
         /// Test service interface for proxying.
         /// </summary>
         private interface ITestIgniteService
@@ -961,11 +1132,13 @@ namespace Apache.Ignite.Core.Tests.Services
             public void WriteBinary(IBinaryWriter writer)
             {
                 writer.WriteInt("TestProp", TestProperty);
+                writer.WriteBoolean("ThrowInit", ThrowInit);
             }
 
             /** <inheritdoc /> */
             public void ReadBinary(IBinaryReader reader)
             {
+                ThrowInit = reader.ReadBoolean("ThrowInit");
                 TestProperty = reader.ReadInt("TestProp");
             }
         }
index de0277b..db2f84f 100644 (file)
@@ -100,7 +100,9 @@ namespace Apache.Ignite.Core.Impl.Binary
         }
 
         /// <summary>
-        /// Reads the collection.
+        /// Reads the collection. The collection could be produced by Java PlatformUtils.writeCollection()
+        /// from org.apache.ignite.internal.processors.platform.utils package
+        /// Note: return null if collection is empty
         /// </summary>
         public static ICollection<T> ReadCollectionRaw<T, TReader>(this TReader reader,
             Func<TReader, T> factory) where TReader : IBinaryRawReader
@@ -142,5 +144,24 @@ namespace Apache.Ignite.Core.Impl.Binary
 
             return res;
         }
+
+        /// <summary>
+        /// Reads a nullable collection. The collection could be produced by Java 
+        /// PlatformUtils.writeNullableCollection() from org.apache.ignite.internal.processors.platform.utils package.
+        /// </summary>
+        public static ICollection<T> ReadNullableCollectionRaw<T, TReader>(this TReader reader,
+            Func<TReader, T> factory) where TReader : IBinaryRawReader
+        {
+            Debug.Assert(reader != null);
+            Debug.Assert(factory != null);
+
+            var hasVal = reader.ReadBoolean();
+
+            if (!hasVal)
+            {
+                return null;
+            }
+            return ReadCollectionRaw(reader, factory);
+        }
     }
 }
index 3123c07..4e4d327 100644 (file)
@@ -1559,10 +1559,10 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             err = null;
 
-            if (reader.ReadBoolean())
-                return reader.ReadObject<object>();
+            if (reader.ReadBoolean()) // success indication
+                return reader.ReadObject<object>(); 
 
-            err = reader.ReadBoolean()
+            err = reader.ReadBoolean() // native error indication
                 ? reader.ReadObject<object>()
                 : ExceptionUtils.GetException(reader.Marshaller.Ignite, reader.ReadString(), reader.ReadString(),
                                               reader.ReadString());
index fc6009a..422908f 100644 (file)
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Impl.Services
 {
     using System;
     using System.Collections;
+    using System.Collections.Generic;
     using System.Diagnostics;
     using System.Reflection;
     using Apache.Ignite.Core.Binary;
@@ -185,13 +186,28 @@ namespace Apache.Ignite.Core.Impl.Services
                 return;
             }
 
+            // read failed configurations
+            ICollection<ServiceConfiguration> failedCfgs;
+
+            try
+            {
+                // switch to BinaryMode.Deserialize mode to avoid IService casting exception
+                reader = marsh.StartUnmarshal(stream);
+                failedCfgs = reader.ReadNullableCollectionRaw(f => new ServiceConfiguration(f));
+            }
+            catch (Exception e)
+            {
+                throw new ServiceDeploymentException("Service deployment failed with an exception. " +
+                                                     "Examine InnerException for details.", e);
+            }
+
             var binErr = err as IBinaryObject;
 
             throw binErr != null
                 ? new ServiceDeploymentException("Service deployment failed with a binary error. " +
-                                                 "Examine BinaryCause for details.", binErr)
+                                                 "Examine BinaryCause for details.", binErr, failedCfgs)
                 : new ServiceDeploymentException("Service deployment failed with an exception. " +
-                                                 "Examine InnerException for details.", (Exception) err);
+                                                 "Examine InnerException for details.", (Exception) err, failedCfgs);
         }
 
         /// <summary>
index fca3425..a9aea66 100644 (file)
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Services
     using System.Linq;
     using System.Reflection;
     using System.Threading.Tasks;
-    using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
@@ -34,6 +33,11 @@ namespace Apache.Ignite.Core.Impl.Services
     /// </summary>
     internal sealed class Services : PlatformTargetAdapter, IServices
     {
+        /*
+         * Please keep the following constants in sync with
+         * \modules\core\src\main\java\org\apache\ignite\internal\processors\platform\services\PlatformServices.java
+         */
+
         /** */
         private const int OpDeploy = 1;
         
@@ -74,6 +78,12 @@ namespace Apache.Ignite.Core.Impl.Services
         private const int OpCancelAllAsync = 14;
 
         /** */
+        private const int OpDeployAll = 15;
+
+        /** */
+        private const int OpDeployAllAsync = 16;
+
+        /** */
         private readonly IClusterGroup _clusterGroup;
 
         /** Invoker binary flag. */
@@ -229,17 +239,34 @@ namespace Apache.Ignite.Core.Impl.Services
         /** <inheritDoc /> */
         public void Deploy(ServiceConfiguration configuration)
         {
-            IgniteArgumentCheck.NotNull(configuration, "configuration");
+            ValidateConfiguration(configuration, "configuration");
 
-            DoOutInOp(OpDeploy, w => WriteServiceConfiguration(configuration, w), ReadDeploymentResult);
+            DoOutInOp(OpDeploy, w => configuration.Write(w), ReadDeploymentResult);
         }
 
         /** <inheritDoc /> */
         public Task DeployAsync(ServiceConfiguration configuration)
         {
-            IgniteArgumentCheck.NotNull(configuration, "configuration");
+            ValidateConfiguration(configuration, "configuration");
+
+            return DoOutOpAsync(OpDeployAsync, w => configuration.Write(w), 
+                _keepBinary, ReadDeploymentResult);
+        }
+
+        /** <inheritDoc /> */
+        public void DeployAll(IEnumerable<ServiceConfiguration> configurations)
+        {
+            IgniteArgumentCheck.NotNull(configurations, "configurations");
 
-            return DoOutOpAsync(OpDeployAsync, w => WriteServiceConfiguration(configuration, w), 
+            DoOutInOp(OpDeployAll, w => SerializeConfigurations(configurations, w), ReadDeploymentResult);
+        }
+
+        /** <inheritDoc /> */
+        public Task DeployAllAsync(IEnumerable<ServiceConfiguration> configurations)
+        {
+            IgniteArgumentCheck.NotNull(configurations, "configurations");
+            return DoOutOpAsync(OpDeployAllAsync, w => SerializeConfigurations(configurations, w),
                 _keepBinary, ReadDeploymentResult);
         }
 
@@ -381,27 +408,6 @@ namespace Apache.Ignite.Core.Impl.Services
         }
 
         /// <summary>
-        /// Writes the service configuration.
-        /// </summary>
-        private static void WriteServiceConfiguration(ServiceConfiguration configuration, IBinaryRawWriter w)
-        {
-            Debug.Assert(configuration != null);
-            Debug.Assert(w != null);
-
-            w.WriteString(configuration.Name);
-            w.WriteObject(configuration.Service);
-            w.WriteInt(configuration.TotalCount);
-            w.WriteInt(configuration.MaxPerNodeCount);
-            w.WriteString(configuration.CacheName);
-            w.WriteObject(configuration.AffinityKey);
-
-            if (configuration.NodeFilter != null)
-                w.WriteObject(configuration.NodeFilter);
-            else
-                w.WriteObject<object>(null);
-        }
-
-        /// <summary>
         /// Reads the deployment result.
         /// </summary>
         private object ReadDeploymentResult(BinaryReader r)
@@ -417,5 +423,43 @@ namespace Apache.Ignite.Core.Impl.Services
             ServiceProxySerializer.ReadDeploymentResult(s, Marshaller, _keepBinary);
             return null;
         }
+
+        /// <summary>
+        /// Performs ServiceConfiguration validation.
+        /// </summary>
+        /// <param name="configuration">Service configuration</param>
+        /// <param name="argName">argument name</param>
+        private static void ValidateConfiguration(ServiceConfiguration configuration, string argName)
+        {
+            IgniteArgumentCheck.NotNull(configuration, argName);
+            IgniteArgumentCheck.NotNullOrEmpty(configuration.Name, string.Format("{0}.Name", argName));
+            IgniteArgumentCheck.NotNull(configuration.Service, string.Format("{0}.Service", argName));
+        }
+
+        /// <summary>
+        /// Writes a collection of service configurations using passed BinaryWriter
+        /// Also it performs basic validation of each service configuration and could throw exceptions
+        /// </summary>
+        /// <param name="configurations">a collection of service configurations </param>
+        /// <param name="writer">Binary Writer</param>
+        private static void SerializeConfigurations(IEnumerable<ServiceConfiguration> configurations, 
+            BinaryWriter writer)
+        {
+            var pos = writer.Stream.Position;
+            writer.WriteInt(0);  // Reserve count.
+
+            var cnt = 0;
+
+            foreach (var cfg in configurations)
+            {
+                ValidateConfiguration(cfg, string.Format("configurations[{0}]", cnt));
+                cfg.Write(writer);
+                cnt++;
+            }
+
+            IgniteArgumentCheck.Ensure(cnt > 0, "configurations", "empty collection");
+
+            writer.Stream.WriteInt(pos, cnt);
+        }
     }
 }
index ac9b4d9..bcbd4fb 100644 (file)
@@ -156,6 +156,38 @@ namespace Apache.Ignite.Core.Services
         Task DeployAsync(ServiceConfiguration configuration);
 
         /// <summary>
+        /// Deploys multiple services described by provided configurations. Depending on specified parameters, 
+        /// multiple instances of the same service may be deployed. Whenever topology changes,
+        /// Ignite will automatically rebalance the deployed services within cluster to make sure that each node
+        /// will end up with about equal number of deployed instances whenever possible.
+        /// <para/>
+        /// If deployment of some of the provided services fails, then <see cref="ServiceDeploymentException"/> 
+        /// containing a list of failed service configurations 
+        /// (<see cref="ServiceDeploymentException.FailedConfigurations"/>) will be thrown. It is guaranteed that all 
+        /// services  that were provided to this method and are not present in the list of failed services are 
+        /// successfully deployed by the moment of the exception being thrown.
+        /// Note that if exception is thrown, then partial deployment may have occurred.
+        /// </summary>
+        /// <param name="configurations">Collection of service configurations to be deployed.</param>
+        void DeployAll(IEnumerable<ServiceConfiguration> configurations);
+
+        /// <summary>
+        /// Asynchronously deploys multiple services described by provided configurations. Depending on specified 
+        /// parameters, multiple instances of the same service may be deployed (<see cref="ServiceConfiguration"/>).
+        /// Whenever topology changes, Ignite will automatically rebalance the deployed services within cluster to make
+        /// sure that each node  will end up with about equal number of deployed instances whenever possible.
+        /// <para/>
+        /// If deployment of some of the provided services fails, then <see cref="ServiceDeploymentException"/> 
+        /// containing a list of failed service configurations 
+        /// (<see cref="ServiceDeploymentException.FailedConfigurations"/>) will be thrown. It is guaranteed that all 
+        /// services, that were provided to this method and are not present in the list of failed services, are 
+        /// successfully deployed by the moment of the exception being thrown.
+        /// Note that if exception is thrown, then partial deployment may have occurred.
+        /// </summary>
+        /// <param name="configurations">Collection of service configurations to be deployed.</param>
+        Task DeployAllAsync(IEnumerable<ServiceConfiguration> configurations);
+
+        /// <summary>
         /// Cancels service deployment. If a service with specified name was deployed on the grid,
         /// then <see cref="IService.Cancel"/> method will be called on it.
         /// <para/>
index e91656f..a7b9e7f 100644 (file)
 
 namespace Apache.Ignite.Core.Services
 {
+    using System;
+    using System.Diagnostics;
+    using System.Diagnostics.CodeAnalysis;
+    using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cluster;
 
     /// <summary>
     /// Service configuration.
     /// </summary>
+    [Serializable]
     public class ServiceConfiguration
     {
         /// <summary>
@@ -57,6 +62,69 @@ namespace Apache.Ignite.Core.Services
         /// <summary>
         /// Gets or sets node filter used to filter nodes on which the service will be deployed.
         /// </summary>
-        public IClusterNodeFilter NodeFilter { get; set; } 
+        public IClusterNodeFilter NodeFilter { get; set; }
+
+        /// <summary>
+        /// Serializes the Service configuration using IBinaryRawWriter
+        /// </summary>
+        /// <param name="w">IBinaryRawWriter</param>
+        internal void Write(IBinaryRawWriter w)
+        {
+            Debug.Assert(w != null);
+
+            w.WriteString(Name);
+            w.WriteObject(Service);
+            w.WriteInt(TotalCount);
+            w.WriteInt(MaxPerNodeCount);
+            w.WriteString(CacheName);
+            w.WriteObject(AffinityKey);
+
+            if (NodeFilter != null)
+                w.WriteObject(NodeFilter);
+            else
+                w.WriteObject<object>(null);
+        }
+
+        /// <summary>
+        /// Default constructor
+        /// </summary>
+        public ServiceConfiguration()
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Deserialization constructor. Used to collect FailedConfigurations during ServiceDeploymentException 
+        /// </summary>
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
+        internal ServiceConfiguration(IBinaryRawReader r)
+        {
+            Debug.Assert(r != null);
+
+            Name = r.ReadString();
+
+            try
+            {
+                Service = r.ReadObject<IService>();
+            }
+            catch (Exception)
+            {
+                // Ignore exceptions in user deserealization code.
+            }
+
+            TotalCount = r.ReadInt();
+            MaxPerNodeCount = r.ReadInt();
+            CacheName = r.ReadString();
+            AffinityKey = r.ReadObject<object>();
+
+            try
+            {
+                NodeFilter = r.ReadObject<IClusterNodeFilter>();
+            }
+            catch (Exception)
+            {
+                // Ignore exceptions in user deserealization code.
+            }
+        }
     }
 }
\ No newline at end of file
index 825f91e..6a4b18f 100644 (file)
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Core.Services
 {
     using System;
+    using System.Collections.Generic;
     using System.Diagnostics.CodeAnalysis;
     using System.Runtime.Serialization;
     using Apache.Ignite.Core.Binary;
@@ -29,12 +30,18 @@ namespace Apache.Ignite.Core.Services
     [Serializable]
     public class ServiceDeploymentException : IgniteException
     {
-        /** Serializer key. */
+        /** Serializer key for BinaryCause. */
         private const string KeyBinaryCause = "BinaryCause";
 
+        /** Serializer key for Failed Configurations. */
+        private const string KeyFailedConfigurations = "FailedConfigurations";
+
         /** Cause. */
         private readonly IBinaryObject _binaryCause;
 
+        /** Configurations of services that failed to deploy */
+        private readonly ICollection<ServiceConfiguration> _failedCfgs;
+
         /// <summary>
         /// Initializes a new instance of the <see cref="ServiceDeploymentException"/> class.
         /// </summary>
@@ -63,14 +70,28 @@ namespace Apache.Ignite.Core.Services
         }
 
         /// <summary>
+        /// Initializes a new instance of the <see cref="ServiceDeploymentException"/> class with failed configurations.
+        /// </summary>
+        /// <param name="message">The message.</param>
+        /// <param name="cause">The cause.</param>
+        /// <param name="failedCfgs">List of failed configurations</param>
+        public ServiceDeploymentException(string message, Exception cause, ICollection<ServiceConfiguration> failedCfgs) 
+            : base(message, cause)
+        {
+            _failedCfgs = failedCfgs;
+        }
+
+        /// <summary>
         /// Initializes a new instance of the <see cref="ServiceDeploymentException"/> class.
         /// </summary>
         /// <param name="message">The message.</param>
         /// <param name="binaryCause">The binary cause.</param>
-        public ServiceDeploymentException(string message, IBinaryObject binaryCause)
-            : base(message)
+        /// <param name="failedCfgs">List of failed configurations</param>
+        public ServiceDeploymentException(string message, IBinaryObject binaryCause,
+            ICollection<ServiceConfiguration> failedCfgs) : base(message)
         {
             _binaryCause = binaryCause;
+            _failedCfgs = failedCfgs;
         }
 
         /// <summary>
@@ -82,6 +103,8 @@ namespace Apache.Ignite.Core.Services
             : base(info, ctx)
         {
             _binaryCause = (IBinaryObject)info.GetValue(KeyBinaryCause, typeof(IBinaryObject));
+            _failedCfgs = (ICollection<ServiceConfiguration>)info.GetValue(KeyFailedConfigurations, 
+                typeof(ICollection<ServiceConfiguration>));
         }
 
         /// <summary>
@@ -104,8 +127,17 @@ namespace Apache.Ignite.Core.Services
         public override void GetObjectData(SerializationInfo info, StreamingContext context)
         {
             info.AddValue(KeyBinaryCause, _binaryCause);
+            info.AddValue(KeyFailedConfigurations, _failedCfgs);
 
             base.GetObjectData(info, context);
         }
+
+        /// <summary>
+        /// Configurations of services that failed to deploy, could be null
+        /// </summary>
+        public ICollection<ServiceConfiguration> FailedConfigurations
+        {
+            get { return _failedCfgs; }
+        }
     }
 }
\ No newline at end of file