QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Make sure that client closes TCP connection...
authorAlex Rudyy <orudyy@apache.org>
Tue, 8 May 2018 15:34:27 +0000 (16:34 +0100)
committerAlex Rudyy <orudyy@apache.org>
Thu, 10 May 2018 16:11:06 +0000 (17:11 +0100)
client/src/main/java/org/apache/qpid/client/AMQConnection.java
client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java

index c629414..4426fb6 100644 (file)
@@ -1203,7 +1203,7 @@ public class AMQConnection extends Closeable implements CommonConnection, Refere
                 }
                 catch (JMSException e)
                 {
-                    _logger.error("Error closing connection", e);
+                    _logger.warn("Error closing connection", e);
                     throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing connection: " + e), e);
                 }
                 finally
@@ -1271,7 +1271,7 @@ public class AMQConnection extends Closeable implements CommonConnection, Refere
                 }
                 catch (JMSException e)
                 {
-                    _logger.error("Error closing session: " + e);
+                    _logger.warn("Error closing session: " + e);
                     sessionException = e;
                 }
             }
index 5d59e50..6b50f90 100644 (file)
@@ -672,10 +672,18 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
     /** More convenient method to write a frame and wait for it's response. */
     public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws QpidException, FailoverException
     {
-        return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
+        return writeCommandFrameAndWaitForReply(frame,
+                                                new SpecificMethodFrameListener(frame.getChannel(),
+                                                                                responseClass,
+                                                                                getConnectionDetails()),
                                                 timeout);
     }
 
+    public String getConnectionDetails()
+    {
+        return getLocalAddress() + "-" + getRemoteAddress();
+    }
+
     public void closeSession(AMQSession session) throws QpidException
     {
         _protocolSession.closeSession(session);
@@ -707,17 +715,19 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
                 final AMQFrame frame = body.generateFrame(0);
 
                 syncWrite(frame, ConnectionCloseOkBody.class, timeout);
-                _network.close();
-                closed();
-            }
+             }
             catch (AMQTimeoutException e)
             {
-                closed();
+                _logger.debug("Timeout on sending connection close : " + e);
             }
             catch (FailoverException e)
             {
                 _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
             }
+            finally
+            {
+                _network.close();
+            }
         }
     }
 
index 1acb3a1..6c7738a 100644 (file)
@@ -929,7 +929,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
 
         public QueueDeclareOkHandler()
         {
-            super(getChannelId(), QueueDeclareOkBody.class);
+            super(getChannelId(), QueueDeclareOkBody.class, getProtocolHandler().getConnectionDetails());
         }
 
         public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
index 41166e0..8412849 100644 (file)
@@ -362,17 +362,19 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
                               && (connectionDelegate80.isConfirmedPublishSupported()
                                || (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported()));
 
+        AMQProtocolHandler protocolHandler = getConnection().getProtocolHandler();
         if(!useConfirms)
         {
-            getConnection().getProtocolHandler().writeFrame(compositeFrame);
+            protocolHandler.writeFrame(compositeFrame);
         }
         else
         {
-            final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId());
+            final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId(),
+                                                                                                  protocolHandler.getConnectionDetails());
             try
             {
 
-                getConnection().getProtocolHandler().writeCommandFrameAndWaitForReply(compositeFrame,
+                protocolHandler.writeCommandFrameAndWaitForReply(compositeFrame,
                                                                                       frameListener);
 
                 if(frameListener.isRejected())
@@ -468,9 +470,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
          *
          * @param channelId The channel id to filter incoming methods with.
          */
-        public PublishConfirmMessageListener(final int channelId)
+        public PublishConfirmMessageListener(final int channelId, String connectionDetails)
         {
-            super(channelId);
+            super(channelId, connectionDetails);
         }
 
         @Override
index 6618b34..5c56ad7 100644 (file)
@@ -55,6 +55,7 @@ import org.apache.qpid.protocol.AMQMethodListener;
 public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
 {
 
+    private final String _connectionDetails;
     /** Holds the channel id for the channel upon which this listener is waiting for a response. */
     private int _channelId;
 
@@ -62,10 +63,12 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth
      * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
      *
      * @param channelId The channel id to filter incoming methods with.
+     * @param connectionDetails
      */
-    public BlockingMethodFrameListener(int channelId)
+    public BlockingMethodFrameListener(int channelId, final String connectionDetails)
     {
         _channelId = channelId;
+        _connectionDetails = connectionDetails;
     }
 
     /**
@@ -121,4 +124,9 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth
         }
     }
 
+    @Override
+    public String getConnectionDetails()
+    {
+        return _connectionDetails;
+    }
 }
index 5f0a935..ce7c03a 100644 (file)
@@ -82,6 +82,12 @@ public class StateWaiter extends BlockingWaiter<AMQState>
         return _awaitStates.contains(state);
     }
 
+    @Override
+    public String getConnectionDetails()
+    {
+        return null;
+    }
+
     /**
      * Await for the required State to be achieved within the default timeout.
      * @return The achieved state that was requested.
index f0d7feb..9a3f733 100644 (file)
@@ -28,9 +28,9 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener
 {
     private final Class _expectedClass;
 
-    public SpecificMethodFrameListener(int channelId, Class expectedClass)
+    public SpecificMethodFrameListener(int channelId, Class expectedClass, final String connectionDetails)
     {
-        super(channelId);
+        super(channelId, connectionDetails);
         _expectedClass = expectedClass;
     }
 
index 66be535..23adf3c 100644 (file)
@@ -170,8 +170,8 @@ public abstract class BlockingWaiter<T>
                             final String errorMsg = String.format(
                                     "The server's response was not received within the time-out period of %d ms. "
                                     + "Possible reasons include: the server may be too busy, the network may be "
-                                    + "overloaded, or this JVM itself may be too busy to process the response.",
-                                    timeout);
+                                    + "overloaded, or this JVM itself may be too busy to process the response. [%s]",
+                                    timeout, getConnectionDetails() == null ? "" : getConnectionDetails());
                             _error = new AMQTimeoutException(errorMsg, null);
                             _ready = true;
                         }
@@ -338,4 +338,6 @@ public abstract class BlockingWaiter<T>
         return new QpidException("Waiter was closed.", null);
     }
 
+    public abstract String getConnectionDetails();
+
 }
index 74a1809..69d180e 100644 (file)
@@ -286,7 +286,7 @@ public class AMQProtocolHandlerTest extends QpidTestCase
          */
         public BlockToAccessFrameListener(int channelId)
         {
-            super(channelId);
+            super(channelId, "Test");
             _logger.info("Creating a listener:" + this);
         }