QPIDJMS-431 Refactor the FailoverProvider to improve performance
authorTimothy Bish <tabish121@gmail.com>
Wed, 14 Nov 2018 18:03:40 +0000 (13:03 -0500)
committerTimothy Bish <tabish121@gmail.com>
Wed, 14 Nov 2018 18:03:40 +0000 (13:03 -0500)
Refactor how the failover provider handles the workload passing though it
and how it deals with connection failures to make its presence as transparent
as possible to overall client performance.

qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java

index 320beaf..dc35ba5 100644 (file)
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
@@ -75,6 +76,10 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
     private static final Logger LOG = LoggerFactory.getLogger(FailoverProvider.class);
 
+    private static enum FailoverServerListAction {
+        ADD, REPLACE, IGNORE
+    }
+
     public static final int UNLIMITED = -1;
 
     private static final int UNDEFINED = -1;
@@ -95,8 +100,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     private final FailoverUriPool uris;
     private ScheduledFuture<?> requestTimeoutTask;
 
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     private final ScheduledThreadPoolExecutor serializer;
-    private final ScheduledThreadPoolExecutor connectionHub;
     private final AtomicBoolean closed = new AtomicBoolean();
     private final AtomicBoolean failed = new AtomicBoolean();
     private final AtomicBoolean closingConnection = new AtomicBoolean(false);
@@ -109,7 +114,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     // Current state of connection / reconnection
     private final ReconnectControls reconnectControl = new ReconnectControls();
     private IOException failureCause;
-    private URI connectedURI;
+    private volatile URI connectedURI;
     private volatile JmsConnectionInfo connectionInfo;
 
     // Timeout values configured via JmsConnectionInfo
@@ -129,28 +134,15 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
     private FailoverServerListAction amqpOpenServerListAction = FailoverServerListAction.REPLACE;
 
-    public FailoverProvider(Map<String, String> nestedOptions, ProviderFutureFactory futureFactory) {
-        this(null, nestedOptions, futureFactory);
-    }
-
-    public FailoverProvider(List<URI> uris, ProviderFutureFactory futureFactory) {
-        this(uris, null, futureFactory);
-    }
-
     public FailoverProvider(List<URI> uris, Map<String, String> nestedOptions, ProviderFutureFactory futureFactory) {
         this.uris = new FailoverUriPool(uris, nestedOptions);
         this.futureFactory = futureFactory;
 
-        serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory("FailoverProvider: serialization thread", true));
+        // All Connection attempts happen in this executor thread as well as handling
+        // failed connection events and other maintenance work.
+        serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory("FailoverProvider: async work thread", true));
         serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
         serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
-
-        // All Connection attempts happen in this schedulers thread.  Once a connection
-        // is established it will hand the open connection back to the serializer thread
-        // for state recovery.
-        connectionHub = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory("FailoverProvider: connect thread", true));
-        connectionHub.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-        connectionHub.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
     }
 
     @Override
@@ -174,49 +166,49 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     public void close() {
         if (closed.compareAndSet(false, true)) {
             final ProviderFuture request = futureFactory.createFuture();
-            serializer.execute(new Runnable() {
 
-                @Override
-                public void run() {
-                    try {
-                        IOException error = failureCause != null ? failureCause : new IOException("Connection closed");
-                        final List<FailoverRequest> pending;
-                        synchronized (requests) {
-                            pending = new ArrayList<FailoverRequest>(requests.values());
-                        }
-                        for (FailoverRequest request : pending) {
-                            if (!request.isComplete()) {
-                                request.onFailure(error);
-                            }
+            serializer.execute(() -> {
+                // At this point the closed flag is set and any threads running through this
+                // provider will see it as being closed and react accordingly.  Any events
+                // that fire from the active provider that is being closed will read a closed
+                // state and not trigger new events into the executor. Any event that might be
+                // sitting behind this one would also read the closed state and perform no
+                // further work while we do a graceful shutdown of the executor service.
+                lock.readLock().lock();
+                try {
+                    IOException error = failureCause != null ? failureCause : new IOException("Connection closed");
+                    final List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                    for (FailoverRequest pendingRequest : pending) {
+                        if (!pendingRequest.isComplete()) {
+                            pendingRequest.onFailure(error);
                         }
+                    }
 
-                        if (requestTimeoutTask != null) {
-                            requestTimeoutTask.cancel(false);
-                        }
+                    if (requestTimeoutTask != null) {
+                        requestTimeoutTask.cancel(false);
+                    }
 
-                        if (provider != null) {
-                            provider.close();
-                        }
-                    } catch (Exception e) {
-                        LOG.debug("Caught exception while closing connection");
-                    } finally {
-                        ThreadPoolUtils.shutdownGraceful(connectionHub);
-                        if (serializer != null) {
-                            serializer.shutdown();
-                        }
-                        request.onSuccess();
+                    if (provider != null) {
+                        provider.close();
                     }
+                } catch (Exception e) {
+                    LOG.warn("Error caught while closing Provider: ", e.getMessage());
+                } finally {
+                    lock.readLock().unlock();
+                    request.onSuccess();
                 }
             });
 
             try {
-                if (this.closeTimeout < 0) {
+                if (getCloseTimeout() < 0) {
                     request.sync();
                 } else {
-                    request.sync(Math.max(MINIMUM_TIMEOUT, closeTimeout), TimeUnit.MILLISECONDS);
+                    request.sync(getCloseTimeout(), TimeUnit.MILLISECONDS);
                 }
             } catch (IOException e) {
-                LOG.warn("Error caught while closing Provider: ", e.getMessage());
+                LOG.warn("Error caught while closing Provider: {}", e.getMessage() != null ? e.getMessage() : "<Unknown Error>");
+            } finally {
+                ThreadPoolUtils.shutdownGraceful(serializer);
             }
         }
     }
@@ -224,11 +216,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     @Override
     public void create(final JmsResource resource, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
         checkClosed();
-        FailoverRequest pending = null;
+        final FailoverRequest pending;
         if (resource instanceof JmsConnectionInfo) {
             pending = new CreateConnectionRequest(request) {
                 @Override
-                public void doTask() throws Exception {
+                public void doTask(Provider provider) throws Exception {
                     JmsConnectionInfo connectionInfo = (JmsConnectionInfo) resource;
 
                     // Collect the timeouts we will handle in this provider.
@@ -247,7 +239,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         } else {
             pending = new FailoverRequest(request, requestTimeout) {
                 @Override
-                public void doTask() throws Exception {
+                public void doTask(Provider provider) throws Exception {
                     provider.create(resource, this);
                 }
 
@@ -271,7 +263,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             };
         }
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -279,7 +271,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, requestTimeout) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.start(resource, this);
             }
 
@@ -289,7 +281,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -297,7 +289,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, requestTimeout) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.stop(resource, this);
             }
 
@@ -307,7 +299,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -315,7 +307,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, requestTimeout) {
             @Override
-            public void doTask() throws IOException, JMSException, UnsupportedOperationException {
+            public void doTask(Provider provider) throws IOException, JMSException, UnsupportedOperationException {
                 if (resourceId instanceof JmsConnectionInfo) {
                    closingConnection.set(true);
                 }
@@ -334,7 +326,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -342,7 +334,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, sendTimeout) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.send(envelope, this);
             }
 
@@ -357,7 +349,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -365,13 +357,13 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, requestTimeout) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.acknowledge(sessionId, ackType, this);
             }
 
             @Override
             public boolean succeedsWhenOffline() {
-                // Allow this to succeed, acks would be stale.
+                // Allow this to succeed, acknowledgement would be stale after reconnect.
                 return true;
             }
 
@@ -381,7 +373,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -389,13 +381,13 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, requestTimeout) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.acknowledge(envelope, ackType, this);
             }
 
             @Override
             public boolean succeedsWhenOffline() {
-                // Allow this to succeed, acks would be stale.
+                // Allow this to succeed, acknowledgement would be stale after reconnect.
                 return true;
             }
 
@@ -405,7 +397,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -413,7 +405,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, requestTimeout) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.commit(transactionInfo, nextTransactionInfo, this);
             }
 
@@ -435,7 +427,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -443,7 +435,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, requestTimeout) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.rollback(transactionInfo, nextTransactionInfo, this);
             }
 
@@ -458,7 +450,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -466,7 +458,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, requestTimeout) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.recover(sessionId, this);
             }
 
@@ -481,7 +473,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -489,7 +481,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request, requestTimeout) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.unsubscribe(subscription, this);
             }
 
@@ -499,7 +491,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -507,7 +499,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request) {
             @Override
-            public void doTask() throws Exception {
+            public void doTask(Provider provider) throws Exception {
                 provider.pull(consumerId, timeout, this);
             }
 
@@ -517,7 +509,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         };
 
-        serializer.execute(pending);
+        pending.run();
     }
 
     @Override
@@ -528,169 +520,197 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     //--------------- Connection Error and Recovery methods ------------------//
 
     /**
-     * This method is always called from within the FailoverProvider's serialization thread.
-     *
+     * This method is always called from within the FailoverProvider's state lock.
+     * <p>
      * When a failure is encountered either from an outgoing request or from an error fired
      * from the underlying Provider instance this method is called to determine if a reconnect
-     * is allowed and if so a new reconnect cycle is triggered on the connection thread.
+     * is allowed and if so a new reconnect cycle is triggered on the serialization executor.
      *
+     * @param provider
+     *                   the Provider that was in use when the failure occurred.
      * @param cause
      *        the error that triggered the failure of the provider.
      */
-    private void handleProviderFailure(final IOException cause) {
-        if (provider != null) {
-            LOG.debug("handling Provider failure: {}", cause.getMessage());
-            LOG.trace("stack", cause);
+    private void handleProviderFailure(final Provider provider, final IOException cause) {
+        if (closingConnection.get() || closed.get() || failed.get()) {
+            return;
+        }
 
-            provider.setProviderListener(closedListener);
-            URI failedURI = this.provider.getRemoteURI();
-            try {
-                provider.close();
-            } catch (Throwable error) {
-                LOG.trace("Caught exception while closing failed provider: {}", error.getMessage());
+        serializer.execute(() -> {
+            if (closingConnection.get() || closed.get() || failed.get()) {
+                return;
             }
-            provider = null;
 
-            if (reconnectControl.isReconnectAllowed(cause)) {
-                if (cause instanceof ProviderRedirectedException) {
-                    ProviderRedirectedException redirect = (ProviderRedirectedException) cause;
+            lock.readLock().lock();
+            try {
+                // It is possible that another failed request signaled an error for the same provider
+                // and we already cleaned up the old failed provider and scheduled a reconnect that
+                // has already succeeded, so we need to ensure that we don't kill a valid provider.
+                if (provider == FailoverProvider.this.provider) {
+                    LOG.debug("handling Provider failure: {}", cause.getMessage());
+                    LOG.trace("stack", cause);
+                    FailoverProvider.this.provider = null;
+
+                    provider.setProviderListener(closedListener);
+                    URI failedURI = provider.getRemoteURI();
                     try {
-                        uris.addFirst(redirect.getRedirectionURI());
-                    } catch (Exception error) {
-                        LOG.warn("Could not construct redirection URI from remote provided information");
+                        provider.close();
+                    } catch (Throwable error) {
+                        LOG.trace("Caught exception while closing failed provider: {}", error.getMessage());
                     }
-                }
 
-                ProviderListener listener = this.listener;
-                if (listener != null) {
-                    listener.onConnectionInterrupted(failedURI);
-                }
+                    if (reconnectControl.isReconnectAllowed(cause)) {
+                        if (cause instanceof ProviderRedirectedException) {
+                            ProviderRedirectedException redirect = (ProviderRedirectedException) cause;
+                            try {
+                                uris.addFirst(redirect.getRedirectionURI());
+                            } catch (Exception error) {
+                                LOG.warn("Could not construct redirection URI from remote provided information");
+                            }
+                        }
 
-                final List<FailoverRequest> pending;
-                synchronized (requests) {
-                    pending = new ArrayList<FailoverRequest>(requests.values());
-                }
-                for (FailoverRequest request : pending) {
-                    request.whenOffline(cause);
-                }
+                        ProviderListener listener = this.listener;
+                        if (listener != null) {
+                            listener.onConnectionInterrupted(failedURI);
+                        }
 
-                // Start watching for request timeouts while we are offline, unless we already are.
-                if (requestTimeoutTask == null) {
-                    long sweeperInterval = getRequestSweeperInterval();
-                    if (sweeperInterval > 0) {
-                        LOG.trace("Request timeout monitoring enabled: interval = {}ms", sweeperInterval);
-                        requestTimeoutTask = serializer.scheduleWithFixedDelay(
-                            new FailoverRequestSweeper(), sweeperInterval, sweeperInterval, TimeUnit.MILLISECONDS);
-                    }
-                }
+                        final List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                        for (FailoverRequest request : pending) {
+                            request.whenOffline(cause);
+                        }
 
-                triggerReconnectionAttempt();
-            } else {
-                ProviderListener listener = this.listener;
-                if (listener != null) {
-                    listener.onConnectionFailure(cause);
+                        // Start watching for request timeouts while we are offline, unless we already are.
+                        if (requestTimeoutTask == null) {
+                            long sweeperInterval = getRequestSweeperInterval();
+                            if (sweeperInterval > 0) {
+                                LOG.trace("Request timeout monitoring enabled: interval = {}ms", sweeperInterval);
+                                requestTimeoutTask = serializer.scheduleWithFixedDelay(
+                                    new FailoverRequestSweeper(), sweeperInterval, sweeperInterval, TimeUnit.MILLISECONDS);
+                            }
+                        }
+
+                        triggerReconnectionAttempt();
+                    } else {
+                        failed.set(true);
+                        failureCause = cause;
+                        ProviderListener listener = this.listener;
+                        if (listener != null) {
+                            listener.onConnectionFailure(cause);
+                        }
+                    }
+                } else {
+                    LOG.trace("Ignoring duplicate provider failed event for provider: {}", provider);
                 }
+            } finally {
+                lock.readLock().unlock();
             }
-        }
+        });
     }
 
     /**
-     * Called from the reconnection thread.  This method enqueues a new task that
-     * will attempt to recover connection state, once successful, normal operations
-     * will resume.  If an error occurs while attempting to recover the JMS framework
-     * state then a reconnect cycle is again triggered on the connection thread.
+     * Called from the serialization executor after a successful connection.
+     * <p>
+     * This method enqueues a new task that will attempt to recover connection state.
+     * Once successfully recovered, normal operations will resume.  If an error occurs
+     * while attempting to recover the JMS framework state then a reconnect cycle is again
+     * triggered on the connection thread.
      *
      * @param provider
      *        The newly connect Provider instance that will become active.
      */
     private void initializeNewConnection(final Provider provider) {
-        this.serializer.execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    FailoverProvider.this.provider = provider;
-                    provider.setProviderListener(FailoverProvider.this);
-                    connectedURI = provider.getRemoteURI();
-
-                    if (reconnectControl.isRecoveryRequired()) {
-                        LOG.debug("Signalling connection recovery: {}", provider);
-
-                        // Stage 1: Allow listener to recover its resources
-                        try {
-                            listener.onConnectionRecovery(provider);
-                        } finally {
-                            // Stage 2: If the provider knows of others lets add them to the URI pool
-                            //          even if something failed here we can learn of new hosts so we
-                            //          always process the potential Open frame failover URI results.
-                            processAlternates(provider.getAlternateURIs());
-                        }
+        serializer.execute(() -> {
+            // Disallow other processing in the provider while we attempt to establish this
+            // provider as the new one for recovery, any incoming work stops until we finish
+            // and either recover or go back into a failed state.
+            lock.writeLock().lock();
 
-                        // Stage 3: Connection state recovered, get newly configured message factory.
-                        FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
+            try {
+                // In case a close is in play as we are reconnecting we close out the connected
+                // provider instance and return here to allow any pending close operations to
+                // finish now.
+                if (closingConnection.get() || closed.get() || failed.get()) {
+                    try {
+                        provider.close();
+                    } catch(Throwable ignore) {
+                        LOG.trace("Ingoring failure to close failed provider: {}", provider, ignore);
+                    }
 
-                        // Stage 4: Restart consumers, send pull commands, etc.
-                        listener.onConnectionRecovered(provider);
+                    return;
+                }
 
-                        // Stage 5: Let the client know that connection has restored.
-                        listener.onConnectionRestored(provider.getRemoteURI());
+                FailoverProvider.this.provider = provider;
+                provider.setProviderListener(FailoverProvider.this);
+                connectedURI = provider.getRemoteURI();
 
-                        // Last step: Send pending actions.
-                        final List<FailoverRequest> pending;
-                        synchronized (requests) {
-                            pending = new ArrayList<FailoverRequest>(requests.values());
-                        }
-                        for (FailoverRequest request : pending) {
-                            if (!request.isComplete()) {
-                                request.run();
-                            }
-                        }
+                if (reconnectControl.isRecoveryRequired()) {
+                    LOG.debug("Signalling connection recovery: {}", provider);
 
-                        reconnectControl.connectionEstablished();
-                    } else {
+                    // Stage 1: Allow listener to recover its resources
+                    try {
+                        listener.onConnectionRecovery(provider);
+                    } finally {
+                        // Stage 2: If the provider knows of others lets add them to the URI pool
+                        //          even if something failed here we can learn of new hosts so we
+                        //          always process the potential Open frame failover URI results.
                         processAlternates(provider.getAlternateURIs());
+                    }
 
-                        // Last step: Send pending actions.
-                        final List<FailoverRequest> pending;
-                        synchronized (requests) {
-                            pending = new ArrayList<FailoverRequest>(requests.values());
-                        }
-                        for (FailoverRequest request : pending) {
-                            if (!request.isComplete()) {
-                                request.run();
-                            }
+                    // Stage 3: Connection state recovered, get newly configured message factory.
+                    FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
+
+                    // Stage 4: Restart consumers, send pull commands, etc.
+                    listener.onConnectionRecovered(provider);
+
+                    // Stage 5: Let the client know that connection has restored.
+                    listener.onConnectionRestored(provider.getRemoteURI());
+
+                    // Last step: Send pending actions.
+                    final List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                    for (FailoverRequest request : pending) {
+                        if (!request.isComplete()) {
+                            request.run();
                         }
                     }
 
-                    // Cancel timeout processing since we are connected again.  We waited until
-                    // now for the case where we are continually getting bounced from otherwise
-                    // live servers, we want the timeout to remain scheduled in that case so that
-                    // it doesn't keep getting rescheduled and never actually time anything out.
-                    if (requestTimeoutTask != null) {
-                        requestTimeoutTask.cancel(false);
-                        requestTimeoutTask = null;
+                    reconnectControl.connectionEstablished();
+                } else {
+                    processAlternates(provider.getAlternateURIs());
+
+                    // Last step: Send pending actions.
+                    final List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                    for (FailoverRequest request : pending) {
+                        if (!request.isComplete()) {
+                            request.run();
+                        }
                     }
+                }
 
-                } catch (Throwable error) {
-                    LOG.trace("Connection attempt:[{}] to: {} failed", reconnectControl.reconnectAttempts, provider.getRemoteURI());
-                    handleProviderFailure(IOExceptionSupport.create(error));
+                // Cancel timeout processing since we are connected again.  We waited until
+                // now for the case where we are continually getting bounced from otherwise
+                // live servers, we want the timeout to remain scheduled in that case so that
+                // it doesn't keep getting rescheduled and never actually time anything out.
+                if (requestTimeoutTask != null) {
+                    requestTimeoutTask.cancel(false);
+                    requestTimeoutTask = null;
                 }
+            } catch (Throwable error) {
+                LOG.trace("Connection attempt:[{}] to: {} failed", reconnectControl.reconnectAttempts, provider.getRemoteURI());
+                handleProviderFailure(provider, IOExceptionSupport.create(error));
+            } finally {
+                lock.writeLock().unlock();
             }
         });
     }
 
     /**
      * Called when the Provider was either first created or when a connection failure has
-     * been reported.  A reconnection attempt is executed on the connection thread either
-     * immediately or after a delay based on configuration an number of attempts that have
+     * been reported.  A reconnection attempt is executed by the serialization executor either
+     * immediately or after a delay based on configuration and number of attempts that have
      * elapsed.  If a new Provider is able to be created and connected then a recovery task
-     * is scheduled on the main serializer thread.  If the connect attempt fails another
+     * is scheduled with the serialization executor.  If the connect attempt fails another
      * attempt is scheduled based on the configured delay settings until a max attempts
      * limit is hit, if one is set.
-     *
-     * Since the initialize is put on the serializer thread this thread stops and does
-     * not queue another connect task.  This allows for the reconnect delay to be reset
-     * and a failure to initialize a new connection restarts the connect process from the
-     * point of view that connection was lost and an immediate attempt cycle should start.
      */
     private void triggerReconnectionAttempt() {
         if (closingConnection.get() || closed.get() || failed.get()) {
@@ -740,7 +760,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                                     provider = null;
                                 }
 
-                                if(reconnectControl.isStoppageCause(e)) {
+                                if (reconnectControl.isStoppageCause(e)) {
                                     LOG.trace("Stopping attempt due to type of failure");
                                     break;
                                 }
@@ -769,10 +789,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     }
 
     /**
-     * Called when the reconnection executor has tried for the last time based on max reconnects
-     * configuration and we now consider this connection attempt to be failed.  This method will
-     * run the failure processing on the serializer executor to ensure that all events back to the
-     * client layer happen from the same thread.
+     * Called when the provider has tried to reconnect for the last time based on reconnection policy
+     * configuration and we now consider this connection attempt to be failed.
      *
      * @param lastFailure the last failure encountered while trying to (re)connect.
      */
@@ -833,15 +851,9 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         if (closingConnection.get() || closed.get() || failed.get()) {
             return;
         }
-        serializer.execute(new Runnable() {
-            @Override
-            public void run() {
-                if (!closingConnection.get() && !closed.get() && !failed.get()) {
-                    LOG.debug("Failover: the provider reports failure: {}", ex.getMessage());
-                    handleProviderFailure(ex);
-                }
-            }
-        });
+
+        LOG.debug("Failover: the provider reports failure: {}", ex.getMessage());
+        handleProviderFailure(provider, ex);
     }
 
     @Override
@@ -858,15 +870,9 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         if (closingConnection.get() || closed.get() || failed.get()) {
             return;
         }
-        serializer.execute(new Runnable() {
-            @Override
-            public void run() {
-                if (!closingConnection.get() && !closed.get() && !failed.get()) {
-                    LOG.debug("Failover: the provider reports an async error: {}", ex.getMessage());
-                    listener.onProviderException(ex);
-                }
-            }
-        });
+
+        LOG.debug("Provider reports an async error: {}", ex.getMessage());
+        listener.onProviderException(ex);
     }
 
     //--------------- Processing for server-provided alternate URIs  ---------//
@@ -911,21 +917,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     //--------------- URI update and rebalance methods -----------------------//
 
     public void add(final URI uri) {
-        connectionHub.execute(new Runnable() {
-            @Override
-            public void run() {
-                uris.add(uri);
-            }
-        });
+        serializer.execute(() -> uris.add(uri));
     }
 
     public void remove(final URI uri) {
-        connectionHub.execute(new Runnable() {
-            @Override
-            public void run() {
-                uris.remove(uri);
-            }
-        });
+        serializer.execute(() -> uris.remove(uri));
     }
 
     //--------------- Property Getters and Setters ---------------------------//
@@ -1084,8 +1080,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
     @Override
     public String toString() {
-        return "FailoverProvider: " +
-               (connectedURI == null ? "unconnected" : connectedURI.toString());
+        return "FailoverProvider: " + (connectedURI == null ? "unconnected" : connectedURI.toString());
     }
 
     //--------------- FailoverProvider Request Timeout Support ---------------//
@@ -1108,15 +1103,17 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
         @Override
         public void run() {
-            final List<FailoverRequest> pending;
-            synchronized (requests) {
-                pending = new ArrayList<FailoverRequest>(requests.values());
-            }
-            for (FailoverRequest request : pending) {
-                if (request.isExpired()) {
-                    LOG.trace("Task {} has timed out, sending failure notice.", request);
-                    request.onFailure(request.createTimedOutException());
+            lock.readLock().lock();
+            try {
+                final List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                for (FailoverRequest request : pending) {
+                    if (request.isExpired()) {
+                        LOG.trace("Task {} has timed out, sending failure notice.", request);
+                        request.onFailure(request.createTimedOutException());
+                    }
                 }
+            } finally {
+                lock.readLock().unlock();
             }
         }
     }
@@ -1135,6 +1132,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         private final long requestStarted = System.nanoTime();
         private final long requestTimeout;
 
+        protected Provider activeProvider;
+
         public FailoverRequest(AsyncResult watcher) {
             this(watcher, JmsConnectionInfo.INFINITE);
         }
@@ -1147,57 +1146,74 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
         @Override
         public void run() {
-            requests.put(id, this);
-            if (provider == null) {
-                whenOffline(new IOException("Connection failed."));
-            } else {
-                try {
-                    LOG.debug("Executing Failover Task: {} ({})", this, id);
-                    doTask();
-                } catch (UnsupportedOperationException e) {
-                    requests.remove(id);
-                    getWrappedRequest().onFailure(e);
-                } catch (JMSException jmsEx) {
-                    requests.remove(id);
-                    getWrappedRequest().onFailure(jmsEx);
-                } catch (Throwable e) {
-                    LOG.debug("Caught exception while executing task: {} - {}", this, e.getMessage());
-                    whenOffline(IOExceptionSupport.create(e));
-                    handleProviderFailure(IOExceptionSupport.create(e));
+            lock.readLock().lock();
+            try {
+                // Snapshot the current provider as this action is scoped to that
+                // instance and any failure we report should reflect the provider
+                // that was in use when the failure happened.
+                activeProvider = provider;
+                requests.put(id, this);
+                if (activeProvider == null) {
+                    whenOffline(new IOException("Connection failed."));
+                } else {
+                    try {
+                        LOG.debug("Executing Failover Task: {} ({})", this, id);
+                        doTask(activeProvider);
+                    } catch (UnsupportedOperationException e) {
+                        requests.remove(id);
+                        getWrappedRequest().onFailure(e);
+                    } catch (JMSException jmsEx) {
+                        requests.remove(id);
+                        getWrappedRequest().onFailure(jmsEx);
+                    } catch (Throwable e) {
+                        LOG.debug("Caught exception while executing task: {} - {}", this, e.getMessage());
+                        whenOffline(IOExceptionSupport.create(e));
+                        handleProviderFailure(activeProvider, IOExceptionSupport.create(e));
+                    }
                 }
+            } finally {
+                lock.readLock().unlock();
             }
         }
 
         @Override
         public void onFailure(final Throwable error) {
-            if (error instanceof JMSException || closingConnection.get() || closed.get() || failed.get()) {
-                requests.remove(id);
-                super.onFailure(error);
-            } else {
-                LOG.debug("Request received error: {}", error.getMessage());
-                serializer.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        IOException ioError = IOExceptionSupport.create(error);
-                        whenOffline(ioError);
-                        handleProviderFailure(ioError);
-                    }
-                });
+            lock.readLock().lock();
+            try {
+                if (error instanceof JMSException || closingConnection.get() || closed.get() || failed.get()) {
+                    requests.remove(id);
+                    super.onFailure(error);
+                } else {
+                    LOG.debug("Request received error: {}", error.getMessage());
+                    IOException ioError = IOExceptionSupport.create(error);
+                    whenOffline(ioError);
+                    handleProviderFailure(activeProvider, ioError);
+                }
+            } finally {
+                lock.readLock().unlock();
             }
         }
 
         @Override
         public void onSuccess() {
-            requests.remove(id);
+            lock.readLock().lock();
+            try {
+                requests.remove(id);
+            } finally {
+                lock.readLock().unlock();
+            }
             super.onSuccess();
         }
 
         /**
          * Called to execute the specific task that was requested.
          *
+         * @param provider
+         *             The provider instance to use when performing this action.
+         *
          * @throws Exception if an error occurs during task execution.
          */
-        public abstract void doTask() throws Exception;
+        public abstract void doTask(Provider provider) throws Exception;
 
         /**
          * Should the request just succeed when the Provider is not connected.
@@ -1253,9 +1269,6 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
      * that if the connection is successfully established that the connection established event
      * is triggered once before moving on to sending only connection interrupted and restored
      * events.
-     *
-     * The connection state events must all be triggered from the FailoverProvider's serialization
-     * thread, this class ensures that the connection established event follows that pattern.
      */
     protected abstract class CreateConnectionRequest extends FailoverRequest {
 
@@ -1265,29 +1278,35 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
         @Override
         public void onSuccess() {
-            serializer.execute(() -> {
+            lock.readLock().lock();
+            try {
                 LOG.trace("First connection requst has completed:");
                 FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
                 processAlternates(provider.getAlternateURIs());
                 listener.onConnectionEstablished(provider.getRemoteURI());
                 reconnectControl.connectionEstablished();
                 CreateConnectionRequest.this.signalConnected();
-            });
+            } finally {
+                lock.readLock().unlock();
+            }
         }
 
         @Override
         public void onFailure(final Throwable result) {
-            if (closingConnection.get() || closed.get() || failed.get()) {
-                requests.remove(id);
-                super.onFailure(result);
-            } else {
-                LOG.debug("Request received error: {}", result.getMessage());
-                serializer.execute(() -> {
+            lock.readLock().lock();
+            try {
+                if (closingConnection.get() || closed.get() || failed.get()) {
+                    requests.remove(id);
+                    super.onFailure(result);
+                } else {
+                    LOG.debug("Request received error: {}", result.getMessage());
                     // If we managed to receive an Open frame it might contain
                     // a failover update so process it before handling the error.
                     processAlternates(provider.getAlternateURIs());
-                    handleProviderFailure(IOExceptionSupport.create(result));
-                });
+                    handleProviderFailure(activeProvider, IOExceptionSupport.create(result));
+                }
+            } finally {
+                lock.readLock().unlock();
             }
         }
 
@@ -1296,9 +1315,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         }
     }
 
-    private static enum FailoverServerListAction {
-        ADD, REPLACE, IGNORE
-    }
+    //----- Reconnection Control State Management ----------------------------//
 
     private class ReconnectControls {
 
@@ -1321,27 +1338,27 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                 if (!recoveryRequired) {
                     if (reconnectAttempts == 0) {
                         LOG.trace("Initial connect attempt will be performed immediately");
-                        connectionHub.execute(runnable);
+                        serializer.execute(runnable);
                     } else if (reconnectAttempts == 1 && initialReconnectDelay > 0) {
                         LOG.trace("Delayed initial reconnect attempt will be in {} milliseconds", initialReconnectDelay);
-                        connectionHub.schedule(runnable, initialReconnectDelay, TimeUnit.MILLISECONDS);
+                        serializer.schedule(runnable, initialReconnectDelay, TimeUnit.MILLISECONDS);
                     } else {
                         long delay = reconnectControl.nextReconnectDelay();
                         LOG.trace("Next reconnect attempt will be in {} milliseconds", delay);
-                        connectionHub.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+                        serializer.schedule(runnable, delay, TimeUnit.MILLISECONDS);
                     }
                 } else if (reconnectAttempts == 0) {
                     if (initialReconnectDelay > 0) {
                         LOG.trace("Delayed initial reconnect attempt will be in {} milliseconds", initialReconnectDelay);
-                        connectionHub.schedule(runnable, initialReconnectDelay, TimeUnit.MILLISECONDS);
+                        serializer.schedule(runnable, initialReconnectDelay, TimeUnit.MILLISECONDS);
                     } else {
                         LOG.trace("Initial Reconnect attempt will be performed immediately");
-                        connectionHub.execute(runnable);
+                        serializer.execute(runnable);
                     }
                 } else {
                     long delay = reconnectControl.nextReconnectDelay();
                     LOG.trace("Next reconnect attempt will be in {} milliseconds", delay);
-                    connectionHub.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+                    serializer.schedule(runnable, delay, TimeUnit.MILLISECONDS);
                 }
             } catch (Throwable unrecoverable) {
                 reportReconnectFailure(unrecoverable);
@@ -1373,11 +1390,10 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         }
 
         public boolean isReconnectAllowed(Throwable cause) {
-            // If a connection attempts fail due to Security errors than
-            // we abort reconnection as there is a configuration issue and
-            // we want to avoid a spinning reconnect cycle that can never
-            // complete.
-            if(isStoppageCause(cause)) {
+            // If a connection attempts fail due to Security errors than we abort
+            // reconnection as there is a configuration issue and we want to avoid
+            // a spinning reconnect cycle that can never complete.
+            if (isStoppageCause(cause)) {
                 return false;
             }
 
index 3365154..25d6db4 100644 (file)
@@ -2303,6 +2303,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void testCreateConsumerAfterConnectionDrops() throws Exception {
         try (TestAmqpPeer originalPeer = new TestAmqpPeer();
@@ -2729,6 +2730,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void testRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListener() throws Exception {
         Symbol errorCondition = AmqpError.RESOURCE_DELETED;
@@ -2737,6 +2739,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
         doRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListenerTestImpl(errorCondition, errorDescription);
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void testRemotelyCloseConsumerWithMessageListenerWithoutErrorFiresJMSExceptionListener() throws Exception {
         // As above but with the peer not including any error condition in its consumer close
@@ -2768,21 +2771,27 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             });
 
             testPeer.expectBegin();
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            testPeer.expectBegin();
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session2.createQueue("myQueue");
 
             // Create a consumer, then remotely end it afterwards.
             testPeer.expectReceiverAttach();
             testPeer.expectLinkFlow();
-            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, errorCondition, errorDescription, 25);
+            testPeer.expectEnd();
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, errorCondition, errorDescription, 10);
 
-            Queue queue = session.createQueue("myQueue");
-            final MessageConsumer consumer = session.createConsumer(queue);
+            final MessageConsumer consumer = session2.createConsumer(queue);
             consumer.setMessageListener(new MessageListener() {
                 @Override
                 public void onMessage(Message message) {
                 }
             });
 
+            // Close first session to allow the receiver remote close timing to be deterministic
+            session1.close();
+
             // Verify the consumer gets marked closed
             testPeer.waitForAllHandlersToComplete(1000);
             assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() {
@@ -2813,6 +2822,11 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             // Try closing it explicitly, should effectively no-op in client.
             // The test peer will throw during close if it sends anything.
             consumer.close();
+
+            // Shut the connection down
+            testPeer.expectClose();
+            connection.close();
+            testPeer.waitForAllHandlersToComplete(1000);
         }
     }
 
index e5291d9..9c624d4 100644 (file)
@@ -94,7 +94,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testCreateProviderOnlyUris() {
-        provider = new FailoverProvider(uris, futuresFactory);
+        provider = new FailoverProvider(uris, Collections.emptyMap(), futuresFactory);
         assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
         assertNull(provider.getRemoteURI());
         assertNotNull(provider.getNestedOptions());
@@ -106,7 +106,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
         Map<String, String> options = new HashMap<String, String>();
         options.put("transport.tcpNoDelay", "true");
 
-        provider = new FailoverProvider(options, futuresFactory);
+        provider = new FailoverProvider(Collections.emptyList(), options, futuresFactory);
         assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
         assertNull(provider.getRemoteURI());
         assertNotNull(provider.getNestedOptions());
@@ -453,13 +453,13 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testAmqpOpenServerListActionDefault() {
-        provider = new FailoverProvider(uris, futuresFactory);
+        provider = new FailoverProvider(uris, Collections.emptyMap(), futuresFactory);
         assertEquals("REPLACE", provider.getAmqpOpenServerListAction());
     }
 
     @Test(timeout = 30000)
     public void testSetGetAmqpOpenServerListAction() {
-        provider = new FailoverProvider(uris, futuresFactory);
+        provider = new FailoverProvider(uris, Collections.emptyMap(), futuresFactory);
         String action = "ADD";
         assertFalse(action.equals(provider.getAmqpOpenServerListAction()));
 
@@ -469,7 +469,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testSetInvalidAmqpOpenServerListActionThrowsIAE() {
-        provider = new FailoverProvider(uris, futuresFactory);
+        provider = new FailoverProvider(uris, Collections.emptyMap(), futuresFactory);
         try {
             provider.setAmqpOpenServerListAction("invalid");
             fail("no exception was thrown");
index b857994..6b7b856 100644 (file)
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.discovery;
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -91,7 +92,7 @@ public class DiscoveryProviderFactory extends ProviderFactory {
         }
 
         // Failover will apply the nested options to each URI while attempting to connect.
-        FailoverProvider failover = new FailoverProvider(nestedOptions, futureFactory);
+        FailoverProvider failover = new FailoverProvider(Collections.emptyList(), nestedOptions, futureFactory);
         Map<String, String> leftOverDiscoveryOptions = PropertyUtil.setProperties(failover, mainOptions);
 
         DiscoveryProvider discovery = new DiscoveryProvider(remoteURI, failover);