Optimize retry for FailbackRegistry. (#2763)
author时无两丶 <442367943@qq.com>
Tue, 11 Dec 2018 10:00:30 +0000 (18:00 +0800)
committerIan Luo <ian.luo@gmail.com>
Tue, 11 Dec 2018 10:00:30 +0000 (18:00 +0800)
* Abstract retry task

* Task for retry.

* Fix sth.

* Finish Optimize. fix ci failed.

* Optimize retry for FailbackRegistry.
The retry operation splits into specific operations, such as subscriptions and registrations. This approach allows for very precise retry control.

* Optimize retry for FailbackRegistry.
The retry operation splits into specific operations, such as subscriptions and registrations. This approach allows for very precise retry control.

* Optimize logger warn's msg.

* Optimize FailedNotifiedTask's run method.
Optimize addXXXTask, directly return if we already have a retry task.

* Optimize notify logic, just notify when the urls is not empty.

* Optimize notify logic, just notify when the urls is not empty.

* Optimize timer that use daemon thread.

dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java [new file with mode: 0644]
dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java [new file with mode: 0644]
dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java [new file with mode: 0644]
dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java [new file with mode: 0644]
dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java [new file with mode: 0644]
dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java [new file with mode: 0644]
dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/support/FailbackRegistryTest.java
dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistry.java
dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java

diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/AbstractRetryTask.java
new file mode 100644 (file)
index 0000000..b299b2f
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.retry;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.common.timer.Timer;
+import org.apache.dubbo.common.timer.TimerTask;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AbstractRetryTask
+ */
+public abstract class AbstractRetryTask implements TimerTask {
+
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * url for retry task
+     */
+    protected final URL url;
+
+    /**
+     * registry for this task
+     */
+    protected final FailbackRegistry registry;
+
+    /**
+     * retry period
+     */
+    protected final long retryPeriod;
+
+    /**
+     * task name for this task
+     */
+    protected final String taskName;
+
+    private volatile boolean cancel;
+
+    AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) {
+        if (url == null || StringUtils.isBlank(taskName)) {
+            throw new IllegalArgumentException();
+        }
+        this.url = url;
+        this.registry = registry;
+        this.taskName = taskName;
+        cancel = false;
+        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
+    }
+
+    public void cancel() {
+        cancel = true;
+    }
+
+    public boolean isCancel() {
+        return cancel;
+    }
+
+    protected void reput(Timeout timeout, long tick) {
+        if (timeout == null) {
+            throw new IllegalArgumentException();
+        }
+
+        Timer timer = timeout.timer();
+        if (timer.isStop() || timeout.isCancelled() || isCancel()) {
+            return;
+        }
+
+        timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
+            // other thread cancel this timeout or stop the timer.
+            return;
+        }
+        if (logger.isInfoEnabled()) {
+            logger.info(taskName + " : " + url);
+        }
+        try {
+            doRetry(url, registry, timeout);
+        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
+            logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
+            // reput this task when catch exception.
+            reput(timeout, retryPeriod);
+        }
+    }
+
+    protected abstract void doRetry(URL url, FailbackRegistry registry, Timeout timeout);
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedNotifiedTask.java
new file mode 100644 (file)
index 0000000..9ccb673
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.retry;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * FailedNotifiedTask
+ */
+public final class FailedNotifiedTask extends AbstractRetryTask {
+
+    private static final String NAME = "retry subscribe";
+
+    private final NotifyListener listener;
+
+    private final List<URL> urls = new CopyOnWriteArrayList<>();
+
+    public FailedNotifiedTask(URL url, NotifyListener listener) {
+        super(url, null, NAME);
+        if (listener == null) {
+            throw new IllegalArgumentException();
+        }
+        this.listener = listener;
+    }
+
+    public void addUrlToRetry(List<URL> urls) {
+        if (CollectionUtils.isEmpty(urls)) {
+            return;
+        }
+        this.urls.addAll(urls);
+    }
+
+    public void removeRetryUrl(List<URL> urls) {
+        this.urls.removeAll(urls);
+    }
+
+    @Override
+    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
+        if (CollectionUtils.isNotEmpty(urls)) {
+            listener.notify(urls);
+            urls.clear();
+        }
+        reput(timeout, retryPeriod);
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedRegisteredTask.java
new file mode 100644 (file)
index 0000000..c4d9cc6
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.retry;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+/**
+ * FailedRegisteredTask
+ */
+public final class FailedRegisteredTask extends AbstractRetryTask {
+
+    private static final String NAME = "retry register";
+
+    public FailedRegisteredTask(URL url, FailbackRegistry registry) {
+        super(url, registry, NAME);
+    }
+
+    @Override
+    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
+        registry.doRegister(url);
+        registry.removeFailedRegisteredTask(url);
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedSubscribedTask.java
new file mode 100644 (file)
index 0000000..06d1ec3
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.retry;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+/**
+ * FailedSubscribedTask
+ */
+public final class FailedSubscribedTask extends AbstractRetryTask {
+
+    private static final String NAME = "retry subscribe";
+
+    private final NotifyListener listener;
+
+    public FailedSubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
+        super(url, registry, NAME);
+        if (listener == null) {
+            throw new IllegalArgumentException();
+        }
+        this.listener = listener;
+    }
+
+    @Override
+    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
+        registry.doSubscribe(url, listener);
+        registry.removeFailedSubscribedTask(url, listener);
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnregisteredTask.java
new file mode 100644 (file)
index 0000000..4cf3aa4
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.retry;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+/**
+ * FailedUnregisteredTask
+ */
+public final class FailedUnregisteredTask extends AbstractRetryTask {
+
+    private static final String NAME = "retry unregister";
+
+    public FailedUnregisteredTask(URL url, FailbackRegistry registry) {
+        super(url, registry, NAME);
+    }
+
+    @Override
+    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
+        registry.doUnregister(url);
+        registry.removeFailedUnregisteredTask(url);
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/FailedUnsubscribedTask.java
new file mode 100644 (file)
index 0000000..6814583
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.registry.retry;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+/**
+ * FailedUnsubscribedTask
+ */
+public final class FailedUnsubscribedTask extends AbstractRetryTask {
+
+    private static final String NAME = "retry unsubscribe";
+
+    private final NotifyListener listener;
+
+    public FailedUnsubscribedTask(URL url, FailbackRegistry registry, NotifyListener listener) {
+        super(url, registry, NAME);
+        if (listener == null) {
+            throw new IllegalArgumentException();
+        }
+        this.listener = listener;
+    }
+
+    @Override
+    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
+        registry.unsubscribe(url, listener);
+        registry.removeFailedUnsubscribedTask(url, listener);
+    }
+}
index 5c456d5..5d2d2f0 100644 (file)
@@ -18,10 +18,14 @@ package org.apache.dubbo.registry.support;
 \r
 import org.apache.dubbo.common.Constants;\r
 import org.apache.dubbo.common.URL;\r
-import org.apache.dubbo.common.utils.ConcurrentHashSet;\r
-import org.apache.dubbo.common.utils.ExecutorUtil;\r
+import org.apache.dubbo.common.timer.HashedWheelTimer;\r
 import org.apache.dubbo.common.utils.NamedThreadFactory;\r
 import org.apache.dubbo.registry.NotifyListener;\r
+import org.apache.dubbo.registry.retry.FailedNotifiedTask;\r
+import org.apache.dubbo.registry.retry.FailedRegisteredTask;\r
+import org.apache.dubbo.registry.retry.FailedSubscribedTask;\r
+import org.apache.dubbo.registry.retry.FailedUnregisteredTask;\r
+import org.apache.dubbo.registry.retry.FailedUnsubscribedTask;\r
 \r
 import java.util.HashMap;\r
 import java.util.HashSet;\r
@@ -30,108 +34,197 @@ import java.util.Map;
 import java.util.Set;\r
 import java.util.concurrent.ConcurrentHashMap;\r
 import java.util.concurrent.ConcurrentMap;\r
-import java.util.concurrent.Executors;\r
-import java.util.concurrent.Future;\r
-import java.util.concurrent.ScheduledExecutorService;\r
-import java.util.concurrent.ScheduledFuture;\r
 import java.util.concurrent.TimeUnit;\r
 \r
 /**\r
  * FailbackRegistry. (SPI, Prototype, ThreadSafe)\r
- *\r
  */\r
 public abstract class FailbackRegistry extends AbstractRegistry {\r
 \r
-    // Scheduled executor service\r
-    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));\r
-\r
-    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry\r
-    private final ScheduledFuture<?> retryFuture;\r
+    /*  retry task map */\r
 \r
-    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();\r
+    private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();\r
 \r
-    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();\r
+    private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();\r
 \r
-    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();\r
+    private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();\r
 \r
-    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();\r
+    private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();\r
 \r
-    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();\r
+    private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();\r
 \r
     /**\r
      * The time in milliseconds the retryExecutor will wait\r
      */\r
     private final int retryPeriod;\r
 \r
+    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry\r
+    private final HashedWheelTimer retryTimer;\r
+\r
     public FailbackRegistry(URL url) {\r
         super(url);\r
         this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);\r
-        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {\r
-            @Override\r
-            public void run() {\r
-                // Check and connect to the registry\r
-                try {\r
-                    retry();\r
-                } catch (Throwable t) { // Defensive fault tolerance\r
-                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);\r
-                }\r
-            }\r
-        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);\r
+\r
+        // since the retry task will not be very much. 128 ticks is enough.\r
+        retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);\r
     }\r
 \r
-    public Future<?> getRetryFuture() {\r
-        return retryFuture;\r
+    public void removeFailedRegisteredTask(URL url) {\r
+        failedRegistered.remove(url);\r
     }\r
 \r
-    public Set<URL> getFailedRegistered() {\r
-        return failedRegistered;\r
+    public void removeFailedUnregisteredTask(URL url) {\r
+        failedUnregistered.remove(url);\r
     }\r
 \r
-    public Set<URL> getFailedUnregistered() {\r
-        return failedUnregistered;\r
+    public void removeFailedSubscribedTask(URL url, NotifyListener listener) {\r
+        Holder h = new Holder(url, listener);\r
+        failedSubscribed.remove(h);\r
     }\r
 \r
-    public Map<URL, Set<NotifyListener>> getFailedSubscribed() {\r
-        return failedSubscribed;\r
+    public void removeFailedUnsubscribedTask(URL url, NotifyListener listener) {\r
+        Holder h = new Holder(url, listener);\r
+        failedUnsubscribed.remove(h);\r
     }\r
 \r
-    public Map<URL, Set<NotifyListener>> getFailedUnsubscribed() {\r
-        return failedUnsubscribed;\r
+    public void removeFailedNotifiedTask(URL url, NotifyListener listener) {\r
+        Holder h = new Holder(url, listener);\r
+        failedNotified.remove(h);\r
     }\r
 \r
-    public Map<URL, Map<NotifyListener, List<URL>>> getFailedNotified() {\r
-        return failedNotified;\r
+    private void addFailedRegistered(URL url) {\r
+        FailedRegisteredTask oldOne = failedRegistered.get(url);\r
+        if (oldOne != null) {\r
+            return;\r
+        }\r
+        FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);\r
+        oldOne = failedRegistered.putIfAbsent(url, newTask);\r
+        if (oldOne == null) {\r
+            // never has a retry task. then start a new task for retry.\r
+            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);\r
+        }\r
+    }\r
+\r
+    private void removeFailedRegistered(URL url) {\r
+        FailedRegisteredTask f = failedRegistered.remove(url);\r
+        if (f != null) {\r
+            f.cancel();\r
+        }\r
+    }\r
+\r
+    private void addFailedUnregistered(URL url) {\r
+        FailedUnregisteredTask oldOne = failedUnregistered.get(url);\r
+        if (oldOne != null) {\r
+            return;\r
+        }\r
+        FailedUnregisteredTask newTask = new FailedUnregisteredTask(url, this);\r
+        oldOne = failedUnregistered.putIfAbsent(url, newTask);\r
+        if (oldOne == null) {\r
+            // never has a retry task. then start a new task for retry.\r
+            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);\r
+        }\r
+    }\r
+\r
+    private void removeFailedUnregistered(URL url) {\r
+        FailedUnregisteredTask f = failedUnregistered.remove(url);\r
+        if (f != null) {\r
+            f.cancel();\r
+        }\r
     }\r
 \r
     private void addFailedSubscribed(URL url, NotifyListener listener) {\r
-        Set<NotifyListener> listeners = failedSubscribed.get(url);\r
-        if (listeners == null) {\r
-            failedSubscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());\r
-            listeners = failedSubscribed.get(url);\r
+        Holder h = new Holder(url, listener);\r
+        FailedSubscribedTask oldOne = failedSubscribed.get(h);\r
+        if (oldOne != null) {\r
+            return;\r
+        }\r
+        FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener);\r
+        oldOne = failedSubscribed.putIfAbsent(h, newTask);\r
+        if (oldOne == null) {\r
+            // never has a retry task. then start a new task for retry.\r
+            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);\r
         }\r
-        listeners.add(listener);\r
     }\r
 \r
     private void removeFailedSubscribed(URL url, NotifyListener listener) {\r
-        Set<NotifyListener> listeners = failedSubscribed.get(url);\r
-        if (listeners != null) {\r
-            listeners.remove(listener);\r
+        Holder h = new Holder(url, listener);\r
+        FailedSubscribedTask f = failedSubscribed.remove(h);\r
+        if (f != null) {\r
+            f.cancel();\r
+        }\r
+        removeFailedUnsubscribed(url, listener);\r
+        removeFailedNotified(url, listener);\r
+    }\r
+\r
+    private void addFailedUnsubscribed(URL url, NotifyListener listener) {\r
+        Holder h = new Holder(url, listener);\r
+        FailedUnsubscribedTask oldOne = failedUnsubscribed.get(h);\r
+        if (oldOne != null) {\r
+            return;\r
+        }\r
+        FailedUnsubscribedTask newTask = new FailedUnsubscribedTask(url, this, listener);\r
+        oldOne = failedUnsubscribed.putIfAbsent(h, newTask);\r
+        if (oldOne == null) {\r
+            // never has a retry task. then start a new task for retry.\r
+            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);\r
+        }\r
+    }\r
+\r
+    private void removeFailedUnsubscribed(URL url, NotifyListener listener) {\r
+        Holder h = new Holder(url, listener);\r
+        FailedUnsubscribedTask f = failedUnsubscribed.remove(h);\r
+        if (f != null) {\r
+            f.cancel();\r
         }\r
-        listeners = failedUnsubscribed.get(url);\r
-        if (listeners != null) {\r
-            listeners.remove(listener);\r
+    }\r
+\r
+    private void addFailedNotified(URL url, NotifyListener listener, List<URL> urls) {\r
+        Holder h = new Holder(url, listener);\r
+        FailedNotifiedTask newTask = new FailedNotifiedTask(url, listener);\r
+        FailedNotifiedTask f = failedNotified.putIfAbsent(h, newTask);\r
+        if (f == null) {\r
+            // never has a retry task. then start a new task for retry.\r
+            newTask.addUrlToRetry(urls);\r
+            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);\r
+        } else {\r
+            // just add urls which needs retry.\r
+            newTask.addUrlToRetry(urls);\r
         }\r
-        Map<NotifyListener, List<URL>> notified = failedNotified.get(url);\r
-        if (notified != null) {\r
-            notified.remove(listener);\r
+    }\r
+\r
+    private void removeFailedNotified(URL url, NotifyListener listener) {\r
+        Holder h = new Holder(url, listener);\r
+        FailedNotifiedTask f = failedNotified.remove(h);\r
+        if (f != null) {\r
+            f.cancel();\r
         }\r
     }\r
 \r
+    public ConcurrentMap<URL, FailedRegisteredTask> getFailedRegistered() {\r
+        return failedRegistered;\r
+    }\r
+\r
+    public ConcurrentMap<URL, FailedUnregisteredTask> getFailedUnregistered() {\r
+        return failedUnregistered;\r
+    }\r
+\r
+    public ConcurrentMap<Holder, FailedSubscribedTask> getFailedSubscribed() {\r
+        return failedSubscribed;\r
+    }\r
+\r
+    public ConcurrentMap<Holder, FailedUnsubscribedTask> getFailedUnsubscribed() {\r
+        return failedUnsubscribed;\r
+    }\r
+\r
+    public ConcurrentMap<Holder, FailedNotifiedTask> getFailedNotified() {\r
+        return failedNotified;\r
+    }\r
+\r
     @Override\r
     public void register(URL url) {\r
         super.register(url);\r
-        failedRegistered.remove(url);\r
-        failedUnregistered.remove(url);\r
+        removeFailedRegistered(url);\r
+        removeFailedUnregistered(url);\r
         try {\r
             // Sending a registration request to the server side\r
             doRegister(url);\r
@@ -153,15 +246,15 @@ public abstract class FailbackRegistry extends AbstractRegistry {
             }\r
 \r
             // Record a failed registration request to a failed list, retry regularly\r
-            failedRegistered.add(url);\r
+            addFailedRegistered(url);\r
         }\r
     }\r
 \r
     @Override\r
     public void unregister(URL url) {\r
         super.unregister(url);\r
-        failedRegistered.remove(url);\r
-        failedUnregistered.remove(url);\r
+        removeFailedRegistered(url);\r
+        removeFailedUnregistered(url);\r
         try {\r
             // Sending a cancellation request to the server side\r
             doUnregister(url);\r
@@ -183,7 +276,7 @@ public abstract class FailbackRegistry extends AbstractRegistry {
             }\r
 \r
             // Record a failed registration request to a failed list, retry regularly\r
-            failedUnregistered.add(url);\r
+            addFailedUnregistered(url);\r
         }\r
     }\r
 \r
@@ -245,12 +338,7 @@ public abstract class FailbackRegistry extends AbstractRegistry {
             }\r
 \r
             // Record a failed registration request to a failed list, retry regularly\r
-            Set<NotifyListener> listeners = failedUnsubscribed.get(url);\r
-            if (listeners == null) {\r
-                failedUnsubscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());\r
-                listeners = failedUnsubscribed.get(url);\r
-            }\r
-            listeners.add(listener);\r
+            addFailedUnsubscribed(url, listener);\r
         }\r
     }\r
 \r
@@ -266,12 +354,7 @@ public abstract class FailbackRegistry extends AbstractRegistry {
             doNotify(url, listener, urls);\r
         } catch (Exception t) {\r
             // Record a failed registration request to a failed list, retry regularly\r
-            Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);\r
-            if (listeners == null) {\r
-                failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());\r
-                listeners = failedNotified.get(url);\r
-            }\r
-            listeners.put(listener, urls);\r
+            addFailedNotified(url, listener, urls);\r
             logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);\r
         }\r
     }\r
@@ -289,7 +372,7 @@ public abstract class FailbackRegistry extends AbstractRegistry {
                 logger.info("Recover register url " + recoverRegistered);\r
             }\r
             for (URL url : recoverRegistered) {\r
-                failedRegistered.add(url);\r
+                addFailedRegistered(url);\r
             }\r
         }\r
         // subscribe\r
@@ -307,156 +390,49 @@ public abstract class FailbackRegistry extends AbstractRegistry {
         }\r
     }\r
 \r
-    // Retry the failed actions\r
-    protected void retry() {\r
-        if (!failedRegistered.isEmpty()) {\r
-            Set<URL> failed = new HashSet<URL>(failedRegistered);\r
-            if (failed.size() > 0) {\r
-                if (logger.isInfoEnabled()) {\r
-                    logger.info("Retry register " + failed);\r
-                }\r
-                try {\r
-                    for (URL url : failed) {\r
-                        try {\r
-                            doRegister(url);\r
-                            failedRegistered.remove(url);\r
-                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                            logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                        }\r
-                    }\r
-                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                }\r
-            }\r
-        }\r
-        if (!failedUnregistered.isEmpty()) {\r
-            Set<URL> failed = new HashSet<URL>(failedUnregistered);\r
-            if (!failed.isEmpty()) {\r
-                if (logger.isInfoEnabled()) {\r
-                    logger.info("Retry unregister " + failed);\r
-                }\r
-                try {\r
-                    for (URL url : failed) {\r
-                        try {\r
-                            doUnregister(url);\r
-                            failedUnregistered.remove(url);\r
-                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                            logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                        }\r
-                    }\r
-                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                    logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                }\r
-            }\r
-        }\r
-        if (!failedSubscribed.isEmpty()) {\r
-            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);\r
-            for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {\r
-                if (entry.getValue() == null || entry.getValue().size() == 0) {\r
-                    failed.remove(entry.getKey());\r
-                }\r
-            }\r
-            if (failed.size() > 0) {\r
-                if (logger.isInfoEnabled()) {\r
-                    logger.info("Retry subscribe " + failed);\r
-                }\r
-                try {\r
-                    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {\r
-                        URL url = entry.getKey();\r
-                        Set<NotifyListener> listeners = entry.getValue();\r
-                        for (NotifyListener listener : listeners) {\r
-                            try {\r
-                                doSubscribe(url, listener);\r
-                                listeners.remove(listener);\r
-                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                                logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                            }\r
-                        }\r
-                    }\r
-                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                    logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                }\r
-            }\r
-        }\r
-        if (!failedUnsubscribed.isEmpty()) {\r
-            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);\r
-            for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {\r
-                if (entry.getValue() == null || entry.getValue().isEmpty()) {\r
-                    failed.remove(entry.getKey());\r
-                }\r
-            }\r
-            if (failed.size() > 0) {\r
-                if (logger.isInfoEnabled()) {\r
-                    logger.info("Retry unsubscribe " + failed);\r
-                }\r
-                try {\r
-                    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {\r
-                        URL url = entry.getKey();\r
-                        Set<NotifyListener> listeners = entry.getValue();\r
-                        for (NotifyListener listener : listeners) {\r
-                            try {\r
-                                doUnsubscribe(url, listener);\r
-                                listeners.remove(listener);\r
-                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                                logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                            }\r
-                        }\r
-                    }\r
-                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                    logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                }\r
-            }\r
-        }\r
-        if (!failedNotified.isEmpty()) {\r
-            Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);\r
-            for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {\r
-                if (entry.getValue() == null || entry.getValue().size() == 0) {\r
-                    failed.remove(entry.getKey());\r
-                }\r
-            }\r
-            if (failed.size() > 0) {\r
-                if (logger.isInfoEnabled()) {\r
-                    logger.info("Retry notify " + failed);\r
-                }\r
-                try {\r
-                    for (Map<NotifyListener, List<URL>> values : failed.values()) {\r
-                        for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {\r
-                            try {\r
-                                NotifyListener listener = entry.getKey();\r
-                                List<URL> urls = entry.getValue();\r
-                                listener.notify(urls);\r
-                                values.remove(listener);\r
-                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                                logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                            }\r
-                        }\r
-                    }\r
-                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry\r
-                    logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);\r
-                }\r
-            }\r
-        }\r
-    }\r
-\r
     @Override\r
     public void destroy() {\r
         super.destroy();\r
-        try {\r
-            retryFuture.cancel(true);\r
-        } catch (Throwable t) {\r
-            logger.warn(t.getMessage(), t);\r
-        }\r
-        ExecutorUtil.gracefulShutdown(retryExecutor, retryPeriod);\r
+        retryTimer.stop();\r
     }\r
 \r
     // ==== Template method ====\r
 \r
-    protected abstract void doRegister(URL url);\r
+    public abstract void doRegister(URL url);\r
+\r
+    public abstract void doUnregister(URL url);\r
+\r
+    public abstract void doSubscribe(URL url, NotifyListener listener);\r
+\r
+    public abstract void doUnsubscribe(URL url, NotifyListener listener);\r
 \r
-    protected abstract void doUnregister(URL url);\r
+    static class Holder {\r
 \r
-    protected abstract void doSubscribe(URL url, NotifyListener listener);\r
+        private final URL url;\r
 \r
-    protected abstract void doUnsubscribe(URL url, NotifyListener listener);\r
+        private final NotifyListener notifyListener;\r
+\r
+        Holder(URL url, NotifyListener notifyListener) {\r
+            if (url == null || notifyListener == null) {\r
+                throw new IllegalArgumentException();\r
+            }\r
+            this.url = url;\r
+            this.notifyListener = notifyListener;\r
+        }\r
 \r
+        @Override\r
+        public int hashCode() {\r
+            return url.hashCode() + notifyListener.hashCode();\r
+        }\r
+\r
+        @Override\r
+        public boolean equals(Object obj) {\r
+            if (obj instanceof Holder) {\r
+                Holder h = (Holder) obj;\r
+                return this.url.equals(h.url) && this.notifyListener.equals(h.notifyListener);\r
+            } else {\r
+                return false;\r
+            }\r
+        }\r
+    }\r
 }\r
index 7f3ac3a..6db670c 100644 (file)
@@ -20,7 +20,6 @@ import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.URL;\r
 import org.apache.dubbo.common.utils.CollectionUtils;\r
 import org.apache.dubbo.registry.NotifyListener;\r
-\r
 import org.junit.Assert;\r
 import org.junit.Before;\r
 import org.junit.Test;\r
@@ -53,8 +52,7 @@ public class FailbackRegistryTest {
     }\r
 \r
     /**\r
-     * Test method for\r
-     * {@link org.apache.dubbo.registry.support.FailbackRegistry#retry()}.\r
+     * Test method for retry\r
      *\r
      * @throws Exception\r
      */\r
@@ -62,7 +60,9 @@ public class FailbackRegistryTest {
     public void testDoRetry() throws Exception {\r
 \r
         final AtomicReference<Boolean> notified = new AtomicReference<Boolean>(false);\r
-        final CountDownLatch latch = new CountDownLatch(3);//All of them are called 3 times. Successful attempts to reduce the failure of 1. subscribe register will not be done again\r
+\r
+        // the latest latch just for 3. Because retry method has been removed.\r
+        final CountDownLatch latch = new CountDownLatch(2);\r
 \r
         NotifyListener listner = new NotifyListener() {\r
             @Override\r
@@ -79,7 +79,7 @@ public class FailbackRegistryTest {
 \r
         //Failure can not be called to listener.\r
         assertEquals(false, notified.get());\r
-        assertEquals(3, latch.getCount());\r
+        assertEquals(2, latch.getCount());\r
 \r
         registry.setBad(false);\r
 \r
@@ -191,7 +191,7 @@ public class FailbackRegistryTest {
                 notified.set(Boolean.TRUE);\r
             }\r
         };\r
-        \r
+\r
         MockRegistry mockRegistry = new MockRegistry(registryUrl, countDownLatch);\r
         mockRegistry.register(serviceUrl);\r
         mockRegistry.subscribe(serviceUrl, listener);\r
@@ -200,7 +200,8 @@ public class FailbackRegistryTest {
         mockRegistry.recover();\r
         countDownLatch.await();\r
         Assert.assertEquals(0, mockRegistry.getFailedRegistered().size());\r
-        Assert.assertEquals(null, mockRegistry.getFailedSubscribed().get(registryUrl));\r
+        FailbackRegistry.Holder h = new FailbackRegistry.Holder(registryUrl, listener);\r
+        Assert.assertEquals(null, mockRegistry.getFailedSubscribed().get(h));\r
         Assert.assertEquals(countDownLatch.getCount(), 0);\r
     }\r
 \r
@@ -224,7 +225,7 @@ public class FailbackRegistryTest {
         }\r
 \r
         @Override\r
-        protected void doRegister(URL url) {\r
+        public void doRegister(URL url) {\r
             if (bad) {\r
                 throw new RuntimeException("can not invoke!");\r
             }\r
@@ -234,7 +235,7 @@ public class FailbackRegistryTest {
         }\r
 \r
         @Override\r
-        protected void doUnregister(URL url) {\r
+        public void doUnregister(URL url) {\r
             if (bad) {\r
                 throw new RuntimeException("can not invoke!");\r
             }\r
@@ -244,7 +245,7 @@ public class FailbackRegistryTest {
         }\r
 \r
         @Override\r
-        protected void doSubscribe(URL url, NotifyListener listener) {\r
+        public void doSubscribe(URL url, NotifyListener listener) {\r
             if (bad) {\r
                 throw new RuntimeException("can not invoke!");\r
             }\r
@@ -254,7 +255,7 @@ public class FailbackRegistryTest {
         }\r
 \r
         @Override\r
-        protected void doUnsubscribe(URL url, NotifyListener listener) {\r
+        public void doUnsubscribe(URL url, NotifyListener listener) {\r
             if (bad) {\r
                 throw new RuntimeException("can not invoke!");\r
             }\r
@@ -263,16 +264,6 @@ public class FailbackRegistryTest {
         }\r
 \r
         @Override\r
-        protected void retry() {\r
-            super.retry();\r
-            if (bad) {\r
-                throw new RuntimeException("can not invoke!");\r
-            }\r
-            //System.out.println("do retry");\r
-            latch.countDown();\r
-        }\r
-\r
-        @Override\r
         public boolean isAvailable() {\r
             return true;\r
         }\r
index 45c94a7..2c6b8c1 100644 (file)
@@ -138,22 +138,22 @@ public class DubboRegistry extends FailbackRegistry {
     }\r
 \r
     @Override\r
-    protected void doRegister(URL url) {\r
+    public void doRegister(URL url) {\r
         registryService.register(url);\r
     }\r
 \r
     @Override\r
-    protected void doUnregister(URL url) {\r
+    public void doUnregister(URL url) {\r
         registryService.unregister(url);\r
     }\r
 \r
     @Override\r
-    protected void doSubscribe(URL url, NotifyListener listener) {\r
+    public void doSubscribe(URL url, NotifyListener listener) {\r
         registryService.subscribe(url, listener);\r
     }\r
 \r
     @Override\r
-    protected void doUnsubscribe(URL url, NotifyListener listener) {\r
+    public void doUnsubscribe(URL url, NotifyListener listener) {\r
         registryService.unsubscribe(url, listener);\r
     }\r
 \r
index 8ff7daa..06835da 100644 (file)
@@ -249,17 +249,17 @@ public class MulticastRegistry extends FailbackRegistry {
     }\r
 \r
     @Override\r
-    protected void doRegister(URL url) {\r
+    public void doRegister(URL url) {\r
         multicast(Constants.REGISTER + " " + url.toFullString());\r
     }\r
 \r
     @Override\r
-    protected void doUnregister(URL url) {\r
+    public void doUnregister(URL url) {\r
         multicast(Constants.UNREGISTER + " " + url.toFullString());\r
     }\r
 \r
     @Override\r
-    protected void doSubscribe(URL url, NotifyListener listener) {\r
+    public void doSubscribe(URL url, NotifyListener listener) {\r
         if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {\r
             admin = true;\r
         }\r
@@ -273,7 +273,7 @@ public class MulticastRegistry extends FailbackRegistry {
     }\r
 \r
     @Override\r
-    protected void doUnsubscribe(URL url, NotifyListener listener) {\r
+    public void doUnsubscribe(URL url, NotifyListener listener) {\r
         if (!Constants.ANY_VALUE.equals(url.getServiceInterface())\r
                 && url.getParameter(Constants.REGISTER_KEY, true)) {\r
             unregister(url);\r
index 43426c7..d7a8ee0 100644 (file)
@@ -109,7 +109,7 @@ public class ZookeeperRegistry extends FailbackRegistry {
     }\r
 \r
     @Override\r
-    protected void doRegister(URL url) {\r
+    public void doRegister(URL url) {\r
         try {\r
             zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));\r
         } catch (Throwable e) {\r
@@ -118,7 +118,7 @@ public class ZookeeperRegistry extends FailbackRegistry {
     }\r
 \r
     @Override\r
-    protected void doUnregister(URL url) {\r
+    public void doUnregister(URL url) {\r
         try {\r
             zkClient.delete(toUrlPath(url));\r
         } catch (Throwable e) {\r
@@ -127,7 +127,7 @@ public class ZookeeperRegistry extends FailbackRegistry {
     }\r
 \r
     @Override\r
-    protected void doSubscribe(final URL url, final NotifyListener listener) {\r
+    public void doSubscribe(final URL url, final NotifyListener listener) {\r
         try {\r
             if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {\r
                 String root = toRootPath();\r
@@ -195,7 +195,7 @@ public class ZookeeperRegistry extends FailbackRegistry {
     }\r
 \r
     @Override\r
-    protected void doUnsubscribe(URL url, NotifyListener listener) {\r
+    public void doUnsubscribe(URL url, NotifyListener listener) {\r
         ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);\r
         if (listeners != null) {\r
             ChildListener zkListener = listeners.get(listener);\r