QPIDJMS-429 Make use of newer proton-j APIs for send and decode
authorTimothy Bish <tabish121@gmail.com>
Mon, 12 Nov 2018 16:31:34 +0000 (11:31 -0500)
committerTimothy Bish <tabish121@gmail.com>
Mon, 12 Nov 2018 16:31:39 +0000 (11:31 -0500)
Use newer APIs to clean up some send code that handles dispositions and
use new section type enums to simplify the message decoding portion of
the codec.  Also implement the writeable buffer ensure remaining method
to better handle writes where the buffer needs to grow.

qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java

index 4b2e5b1..c3010d2 100644 (file)
@@ -34,13 +34,10 @@ import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.message.AmqpReadableBuffer;
 import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Outcome;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
@@ -211,54 +208,58 @@ public class AmqpFixedProducer extends AmqpProducer {
     public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
         DeliveryState state = delivery.getRemoteState();
         if (state != null) {
-
             InFlightSend send = (InFlightSend) delivery.getContext();
 
-            if (state instanceof Accepted) {
+            if (state.getType() == DeliveryStateType.Accepted) {
                 LOG.trace("Outcome of delivery was accepted: {}", delivery);
                 send.onSuccess();
-                super.processDeliveryUpdates(provider, delivery);
-                return;
+            } else {
+                applyDeliveryStateUpdate(send, delivery, state);
             }
+        }
 
-            Exception deliveryError = null;
-            Outcome outcome = null;
+        super.processDeliveryUpdates(provider, delivery);
+    }
 
-            if (state instanceof TransactionalState) {
-                LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
-                outcome = ((TransactionalState) state).getOutcome();
-            } else if (state instanceof Outcome) {
-                outcome = (Outcome) state;
-            } else {
-                LOG.warn("Message send updated with unsupported state: {}", state);
-                outcome = null;
-            }
+    private void applyDeliveryStateUpdate(InFlightSend send, Delivery delivery, DeliveryState state) {
+        Exception deliveryError = null;
+        if (state == null) {
+            return;
+        }
 
-            if (outcome instanceof Accepted) {
+        switch (state.getType()) {
+            case Transactional:
+                LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
+                applyDeliveryStateUpdate(send, delivery, (DeliveryState) ((TransactionalState) state).getOutcome());
+                break;
+            case Accepted:
                 LOG.trace("Outcome of delivery was accepted: {}", delivery);
                 send.onSuccess();
-            } else if (outcome instanceof Rejected) {
+                break;
+            case Rejected:
                 LOG.trace("Outcome of delivery was rejected: {}", delivery);
-                ErrorCondition remoteError = ((Rejected) outcome).getError();
+                ErrorCondition remoteError = ((Rejected) state).getError();
                 if (remoteError == null) {
                     remoteError = getEndpoint().getRemoteCondition();
                 }
 
                 deliveryError = AmqpSupport.convertToException(getParent().getProvider(), getEndpoint(), remoteError);
-            } else if (outcome instanceof Released) {
+                break;
+            case Released:
                 LOG.trace("Outcome of delivery was released: {}", delivery);
                 deliveryError = new JMSException("Delivery failed: released by receiver");
-            } else if (outcome instanceof Modified) {
+                break;
+            case Modified:
                 LOG.trace("Outcome of delivery was modified: {}", delivery);
                 deliveryError = new JMSException("Delivery failed: failure at remote");
-            }
-
-            if (deliveryError != null) {
-                send.onFailure(deliveryError);
-            }
+                break;
+            default:
+                LOG.warn("Message send updated with unsupported state: {}", state);
         }
 
-        super.processDeliveryUpdates(provider, delivery);
+        if (deliveryError != null) {
+            send.onFailure(deliveryError);
+        }
     }
 
     public AmqpSession getSession() {
index fc68747..7a75d25 100644 (file)
@@ -295,71 +295,36 @@ public final class AmqpCodec {
         Footer footer = null;
         Section section = null;
 
-        if (messageBytes.hasRemaining()) {
+        while (messageBytes.hasRemaining()) {
             section = (Section) decoder.readObject();
-        }
-
-        if (section instanceof Header) {
-            header = (Header) section;
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
-
-        }
-        if (section instanceof DeliveryAnnotations) {
-            deliveryAnnotations = (DeliveryAnnotations) section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
-
-        }
-        if (section instanceof MessageAnnotations) {
-            messageAnnotations = (MessageAnnotations) section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
-
-        }
-        if (section instanceof Properties) {
-            properties = (Properties) section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
 
-        }
-        if (section instanceof ApplicationProperties) {
-            applicationProperties = (ApplicationProperties) section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
-
-        }
-        if (section != null && !(section instanceof Footer)) {
-            body = section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
+            switch (section.getType()) {
+                case Header:
+                    header = (Header) section;
+                    break;
+                case DeliveryAnnotations:
+                    deliveryAnnotations = (DeliveryAnnotations) section;
+                    break;
+                case MessageAnnotations:
+                    messageAnnotations = (MessageAnnotations) section;
+                    break;
+                case Properties:
+                    properties = (Properties) section;
+                    break;
+                case ApplicationProperties:
+                    applicationProperties = (ApplicationProperties) section;
+                    break;
+                case Data:
+                case AmqpSequence:
+                case AmqpValue:
+                    body = section;
+                    break;
+                case Footer:
+                    footer = (Footer) section;
+                    break;
+                default:
+                    throw new IOException("Unknown Message Section forced decode abort.");
             }
-
-        }
-        if (section instanceof Footer) {
-            footer = (Footer) section;
         }
 
         decoder.setByteBuffer(null);
index 2f52bff..1934532 100644 (file)
@@ -106,6 +106,11 @@ public class AmqpWritableBuffer implements WritableBuffer {
     }
 
     @Override
+    public void ensureRemaining(int remianing) {
+        nettyBuffer.ensureWritable(remianing);
+    }
+
+    @Override
     public int position() {
         return nettyBuffer.writerIndex();
     }