QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Stop handling incoming frames on session...
authorAlex Rudyy <orudyy@apache.org>
Thu, 10 May 2018 16:02:05 +0000 (17:02 +0100)
committerAlex Rudyy <orudyy@apache.org>
Thu, 10 May 2018 16:10:57 +0000 (17:10 +0100)
client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java

index 51eb4cf..9b5c6a8 100644 (file)
@@ -42,6 +42,8 @@ import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
@@ -229,6 +231,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
     @Override
     public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws QpidException
     {
+        if (isClosedForInput(channelId))
+        {
+            _logger.debug("Ignoring content header as channel {} closed", channelId);
+            return;
+        }
+
         final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
                                                : _channelId2UnprocessedMsgMap.get(channelId));
 
@@ -252,6 +260,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
     @Override
     public void contentBodyReceived(final int channelId, ContentBody contentBody) throws QpidException
     {
+        if (isClosedForInput(channelId))
+        {
+            _logger.debug("Ignoring content body as channel {} closed", channelId);
+            return;
+        }
         UnprocessedMessage_0_8 msg;
         final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
         if (fastAccess)
@@ -469,7 +482,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
     @Override
     public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws QpidException
     {
-        _protocolHandler.methodBodyReceived(channel, amqMethodBody);
+        if ( channel == 0
+             || !isClosedForInput(channel)
+             || (isClosing(channel) && (amqMethodBody instanceof ChannelCloseBody || amqMethodBody instanceof ChannelCloseOkBody)))
+        {
+            _protocolHandler.methodBodyReceived(channel, amqMethodBody);
+        }
+        else
+        {
+            _logger.debug("Ignoring method {} as channel {} closed on {}", amqMethodBody, channel);
+        }
     }
 
     public void notifyError(Exception error)
@@ -514,4 +536,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
     {
         return _methodProcessor;
     }
+
+    private boolean isClosing(final int channelId)
+    {
+        return _closingChannels.containsKey(channelId);
+    }
+
+    private boolean isClosedForInput(final int channelId)
+    {
+        AMQSession session;
+        return channelId > 0 && ((session = _connection.getSession(channelId)) == null || session.isClosed());
+    }
+
 }