IGNITE-8766 More informative thread naming for TcpDiscoverySpi - Fixes #4317.
authorSergey Chugunov <sergey.chugunov@gmail.com>
Wed, 16 Jan 2019 13:20:56 +0000 (16:20 +0300)
committerAlexey Goncharuk <alexey.goncharuk@gmail.com>
Wed, 16 Jan 2019 13:20:56 +0000 (16:20 +0300)
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

index 13aeafa..d9b74fd 100755 (executable)
@@ -10502,6 +10502,48 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Puts additional text to thread name.
+     * Calls {@code enhanceThreadName(Thread.currentThread(), text)}.
+     * For details see {@link #enhanceThreadName(Thread, String)}.
+     *
+     * @param text Text to be set in thread name in square [] braces. Does nothing if cannot find suitable braces.
+     */
+    public static void enhanceThreadName(String text) {
+        enhanceThreadName(Thread.currentThread(), text);
+    }
+
+    /**
+     * Puts additional text to thread name. It finds first square braces in thread name and
+     * sets required text in them. For example, thread has name "my-thread-[]-%gridname%". After
+     * calling {@code enhanceThreadName(thread, "myText")}, the name of the thread will
+     * be "my-thread-[myText]-%gridname%".<br>
+     * This allows to set additional mutable info to thread, like remote host IP address or so.
+     *
+     * @param thread Thread to be renamed, it must contain square braces in name []. Does nothing if {@code null}.
+     * @param text Text to be set in thread name in square [] braces. Does nothing if cannot find suitable braces.
+     */
+    public static void enhanceThreadName(@Nullable Thread thread, String text) {
+        if (thread == null)
+            return;
+
+        String threadName = thread.getName();
+
+        int idxStart = threadName.indexOf('[');
+        int idxEnd = threadName.indexOf(']');
+
+        if (idxStart < 0 || idxEnd < 0 || idxStart >= idxEnd)
+            return;
+
+        StringBuilder sb = new StringBuilder(threadName.length());
+
+        sb.append(threadName, 0, idxStart + 1);
+        sb.append(text);
+        sb.append(threadName, idxEnd, threadName.length());
+
+        thread.setName(sb.toString());
+    }
+
+    /**
      * @param ctx Context.
      *
      * @return instance of current baseline topology if it exists
index 56b1a06..c2c5e87 100644 (file)
@@ -1037,7 +1037,7 @@ class ClientImpl extends TcpDiscoveryImpl {
         /**
          */
         SocketReader() {
-            super(spi.ignite().name(), "tcp-client-disco-sock-reader", log);
+            super(spi.ignite().name(), "tcp-client-disco-sock-reader-[]", log);
         }
 
         /**
@@ -1085,6 +1085,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                 SocketStream sockStream;
                 UUID rmtNodeId;
 
+                // Disconnected from router node.
+                U.enhanceThreadName("");
+
                 synchronized (mux) {
                     if (stopReadLatch != null) {
                         stopReadLatch.countDown();
@@ -1104,6 +1107,10 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 Socket sock = sockStream.socket();
 
+                U.enhanceThreadName(U.id8(rmtNodeId)
+                    + ' ' + sockStream.sock.getInetAddress().getHostAddress()
+                    + ":" + sockStream.sock.getPort());
+
                 try {
                     InputStream in = sockStream.stream();
 
index 0edb497..879c8b8 100644 (file)
@@ -209,6 +209,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private RingMessageWorker msgWorker;
 
+    /** Thread executing message worker */
+    private Thread msgWorkerThread;
+
     /** Client message workers. */
     protected ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap<>();
 
@@ -359,7 +362,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         msgWorker = new RingMessageWorker(log);
 
-        new MessageWorkerDiscoveryThread(msgWorker, log).start();
+        msgWorkerThread = new MessageWorkerDiscoveryThread(msgWorker, log);
+        msgWorkerThread.start();
 
         if (tcpSrvr == null)
             tcpSrvr = new TcpServer(log);
@@ -393,6 +397,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         joinTopology();
 
+        if (locNode.order() == 1)
+            U.enhanceThreadName(msgWorkerThread, "crd");
+
         spi.stats.onJoinFinished();
 
         if (spi.ipFinder.isShared()) {
@@ -2668,7 +2675,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param log Logger.
          */
         private RingMessageWorker(IgniteLogger log) {
-            super("tcp-disco-msg-worker", log, 10, getWorkerRegistry(spi));
+            super("tcp-disco-msg-worker-[]", log, 10, getWorkerRegistry(spi));
 
             initConnectionCheckThreshold();
 
@@ -2868,6 +2875,15 @@ class ServerImpl extends TcpDiscoveryImpl {
                 failureThresholdReached = false;
             }
 
+            if (next != null && sock != null) {
+                // Messages that change topology.
+                if (msg instanceof TcpDiscoveryNodeLeftMessage || msg instanceof TcpDiscoveryNodeFailedMessage ||
+                    msg instanceof TcpDiscoveryNodeAddFinishedMessage || msg instanceof TcpDiscoveryNodeAddedMessage) {
+                    U.enhanceThreadName(U.id8(next.id()) + ' ' + sock.getInetAddress().getHostAddress()
+                        + ":" + sock.getPort() + (isLocalNodeCoordinator() ? " crd" : ""));
+                }
+            }
+
             spi.stats.onMessageProcessingFinished(msg);
         }
 
@@ -5834,7 +5850,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @throws IgniteSpiException In case of error.
          */
         TcpServer(IgniteLogger log) throws IgniteSpiException {
-            super(spi.ignite().name(), "tcp-disco-srvr", log, getWorkerRegistry(spi));
+            super(spi.ignite().name(), "tcp-disco-srvr-[]", log, getWorkerRegistry(spi));
 
             int lastPort = spi.locPortRange == 0 ? spi.locPort : spi.locPort + spi.locPortRange - 1;
 
@@ -5881,6 +5897,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             Throwable err = null;
 
             try {
+                U.enhanceThreadName(":" + port);
+
                 while (!isCancelled()) {
                     Socket sock;
 
@@ -5975,7 +5993,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param sock Socket to read data from.
          */
         SocketReader(Socket sock) {
-            super(spi.ignite().name(), "tcp-disco-sock-reader", log);
+            super(spi.ignite().name(), "tcp-disco-sock-reader-[]", log);
 
             this.sock = sock;
 
@@ -6106,6 +6124,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     this.nodeId = nodeId;
 
+                    U.enhanceThreadName(U.id8(nodeId) + ' ' + sock.getInetAddress().getHostAddress()
+                        + ":" + sock.getPort() + (req.client() ? " client" : ""));
+
                     TcpDiscoveryHandshakeResponse res =
                         new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
 
@@ -6584,6 +6605,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (log.isInfoEnabled())
                     log.info("Finished serving remote node connection [rmtAddr=" + rmtAddr +
                         ", rmtPort=" + sock.getPort());
+
+                if (isLocalNodeCoordinator() && !ring.hasRemoteServerNodes())
+                    U.enhanceThreadName(msgWorkerThread, "crd");
             }
         }
 
@@ -6873,7 +6897,9 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param log Logger.
          */
         private ClientMessageWorker(Socket sock, UUID clientNodeId, IgniteLogger log) {
-            super("tcp-disco-client-message-worker", log, Math.max(spi.metricsUpdateFreq, 10), null);
+            super("tcp-disco-client-message-worker-[" + U.id8(clientNodeId)
+                + ' ' + sock.getInetAddress().getHostAddress()
+                + ":" + sock.getPort() + ']', log, Math.max(spi.metricsUpdateFreq, 10), null);
 
             this.sock = sock;
             this.clientNodeId = clientNodeId;