Support multiple shared links (#2457)
authoryizhenqiang <manzhizhen@163.com>
Sun, 3 Feb 2019 08:35:21 +0000 (16:35 +0800)
committerIan Luo <ian.luo@gmail.com>
Sun, 3 Feb 2019 08:35:21 +0000 (16:35 +0800)
* make dubbo support multiple shared links, upgrading RPC throughput

* Fix compilation error

* Fix compilation error

* opti import

* if add {}

* checkstyle fail

* fix getSharedClient referenceCount calculation error bug

* 优化 import

* Fix the problem that the getSharedClient thread is not safe

* Fix the problem that the getSharedClient thread is not safe

* Try fixing ci error, https://travis-ci.org/apache/incubator-dubbo/jobs/453185295

* 将DEFAULT_CONNECTIONS_KEY修改成SERVICE_CONNECTIONS_KEY

* dubbo.xsd add shareconnections attribute,

* Optimize code format

* Fix mult connect ghost connect  problem

* format code

* Remove the concept of ghostClientMap and ghost connection. In fact, ghostClient is LazyConnectExchangeClient. At present, the LazyConnectExchangeClient object is added directly in ReferenceCountExchangeClient to realize the mapping relationship with ReferenceCountExchangeClient. The relationship between previous ghostClient and url mapping is not applicable to the current new share. Multiple connections.

* Optimize the ReferenceCountExchangeClient and remove the reference to the lazyConnectExchangeClient because it doesn't make much sense; add locks in the close operation of the AbstractClient, because connect, disconnect, and close should not be done at the same time.

* format code

* try remove close lock

* Restore close method

* Restore ReferenceCountExchangeClient reference to LazyConnectExchangeClient object

* Optimize the logic of using the LazyConnectExchangeClient inside the ReferenceCountExchangeClient; Supplemental shared multi-connected unit test

dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java

index e5ac175..1180731 100644 (file)
@@ -146,7 +146,13 @@ public class Constants {
 
     public static final int DEFAULT_ALIVE = 60 * 1000;
 
-    public static final int DEFAULT_CONNECTIONS = 0;
+    /**
+     * By default, a consumer JVM instance and a provider JVM instance share a long TCP connection (except when connections are set),
+     * which can set the number of long TCP connections shared to avoid the bottleneck of sharing a single long TCP connection.
+     */
+    public static final String DEFAULT_SHARE_CONNECTIONS = "1";
+
+    public static final String SHARE_CONNECTIONS_KEY = "shareconnections";
 
     public static final int DEFAULT_ACCEPTS = 0;
 
index ce10655..3a556e7 100644 (file)
@@ -57,6 +57,12 @@ public class ConsumerConfig extends AbstractReferenceConfig {
      */\r
     private Integer queues;\r
 \r
+    /**\r
+     * By default, a TCP long-connection communication is shared between the consumer process and the provider process.\r
+     * This property can be set to share multiple TCP long-connection communications. Note that only the dubbo protocol takes effect.\r
+     */\r
+    private Integer shareconnections;\r
+\r
     @Override\r
     public void setTimeout(Integer timeout) {\r
         super.setTimeout(timeout);\r
@@ -118,4 +124,12 @@ public class ConsumerConfig extends AbstractReferenceConfig {
     public void setQueues(Integer queues) {\r
         this.queues = queues;\r
     }\r
+\r
+    public Integer getShareconnections() {\r
+        return shareconnections;\r
+    }\r
+\r
+    public void setShareconnections(Integer shareconnections) {\r
+        this.shareconnections = shareconnections;\r
+    }\r
 }\r
index c5b890c..41af239 100644 (file)
                         <xsd:documentation><![CDATA[ The thread pool queue size. ]]></xsd:documentation>\r
                     </xsd:annotation>\r
                 </xsd:attribute>\r
+                <xsd:attribute name="shareconnections" type="xsd:string">\r
+                    <xsd:annotation>\r
+                        <xsd:documentation>\r
+                            <![CDATA[ The default share connections. default share one connection. ]]></xsd:documentation>\r
+                    </xsd:annotation>\r
+                </xsd:attribute>\r
                 <xsd:anyAttribute namespace="##other" processContents="lax"/>\r
             </xsd:extension>\r
         </xsd:complexContent>\r
index 4138398..2afdc4d 100644 (file)
@@ -174,17 +174,22 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
     }
 
     protected void connect() throws RemotingException {
+
         connectLock.lock();
+
         try {
+
             if (isConnected()) {
                 return;
             }
 
             doConnect();
+
             if (!isConnected()) {
                 throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                         + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                         + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
+
             } else {
                 if (logger.isInfoEnabled()) {
                     logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
@@ -192,12 +197,15 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
                             + ", channel is " + this.getChannel());
                 }
             }
+
         } catch (RemotingException e) {
             throw e;
+
         } catch (Throwable e) {
             throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                     + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                     + ", cause: " + e.getMessage(), e);
+
         } finally {
             connectLock.unlock();
         }
@@ -241,11 +249,13 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
 
     @Override
     public void close() {
+
         try {
             super.close();
         } catch (Throwable e) {
             logger.warn(e.getMessage(), e);
         }
+
         try {
             if (executor != null) {
                 ExecutorUtil.shutdownNow(executor, 100);
@@ -253,11 +263,13 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
         } catch (Throwable e) {
             logger.warn(e.getMessage(), e);
         }
+
         try {
             disconnect();
         } catch (Throwable e) {
             logger.warn(e.getMessage(), e);
         }
+
         try {
             doClose();
         } catch (Throwable e) {
@@ -310,5 +322,4 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
      * @return channel
      */
     protected abstract Channel getChannel();
-
 }
index e9f42dd..f284dd4 100644 (file)
@@ -22,7 +22,9 @@ import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.serialize.support.SerializableClassRegistry;
 import org.apache.dubbo.common.serialize.support.SerializationOptimizer;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.ConfigUtils;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
@@ -49,11 +51,13 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * dubbo protocol support.
@@ -65,6 +69,7 @@ public class DubboProtocol extends AbstractProtocol {
     public static final int DEFAULT_PORT = 20880;
     private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
     private static DubboProtocol INSTANCE;
+
     /**
      * <host:port,Exchanger>
      */
@@ -72,8 +77,7 @@ public class DubboProtocol extends AbstractProtocol {
     /**
      * <host:port,Exchanger>
      */
-    private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<>();
+    private final Map<String, List<ReferenceCountExchangeClient>> referenceClientMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
     private final Set<String> optimizers = new ConcurrentHashSet<>();
     /**
@@ -81,55 +85,60 @@ public class DubboProtocol extends AbstractProtocol {
      * servicekey-stubmethods
      */
     private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<>();
+
     private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
 
         @Override
         public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
-            if (message instanceof Invocation) {
-                Invocation inv = (Invocation) message;
-                Invoker<?> invoker = getInvoker(channel, inv);
-                // need to consider backward-compatibility if it's a callback
-                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
-                    String methodsStr = invoker.getUrl().getParameters().get("methods");
-                    boolean hasMethod = false;
-                    if (methodsStr == null || !methodsStr.contains(",")) {
-                        hasMethod = inv.getMethodName().equals(methodsStr);
-                    } else {
-                        String[] methods = methodsStr.split(",");
-                        for (String method : methods) {
-                            if (inv.getMethodName().equals(method)) {
-                                hasMethod = true;
-                                break;
-                            }
+
+            if (!(message instanceof Invocation)) {
+                throw new RemotingException(channel, "Unsupported request: "
+                        + (message == null ? null : (message.getClass().getName() + ": " + message))
+                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
+            }
+
+            Invocation inv = (Invocation) message;
+            Invoker<?> invoker = getInvoker(channel, inv);
+            // need to consider backward-compatibility if it's a callback
+            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
+                String methodsStr = invoker.getUrl().getParameters().get("methods");
+                boolean hasMethod = false;
+                if (methodsStr == null || !methodsStr.contains(",")) {
+                    hasMethod = inv.getMethodName().equals(methodsStr);
+                } else {
+                    String[] methods = methodsStr.split(",");
+                    for (String method : methods) {
+                        if (inv.getMethodName().equals(method)) {
+                            hasMethod = true;
+                            break;
                         }
                     }
-                    if (!hasMethod) {
-                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
-                                + " not found in callback service interface ,invoke will be ignored."
-                                + " please update the api interface. url is:"
-                                + invoker.getUrl()) + " ,invocation is :" + inv);
-                        return null;
-                    }
                 }
-                RpcContext rpcContext = RpcContext.getContext();
-                rpcContext.setRemoteAddress(channel.getRemoteAddress());
-                Result result = invoker.invoke(inv);
-
-                if (result instanceof AsyncRpcResult) {
-                    return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
-                } else {
-                    return CompletableFuture.completedFuture(result);
+                if (!hasMethod) {
+                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+                            + " not found in callback service interface ,invoke will be ignored."
+                            + " please update the api interface. url is:"
+                            + invoker.getUrl()) + " ,invocation is :" + inv);
+                    return null;
                 }
             }
-            throw new RemotingException(channel, "Unsupported request: "
-                    + (message == null ? null : (message.getClass().getName() + ": " + message))
-                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
+            RpcContext rpcContext = RpcContext.getContext();
+            rpcContext.setRemoteAddress(channel.getRemoteAddress());
+            Result result = invoker.invoke(inv);
+
+            if (result instanceof AsyncRpcResult) {
+                return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
+
+            } else {
+                return CompletableFuture.completedFuture(result);
+            }
         }
 
         @Override
         public void received(Channel channel, Object message) throws RemotingException {
             if (message instanceof Invocation) {
                 reply((ExchangeChannel) channel, message);
+
             } else {
                 super.received(channel, message);
             }
@@ -164,6 +173,7 @@ public class DubboProtocol extends AbstractProtocol {
             if (method == null || method.length() == 0) {
                 return null;
             }
+
             RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
             invocation.setAttachment(Constants.PATH_KEY, url.getPath());
             invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
@@ -172,6 +182,7 @@ public class DubboProtocol extends AbstractProtocol {
             if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                 invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
             }
+
             return invocation;
         }
     };
@@ -185,6 +196,7 @@ public class DubboProtocol extends AbstractProtocol {
             // load
             ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME);
         }
+
         return INSTANCE;
     }
 
@@ -213,24 +225,26 @@ public class DubboProtocol extends AbstractProtocol {
         boolean isStubServiceInvoke = false;
         int port = channel.getLocalAddress().getPort();
         String path = inv.getAttachments().get(Constants.PATH_KEY);
+
         // if it's callback service on client side
         isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
         if (isStubServiceInvoke) {
             port = channel.getRemoteAddress().getPort();
         }
+
         //callback
         isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
         if (isCallBackServiceInvoke) {
             path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
             inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
         }
-        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
 
+        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
         DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
 
         if (exporter == null) {
-            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " +
-                    exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
+            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
+                    ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
         }
 
         return exporter.getInvoker();
@@ -264,6 +278,7 @@ public class DubboProtocol extends AbstractProtocol {
                     logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                             "], has set stubproxy support event ,but no stub methods founded."));
                 }
+
             } else {
                 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
             }
@@ -271,6 +286,7 @@ public class DubboProtocol extends AbstractProtocol {
 
         openServer(url);
         optimizeSerialization(url);
+
         return exporter;
     }
 
@@ -313,6 +329,7 @@ public class DubboProtocol extends AbstractProtocol {
         } catch (RemotingException e) {
             throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
         }
+
         str = url.getParameter(Constants.CLIENT_KEY);
         if (str != null && str.length() > 0) {
             Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
@@ -320,6 +337,7 @@ public class DubboProtocol extends AbstractProtocol {
                 throw new RpcException("Unsupported client type: " + str);
             }
         }
+
         return server;
     }
 
@@ -348,10 +366,13 @@ public class DubboProtocol extends AbstractProtocol {
             }
 
             optimizers.add(className);
+
         } catch (ClassNotFoundException e) {
             throw new RpcException("Cannot find the serialization optimizer class: " + className, e);
+
         } catch (InstantiationException e) {
             throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
+
         } catch (IllegalAccessException e) {
             throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
         }
@@ -360,65 +381,173 @@ public class DubboProtocol extends AbstractProtocol {
     @Override
     public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
         optimizeSerialization(url);
+
         // create rpc invoker.
         DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
         invokers.add(invoker);
+
         return invoker;
     }
 
     private ExchangeClient[] getClients(URL url) {
         // whether to share connection
-        boolean serviceShareConnect = false;
+
+        boolean useShareConnect = false;
+
         int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
+        List<ReferenceCountExchangeClient> shareClients = null;
         // if not configured, connection is shared, otherwise, one connection for one service
         if (connections == 0) {
-            serviceShareConnect = true;
-            connections = 1;
+            useShareConnect = true;
+
+            /**
+             * The xml configuration should have a higher priority than properties.
+             */
+            String shareConnectionsStr = url.getParameter(Constants.SHARE_CONNECTIONS_KEY, (String) null);
+            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(Constants.SHARE_CONNECTIONS_KEY,
+                    Constants.DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
+            shareClients = getSharedClient(url, connections);
         }
 
         ExchangeClient[] clients = new ExchangeClient[connections];
         for (int i = 0; i < clients.length; i++) {
-            if (serviceShareConnect) {
-                clients[i] = getSharedClient(url);
+            if (useShareConnect) {
+                clients[i] = shareClients.get(i);
+
             } else {
                 clients[i] = initClient(url);
             }
         }
+
         return clients;
     }
 
     /**
      * Get shared connection
+     *
+     * @param url
+     * @param connectNum connectNum must be greater than or equal to 1
      */
-    private ExchangeClient getSharedClient(URL url) {
+    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
         String key = url.getAddress();
-        ReferenceCountExchangeClient client = referenceClientMap.get(key);
-        if (client != null) {
-            if (!client.isClosed()) {
-                client.incrementAndGetCount();
-                return client;
-            } else {
-                referenceClientMap.remove(key);
-            }
+        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
+
+        if (checkClientCanUse(clients)) {
+            batchClientRefIncr(clients);
+            return clients;
         }
 
         locks.putIfAbsent(key, new Object());
         synchronized (locks.get(key)) {
-            if (referenceClientMap.containsKey(key)) {
-                return referenceClientMap.get(key);
+            clients = referenceClientMap.get(key);
+            // dubbo check
+            if (checkClientCanUse(clients)) {
+                batchClientRefIncr(clients);
+                return clients;
             }
 
-            ExchangeClient exchangeClient = initClient(url);
-            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
-            referenceClientMap.put(key, client);
-            ghostClientMap.remove(key);
+            // connectNum must be greater than or equal to 1
+            connectNum = Math.max(connectNum, 1);
+
+            // If the clients is empty, then the first initialization is
+            if (CollectionUtils.isEmpty(clients)) {
+                clients = buildReferenceCountExchangeClientList(url, connectNum);
+                referenceClientMap.put(key, clients);
+
+            } else {
+                for (int i = 0; i < clients.size(); i++) {
+                    ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
+                    // If there is a client in the list that is no longer available, create a new one to replace him.
+                    if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
+                        clients.set(i, buildReferenceCountExchangeClient(url));
+                        continue;
+                    }
+
+                    referenceCountExchangeClient.incrementAndGetCount();
+                }
+            }
+
+            /**
+             * I understand that the purpose of the remove operation here is to avoid the expired url key
+             * always occupying this memory space.
+             */
             locks.remove(key);
-            return client;
+
+            return clients;
+        }
+    }
+
+    /**
+     * Check if the client list is all available
+     *
+     * @param referenceCountExchangeClients
+     * @return true-available,false-unavailable
+     */
+    private boolean checkClientCanUse(List<ReferenceCountExchangeClient> referenceCountExchangeClients) {
+        if (CollectionUtils.isEmpty(referenceCountExchangeClients)) {
+            return false;
+        }
+
+        for (ReferenceCountExchangeClient referenceCountExchangeClient : referenceCountExchangeClients) {
+            // As long as one client is not available, you need to replace the unavailable client with the available one.
+            if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
+                return false;
+            }
         }
+
+        return true;
+    }
+
+    /**
+     * Add client references in bulk
+     *
+     * @param referenceCountExchangeClients
+     */
+    private void batchClientRefIncr(List<ReferenceCountExchangeClient> referenceCountExchangeClients) {
+        if (CollectionUtils.isEmpty(referenceCountExchangeClients)) {
+            return;
+        }
+
+        for (ReferenceCountExchangeClient referenceCountExchangeClient : referenceCountExchangeClients) {
+            if (referenceCountExchangeClient != null) {
+                referenceCountExchangeClient.incrementAndGetCount();
+            }
+        }
+    }
+
+    /**
+     * Bulk build client
+     *
+     * @param url
+     * @param connectNum
+     * @return
+     */
+    private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
+        List<ReferenceCountExchangeClient> clients = new CopyOnWriteArrayList<>();
+
+        for (int i = 0; i < connectNum; i++) {
+            clients.add(buildReferenceCountExchangeClient(url));
+        }
+
+        return clients;
+    }
+
+    /**
+     * Build a single client
+     *
+     * @param url
+     * @return
+     */
+    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
+        ExchangeClient exchangeClient = initClient(url);
+
+        return new ReferenceCountExchangeClient(exchangeClient);
     }
 
     /**
      * Create new connection
+     *
+     * @param url
      */
     private ExchangeClient initClient(URL url) {
 
@@ -440,12 +569,15 @@ public class DubboProtocol extends AbstractProtocol {
             // connection should be lazy
             if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                 client = new LazyConnectExchangeClient(url, requestHandler);
+
             } else {
                 client = Exchangers.connect(url, requestHandler);
             }
+
         } catch (RemotingException e) {
             throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
         }
+
         return client;
     }
 
@@ -453,46 +585,64 @@ public class DubboProtocol extends AbstractProtocol {
     public void destroy() {
         for (String key : new ArrayList<>(serverMap.keySet())) {
             ExchangeServer server = serverMap.remove(key);
-            if (server != null) {
-                try {
-                    if (logger.isInfoEnabled()) {
-                        logger.info("Close dubbo server: " + server.getLocalAddress());
-                    }
-                    server.close(ConfigurationUtils.getServerShutdownTimeout());
-                } catch (Throwable t) {
-                    logger.warn(t.getMessage(), t);
+
+            if (server == null) {
+                continue;
+            }
+
+            try {
+                if (logger.isInfoEnabled()) {
+                    logger.info("Close dubbo server: " + server.getLocalAddress());
                 }
+
+                server.close(ConfigurationUtils.getServerShutdownTimeout());
+
+            } catch (Throwable t) {
+                logger.warn(t.getMessage(), t);
             }
         }
 
         for (String key : new ArrayList<>(referenceClientMap.keySet())) {
-            ExchangeClient client = referenceClientMap.remove(key);
-            if (client != null) {
-                try {
-                    if (logger.isInfoEnabled()) {
-                        logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
-                    }
-                    client.close(ConfigurationUtils.getServerShutdownTimeout());
-                } catch (Throwable t) {
-                    logger.warn(t.getMessage(), t);
-                }
+            List<ReferenceCountExchangeClient> clients = referenceClientMap.remove(key);
+
+            if (CollectionUtils.isEmpty(clients)) {
+                continue;
             }
-        }
 
-        for (String key : new ArrayList<>(ghostClientMap.keySet())) {
-            ExchangeClient client = ghostClientMap.remove(key);
-            if (client != null) {
-                try {
-                    if (logger.isInfoEnabled()) {
-                        logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
-                    }
-                    client.close(ConfigurationUtils.getServerShutdownTimeout());
-                } catch (Throwable t) {
-                    logger.warn(t.getMessage(), t);
-                }
+            for (ReferenceCountExchangeClient client : clients) {
+                closeReferenceCountExchangeClient(client);
             }
         }
+
         stubServiceMethodsMap.clear();
         super.destroy();
     }
+
+    /**
+     * close ReferenceCountExchangeClient
+     *
+     * @param client
+     */
+    private void closeReferenceCountExchangeClient(ReferenceCountExchangeClient client) {
+        if (client == null) {
+            return;
+        }
+
+        try {
+            if (logger.isInfoEnabled()) {
+                logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
+            }
+
+            client.close(ConfigurationUtils.getServerShutdownTimeout());
+
+            // TODO
+            /**
+             * At this time, ReferenceCountExchangeClient#client has been replaced with LazyConnectExchangeClient.
+             * Do you need to call client.close again to ensure that LazyConnectExchangeClient is also closed?
+             */
+
+        } catch (Throwable t) {
+            logger.warn(t.getMessage(), t);
+        }
+    }
 }
index eaebb19..f2bc453 100644 (file)
@@ -65,7 +65,6 @@ final class LazyConnectExchangeClient implements ExchangeClient {
         this.requestWithWarning = url.getParameter(REQUEST_WITH_WARNING_KEY, false);
     }
 
-
     private void initClient() throws RemotingException {
         if (client != null) {
             return;
index 8347115..7a72048 100644 (file)
@@ -26,7 +26,6 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
 import org.apache.dubbo.remoting.exchange.ResponseFuture;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -38,19 +37,12 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
     private final URL url;
     private final AtomicInteger referenceCount = new AtomicInteger(0);
 
-    //    private final ExchangeHandler handler;
-    private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap;
     private ExchangeClient client;
 
-
-    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
+    public ReferenceCountExchangeClient(ExchangeClient client) {
         this.client = client;
         referenceCount.incrementAndGet();
         this.url = client.getUrl();
-        if (ghostClientMap == null) {
-            throw new IllegalStateException("ghostClientMap can not be null, url: " + url);
-        }
-        this.ghostClientMap = ghostClientMap;
     }
 
     @Override
@@ -151,10 +143,12 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
         if (referenceCount.decrementAndGet() <= 0) {
             if (timeout == 0) {
                 client.close();
+
             } else {
                 client.close(timeout);
             }
-            client = replaceWithLazyClient();
+
+            replaceWithLazyClient();
         }
     }
 
@@ -163,8 +157,13 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
         client.startClose();
     }
 
-    // ghost client
-    private LazyConnectExchangeClient replaceWithLazyClient() {
+    /**
+     * when closing the client, the client needs to be set to LazyConnectExchangeClient, and if a new call is made,
+     * the client will "resurrect".
+     *
+     * @return
+     */
+    private void replaceWithLazyClient() {
         // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false
         URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE)
                 .addParameter(Constants.RECONNECT_KEY, Boolean.FALSE)
@@ -173,14 +172,12 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
                 .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true)
                 .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient");
 
-        String key = url.getAddress();
-        // in worst case there's only one ghost connection.
-        LazyConnectExchangeClient gclient = ghostClientMap.get(key);
-        if (gclient == null || gclient.isClosed()) {
-            gclient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler());
-            ghostClientMap.put(key, gclient);
+        /**
+         * the order of judgment in the if statement cannot be changed.
+         */
+        if (!(client instanceof LazyConnectExchangeClient) || client.isClosed()) {
+            client = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler());
         }
-        return gclient;
     }
 
     @Override
@@ -192,3 +189,4 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
         referenceCount.incrementAndGet();
     }
 }
+
index c34f6d9..b77e5e1 100644 (file)
@@ -27,7 +27,6 @@ import org.apache.dubbo.rpc.Exporter;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.ProxyFactory;
 import org.apache.dubbo.rpc.protocol.dubbo.support.ProtocolUtils;
-
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -35,6 +34,12 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
 
 public class ReferenceCountExchangeClientTest {
 
@@ -80,7 +85,7 @@ public class ReferenceCountExchangeClientTest {
      */
     @Test
     public void test_share_connect() {
-        init(0);
+        init(0, 1);
         Assertions.assertEquals(demoClient.getLocalAddress(), helloClient.getLocalAddress());
         Assertions.assertEquals(demoClient, helloClient);
         destoy();
@@ -91,18 +96,43 @@ public class ReferenceCountExchangeClientTest {
      */
     @Test
     public void test_not_share_connect() {
-        init(1);
+        init(1, 1);
         Assertions.assertNotSame(demoClient.getLocalAddress(), helloClient.getLocalAddress());
         Assertions.assertNotSame(demoClient, helloClient);
         destoy();
     }
 
     /**
+     * test using multiple shared connections
+     */
+    @Test
+    public void test_mult_share_connect() {
+        // here a three shared connection is established between a consumer process and a provider process.
+        final int shareConnectionNum = 3;
+
+        init(0, shareConnectionNum);
+
+        List<ReferenceCountExchangeClient> helloReferenceClientList = getReferenceClientList(helloServiceInvoker);
+        Assertions.assertEquals(shareConnectionNum, helloReferenceClientList.size());
+
+        List<ReferenceCountExchangeClient> demoReferenceClientList = getReferenceClientList(demoServiceInvoker);
+        Assertions.assertEquals(shareConnectionNum, demoReferenceClientList.size());
+
+        // because helloServiceInvoker and demoServiceInvoker use share connect, so client list must be equal
+        Assertions.assertTrue(Objects.equals(helloReferenceClientList, demoReferenceClientList));
+
+        Assertions.assertEquals(demoClient.getLocalAddress(), helloClient.getLocalAddress());
+        Assertions.assertEquals(demoClient, helloClient);
+
+        destoy();
+    }
+
+    /**
      * test counter won't count down incorrectly when invoker is destroyed for multiple times
      */
     @Test
     public void test_multi_destory() {
-        init(0);
+        init(0, 1);
         DubboAppender.doStart();
         DubboAppender.clear();
         demoServiceInvoker.destroy();
@@ -119,16 +149,19 @@ public class ReferenceCountExchangeClientTest {
      */
     @Test
     public void test_counter_error() {
-        init(0);
+        init(0, 1);
         DubboAppender.doStart();
         DubboAppender.clear();
 
+        // because the two interfaces are initialized, the ReferenceCountExchangeClient reference counter is 2
         ReferenceCountExchangeClient client = getReferenceClient(helloServiceInvoker);
+
         // close once, counter counts down from 2 to 1, no warning occurs
         client.close();
         Assertions.assertEquals("hello", helloService.hello());
         Assertions.assertEquals(0, LogUtil.findMessage(errorMsg), "should not warning message");
-        // counter is incorrect, invocation still succeeds
+
+        // generally a client can only be closed once, here it is closed twice, counter is incorrect
         client.close();
 
         // wait close done.
@@ -138,6 +171,7 @@ public class ReferenceCountExchangeClientTest {
             Assertions.fail();
         }
 
+        // due to the effect of LazyConnectExchangeClient, the client will be "revived" whenever there is a call.
         Assertions.assertEquals("hello", helloService.hello());
         Assertions.assertEquals(1, LogUtil.findMessage(errorMsg), "should warning message");
 
@@ -150,7 +184,17 @@ public class ReferenceCountExchangeClientTest {
         // status switch to available once invoke again
         Assertions.assertEquals(true, helloServiceInvoker.isAvailable(), "client status available");
 
+        /**
+         * This is the third time to close the same client. Under normal circumstances,
+         * a client value should be closed once (that is, the shutdown operation is irreversible).
+         * After closing, the value of the reference counter of the client has become -1.
+         *
+         * But this is a bit special, because after the client is closed twice, there are several calls to helloService,
+         * that is, the client inside the ReferenceCountExchangeClient is actually active, so the third shutdown here is still effective,
+         * let the resurrection After the client is really closed.
+         */
         client.close();
+
         // client has been replaced with lazy client. lazy client is fetched from referenceclientmap, and since it's
         // been invoked once, it's close status is false
         Assertions.assertEquals(false, client.isClosed(), "client status close");
@@ -159,10 +203,13 @@ public class ReferenceCountExchangeClientTest {
     }
 
     @SuppressWarnings("unchecked")
-    private void init(int connections) {
+    private void init(int connections, int shareConnections) {
+        Assertions.assertTrue(connections >= 0);
+        Assertions.assertTrue(shareConnections >= 1);
+
         int port = NetUtils.getAvailablePort();
-        URL demoUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/demo?" + Constants.CONNECTIONS_KEY + "=" + connections);
-        URL helloUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/hello?" + Constants.CONNECTIONS_KEY + "=" + connections);
+        URL demoUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/demo?" + Constants.CONNECTIONS_KEY + "=" + connections + "&" + Constants.SHARE_CONNECTIONS_KEY + "=" + shareConnections);
+        URL helloUrl = URL.valueOf("dubbo://127.0.0.1:" + port + "/hello?" + Constants.CONNECTIONS_KEY + "=" + connections + "&" + Constants.SHARE_CONNECTIONS_KEY + "=" + shareConnections);
 
         demoExporter = export(new DemoServiceImpl(), IDemoService.class, demoUrl);
         helloExporter = export(new HelloServiceImpl(), IHelloService.class, helloUrl);
@@ -204,17 +251,42 @@ public class ReferenceCountExchangeClientTest {
     }
 
     private ReferenceCountExchangeClient getReferenceClient(Invoker<?> invoker) {
-        return (ReferenceCountExchangeClient) getInvokerClient(invoker);
+        return getReferenceClientList(invoker).get(0);
+    }
+
+    private List<ReferenceCountExchangeClient> getReferenceClientList(Invoker<?> invoker) {
+        List<ExchangeClient> invokerClientList = getInvokerClientList(invoker);
+
+        List<ReferenceCountExchangeClient> referenceCountExchangeClientList = new ArrayList<>(invokerClientList.size());
+        for (ExchangeClient exchangeClient : invokerClientList) {
+            Assertions.assertTrue(exchangeClient instanceof ReferenceCountExchangeClient);
+            referenceCountExchangeClientList.add((ReferenceCountExchangeClient) exchangeClient);
+        }
+
+        return referenceCountExchangeClientList;
     }
 
     private ExchangeClient getInvokerClient(Invoker<?> invoker) {
+        return getInvokerClientList(invoker).get(0);
+    }
+
+    private List<ExchangeClient> getInvokerClientList(Invoker<?> invoker) {
         @SuppressWarnings("rawtypes")
         DubboInvoker dInvoker = (DubboInvoker) invoker;
         try {
             Field clientField = DubboInvoker.class.getDeclaredField("clients");
             clientField.setAccessible(true);
             ExchangeClient[] clients = (ExchangeClient[]) clientField.get(dInvoker);
-            return clients[0];
+
+            List<ExchangeClient> clientList = new ArrayList<ExchangeClient>(clients.length);
+            for (ExchangeClient client : clients) {
+                clientList.add(client);
+            }
+
+            // sorting makes it easy to compare between lists
+            Collections.sort(clientList, Comparator.comparing(c -> Integer.valueOf(Objects.hashCode(c))));
+
+            return clientList;
 
         } catch (Exception e) {
             e.printStackTrace();