IGNITE-10748 Remove dead code in TcpCommunicationSpi for the tcp client creation...
authorMaxim Muzafarov <maxmuzaf@gmail.com>
Thu, 14 Feb 2019 13:51:13 +0000 (16:51 +0300)
committerDmitriy Pavlov <dpavlov@apache.org>
Thu, 14 Feb 2019 13:51:13 +0000 (16:51 +0300)
Signed-off-by: Dmitriy Pavlov <dpavlov@apache.org>
modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/HandshakeException.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java

index 243f707..de029f7 100755 (executable)
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -143,6 +144,7 @@ import org.apache.ignite.spi.TimeoutStrategy;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
+import org.apache.ignite.spi.communication.tcp.internal.HandshakeException;
 import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
 import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
@@ -2915,7 +2917,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             curClients0[connIdx] : null;
 
                         if (client0 == null) {
-                            client0 = createNioClient(node, connIdx);
+                            client0 = createCommunicationClient(node, connIdx);
 
                             if (client0 != null) {
                                 addNodeClient(node, connIdx, client0);
@@ -2989,7 +2991,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx)
+    @Nullable private GridCommunicationClient createCommunicationClient(ClusterNode node, int connIdx)
         throws IgniteCheckedException {
         assert node != null;
 
@@ -3275,9 +3277,45 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @throws IgniteCheckedException If failed.
      */
     protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+        GridNioSession session = createNioSession(node, connIdx);
+
+        return session == null ?
+            null : new GridTcpNioCommunicationClient(connIdx, session, log);
+    }
+
+    /**
+     * Returns the established TCP/IP connection between the current node and remote server. A handshake process of
+     * negotiation between two communicating nodes will be performed before the {@link GridNioSession} created.
+     * <p>
+     *     The handshaking process contains of these steps:
+     *
+     *     <ol>
+     *         <li>The local node opens a new {@link SocketChannel} in the <em>blocking</em> mode.</li>
+     *         <li>The local node calls {@link SocketChannel#connect(SocketAddress)} to remote node.</li>
+     *         <li>The remote GridNioAcceptWorker thread accepts new connection.</li>
+     *         <li>The remote node sends back the {@link NodeIdMessage}.</li>
+     *         <li>The local node reads NodeIdMessage from created channel.</li>
+     *         <li>The local node sends the {@link HandshakeMessage2} to remote.</li>
+     *         <li>The remote node processes {@link HandshakeMessage2} in {@link GridNioServerListener#onMessage(GridNioSession, Object)}.</li>
+     *         <li>The remote node sends back the {@link RecoveryLastReceivedMessage}.</li>
+     *     </ol>
+     *
+     *     The handshaking process ends.
+     * </p>
+     * <p>
+     *     <em>Note.</em> The {@link HandshakeTimeoutObject} is created to control execution timeout during the
+     *     whole handshaking process.
+     * </p>
+     *
+     * @param node Remote node identifier to connect with.
+     * @param connIdx Connection index based on configured {@link ConnectionPolicy}.
+     * @return A {@link GridNioSession} connection representation.
+     * @throws IgniteCheckedException If establish connection fails.
+     */
+    private GridNioSession createNioSession(ClusterNode node, int connIdx) throws IgniteCheckedException {
         Collection<InetSocketAddress> addrs = nodeAddresses(node);
 
-        GridCommunicationClient client = null;
+        GridNioSession session = null;
         IgniteCheckedException errs = null;
 
         long totalTimeout;
@@ -3299,7 +3337,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 maxConnTimeout
             );
 
-            while (client == null) { // Reconnection on handshake timeout.
+            while (session == null) { // Reconnection on handshake timeout.
                 if (stopping)
                     throw new IgniteSpiException("Node is stopping.");
 
@@ -3332,6 +3370,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                     GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
 
+                    assert recoveryDesc != null :
+                        "Recovery descriptor not found [connKey=" + connKey + ", rmtNode=" + node.id() + ']';
+
                     if (!recoveryDesc.reserve()) {
                         U.closeQuiet(ch);
 
@@ -3370,14 +3411,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             sslMeta.sslEngine(sslEngine);
                         }
 
-                        Integer handshakeConnIdx = connIdx;
+                        ClusterNode locNode = getLocalNode();
+
+                        if (locNode == null)
+                            throw new IgniteCheckedException("Local node has not been started or " +
+                                "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
 
                         rcvCnt = safeTcpHandshake(ch,
-                            recoveryDesc,
                             node.id(),
                             connTimeoutStgy.nextTimeout(currTimeout),
                             sslMeta,
-                            handshakeConnIdx);
+                            new HandshakeMessage2(locNode.id(),
+                                recoveryDesc.incrementConnectCount(),
+                                recoveryDesc.received(),
+                                connIdx));
 
                         if (rcvCnt == ALREADY_CONNECTED)
                             return null;
@@ -3416,20 +3463,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             throw new IgniteCheckedException("Unsupported negative receivedCount [rcvCnt=" + rcvCnt +
                                 ", senderNode=" + node + ']');
 
-                        meta.put(CONN_IDX_META, connKey);
-
-                        if (recoveryDesc != null) {
-                            recoveryDesc.onHandshake(rcvCnt);
-
-                            meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);
-                        }
+                        recoveryDesc.onHandshake(rcvCnt);
 
-                        GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
+                        meta.put(CONN_IDX_META, connKey);
+                        meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);
 
-                        client = new GridTcpNioCommunicationClient(connIdx, ses, log);
+                        session = nioSrvr.createSession(ch, meta, false, null).get();
                     }
                     finally {
-                        if (client == null) {
+                        if (session == null) {
                             U.closeQuiet(ch);
 
                             if (recoveryDesc != null)
@@ -3438,10 +3480,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     }
                 }
                 catch (IgniteSpiOperationTimeoutException e) { // Handshake is timed out.
-                    if (client != null) {
-                        client.forceClose();
+                    if (session != null) {
+                        session.close();
 
-                        client = null;
+                        session = null;
                     }
 
                     onException("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
@@ -3477,10 +3519,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 }
                 catch (Exception e) {
                     // Most probably IO error on socket connect or handshake.
-                    if (client != null) {
-                        client.forceClose();
+                    if (session != null) {
+                        session.close();
 
-                        client = null;
+                        session = null;
                     }
 
                     onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
@@ -3527,14 +3569,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 }
             }
 
-            if (client != null)
+            if (session != null)
                 break;
         }
 
-        if (client == null)
-            processClientCreationError(node, addrs, errs == null ? new IgniteCheckedException("No clients found") : errs);
+        if (session == null)
+            processSessionCreationError(node, addrs, errs == null ? new IgniteCheckedException("No session found") : errs);
 
-        return client;
+        return session;
     }
 
     /**
@@ -3552,14 +3594,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
-     * Process errors if TCP client to remote node hasn't been created.
+     * Process errors if TCP/IP {@link GridNioSession} creation to remote node hasn't been performed.
      *
      * @param node Remote node.
      * @param addrs Remote node addresses.
      * @param errs TCP client creation errors.
      * @throws IgniteCheckedException If failed.
      */
-    protected void processClientCreationError(
+    protected void processSessionCreationError(
         ClusterNode node,
         Collection<InetSocketAddress> addrs,
         IgniteCheckedException errs
@@ -3654,34 +3696,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * Performs handshake in timeout-safe way.
      *
      * @param ch Socket channel.
-     * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
      * @param rmtNodeId Remote node.
      * @param timeout Timeout for handshake.
      * @param sslMeta Session meta.
-     * @param handshakeConnIdx Non null connection index if need send it in handshake.
+     * @param msg {@link HandshakeMessage} or {@link HandshakeMessage2} to send.
      * @return Handshake response.
      * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
     private long safeTcpHandshake(
         SocketChannel ch,
-        @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
         long timeout,
         GridSslMeta sslMeta,
-        @Nullable Integer handshakeConnIdx
+        HandshakeMessage msg
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
-        long rcvCnt = 0;
+        long rcvCnt;
 
         try {
             BlockingSslHandler sslHnd = null;
 
             ByteBuffer buf;
 
+            // Step 1. Get remote node response with the remote nodeId value.
             if (isSslEnabled()) {
                 assert sslMeta != null;
 
@@ -3757,134 +3798,98 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             else
                 U.writeFully(ch, ByteBuffer.wrap(U.IGNITE_HEADER));
 
-            ClusterNode locNode = getLocalNode();
-
-            if (locNode == null)
-                throw new IgniteCheckedException("Local node has not been started or " +
-                    "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
-
-            if (recovery != null) {
-                HandshakeMessage msg;
-
-                int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
-
-                if (handshakeConnIdx != null) {
-                    msg = new HandshakeMessage2(locNode.id(),
-                        recovery.incrementConnectCount(),
-                        recovery.received(),
-                        handshakeConnIdx);
-
-                    msgSize += 4;
-                }
-                else {
-                    msg = new HandshakeMessage(locNode.id(),
-                        recovery.incrementConnectCount(),
-                        recovery.received());
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Writing handshake message [locNodeId=" + locNode.id() +
-                        ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+            // Step 2. Prepare Handshake message to send to the remote node.
+            if (log.isDebugEnabled())
+                log.debug("Writing handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
 
-                buf = ByteBuffer.allocate(msgSize);
+            buf = ByteBuffer.allocate(msg.getMessageSize());
 
-                buf.order(ByteOrder.nativeOrder());
+            buf.order(ByteOrder.nativeOrder());
 
-                boolean written = msg.writeTo(buf, null);
+            boolean written = msg.writeTo(buf, null);
 
-                assert written;
+            assert written;
 
-                buf.flip();
+            buf.flip();
 
-                if (isSslEnabled()) {
-                    assert sslHnd != null;
+            if (isSslEnabled()) {
+                assert sslHnd != null;
 
-                    U.writeFully(ch, sslHnd.encrypt(buf));
-                }
-                else
-                    U.writeFully(ch, buf);
+                U.writeFully(ch, sslHnd.encrypt(buf));
             }
-            else {
-                if (isSslEnabled()) {
-                    assert sslHnd != null;
+            else
+                U.writeFully(ch, buf);
 
-                    U.writeFully(ch, sslHnd.encrypt(ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId()))));
-                }
-                else
-                    U.writeFully(ch, ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId())));
-            }
+            if (log.isDebugEnabled())
+                log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
 
-            if (recovery != null) {
-                if (log.isDebugEnabled())
-                    log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+            // Step 3. Waiting for response from the remote node with their receive count message.
+            if (isSslEnabled()) {
+                assert sslHnd != null;
 
-                if (isSslEnabled()) {
-                    assert sslHnd != null;
+                buf = ByteBuffer.allocate(1000);
+                buf.order(ByteOrder.nativeOrder());
 
-                    buf = ByteBuffer.allocate(1000);
-                    buf.order(ByteOrder.nativeOrder());
+                ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
+                decode.order(ByteOrder.nativeOrder());
 
-                    ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
-                    decode.order(ByteOrder.nativeOrder());
+                for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+                    int read = ch.read(buf);
 
-                    for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                        int read = ch.read(buf);
+                    if (read == -1)
+                        throw new HandshakeException("Failed to read remote node recovery handshake " +
+                            "(connection closed).");
 
-                        if (read == -1)
-                            throw new HandshakeException("Failed to read remote node recovery handshake " +
-                                "(connection closed).");
+                    buf.flip();
 
-                        buf.flip();
+                    ByteBuffer decode0 = sslHnd.decode(buf);
 
-                        ByteBuffer decode0 = sslHnd.decode(buf);
+                    i += decode0.remaining();
 
-                        i += decode0.remaining();
+                    decode = appendAndResizeIfNeeded(decode, decode0);
 
-                        decode = appendAndResizeIfNeeded(decode, decode0);
+                    buf.clear();
+                }
 
-                        buf.clear();
-                    }
+                decode.flip();
 
-                    decode.flip();
+                rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
 
-                    rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+                if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
+                    decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
 
-                    if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
-                        decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                    sslMeta.decodedBuffer(decode);
+                }
 
-                        sslMeta.decodedBuffer(decode);
-                    }
+                ByteBuffer inBuf = sslHnd.inputBuffer();
 
-                    ByteBuffer inBuf = sslHnd.inputBuffer();
+                if (inBuf.position() > 0)
+                    sslMeta.encodedBuffer(inBuf);
+            }
+            else {
+                buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
 
-                    if (inBuf.position() > 0)
-                        sslMeta.encodedBuffer(inBuf);
-                }
-                else {
-                    buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                buf.order(ByteOrder.nativeOrder());
 
-                    buf.order(ByteOrder.nativeOrder());
+                for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+                    int read = ch.read(buf);
 
-                    for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                        int read = ch.read(buf);
+                    if (read == -1)
+                        throw new HandshakeException("Failed to read remote node recovery handshake " +
+                            "(connection closed).");
 
-                        if (read == -1)
-                            throw new HandshakeException("Failed to read remote node recovery handshake " +
-                                "(connection closed).");
+                    i += read;
+                }
 
-                        i += read;
-                    }
+                rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
+            }
 
-                    rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
-                }
+            if (log.isDebugEnabled())
+                log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
 
+            if (rcvCnt == -1) {
                 if (log.isDebugEnabled())
-                    log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
-
-                if (rcvCnt == -1) {
-                    if (log.isDebugEnabled())
-                        log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
-                }
+                    log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
             }
         }
         catch (IOException e) {
@@ -4111,19 +4116,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         return S.toString(TcpCommunicationSpi.class, this);
     }
 
-    /** Internal exception class for proper timeout handling. */
-    private static class HandshakeException extends IgniteCheckedException {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param msg Error message.
-         */
-        HandshakeException(String msg) {
-            super(msg);
-        }
-    }
-
     /**
      * This worker takes responsibility to shut the server down when stopping,
      * No other thread shall stop passed server.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/HandshakeException.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/HandshakeException.java
new file mode 100644 (file)
index 0000000..f5ffce8
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Internal exception class for proper timeout handling.
+ */
+public class HandshakeException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param msg Error message.
+     */
+    public HandshakeException(String msg) {
+        super(msg);
+    }
+}
index f845b0b..0b76b74 100644 (file)
@@ -99,6 +99,13 @@ public class HandshakeMessage implements Message {
         return nodeId;
     }
 
+    /**
+     * @return Message size in bytes.
+     */
+    public int getMessageSize() {
+        return MESSAGE_FULL_SIZE;
+    }
+
     /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.
index 2207813..026d9a4 100644 (file)
@@ -32,6 +32,9 @@ public class HandshakeMessage2 extends HandshakeMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Message size in bytes including {@link HandshakeMessage} fields. */
+    public static final int HANDSHAKE2_MESSAGE_SIZE = MESSAGE_FULL_SIZE + 4;
+
     /** */
     private int connIdx;
 
@@ -65,6 +68,11 @@ public class HandshakeMessage2 extends HandshakeMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public int getMessageSize() {
+        return HANDSHAKE2_MESSAGE_SIZE;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         if (!super.writeTo(buf, writer))
             return false;
index ccacbc1..38147e1 100644 (file)
@@ -616,7 +616,7 @@ class ZookeeperDiscoverySpiTestBase extends GridCommonAbstractTest {
             int connIdx
         ) throws IgniteCheckedException {
             if (failure && !matrix.hasConnection(getLocalNode(), node)) {
-                processClientCreationError(node, null, new IgniteCheckedException("Test", new SocketTimeoutException()));
+                processSessionCreationError(node, null, new IgniteCheckedException("Test", new SocketTimeoutException()));
 
                 return null;
             }