ARIES-1880 - Use builder in subscribe ARIES-1880 10/head
authorChristian Schneider <cschneid@adobe.com>
Fri, 4 Jan 2019 14:08:03 +0000 (15:08 +0100)
committerChristian Schneider <cschneid@adobe.com>
Fri, 4 Jan 2019 14:08:03 +0000 (15:08 +0100)
org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequest.java [new file with mode: 0644]
org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
pom.xml

index 2cd1138a196e774b35c7ca50b38a9800b0cd2244..a66279d2c6a57563a38798588d7fa64fa7802925 100644 (file)
@@ -18,7 +18,6 @@
 package org.apache.aries.events.api;
 
 import java.util.Map;
-import java.util.function.Consumer;
 
 /**
  * Journaled messaging API
@@ -39,7 +38,7 @@ public interface Messaging {
      * @param callback will be called for each message received
      * @return Returned subscription must be closed by the caller to unsubscribe
      */
-    Subscription subscribe(String topic, Position position, Seek seek, Consumer<Received> callback);
+    Subscription subscribe(SubscribeRequest request);
 
     /**
      * Create a message with payload and metadata
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequest.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequest.java
new file mode 100644 (file)
index 0000000..a209968
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.aries.events.api;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.function.Consumer;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+@ParametersAreNonnullByDefault
+public class SubscribeRequest {
+    private final String topic;
+    private final Consumer<Received> callback;
+    private Position position;
+    private Seek seek = Seek.latest;
+    
+    public SubscribeRequest(String topic, Consumer<Received> callback) {
+        this.topic = topic;
+        this.callback = callback;
+    }
+
+    public static SubscribeRequest to(String topic, Consumer<Received> callback) {
+        return new SubscribeRequest(topic, callback);
+    }
+    
+    public SubscribeRequest startAt(Position position) {
+        this.position = position;
+        return this;
+    }
+    
+    public SubscribeRequest seek(Seek seek) {
+        this.seek = requireNonNull(seek, "Seek must not be null");
+        return this;
+    }
+    
+    public String getTopic() {
+        return topic;
+    }
+    
+    public Position getPosition() {
+        return position;
+    }
+    
+    public Seek getSeek() {
+        return seek;
+    }
+    
+    public Consumer<Received> getCallback() {
+        return callback;
+    }
+}
index 8966c16d540f87bc06edecf19d95e574a8d39390..634887b3a791d5a55c4aa74b7ea3579b86418d5f 100644 (file)
@@ -19,13 +19,11 @@ package org.apache.aries.events.memory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
 
 import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Messaging;
 import org.apache.aries.events.api.Position;
-import org.apache.aries.events.api.Received;
-import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
 import org.apache.aries.events.api.Type;
 import org.osgi.service.component.annotations.Component;
@@ -42,9 +40,9 @@ public class InMemoryMessaging implements Messaging {
     }
 
     @Override
-    public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
-        Topic topic = getOrCreate(topicName);
-        return topic.subscribe(position, seek, callback);
+    public Subscription subscribe(SubscribeRequest request) {
+        Topic topic = getOrCreate(request.getTopic());
+        return topic.subscribe(request);
     }
 
     @Override
index e9175a4ed1da288108ceeab0dd09d0a3c7d2cdb2..8d4755e11f8b642870ba85dc7a46fe30d06f0249 100644 (file)
@@ -27,6 +27,7 @@ import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,10 +48,10 @@ public class Topic {
         return new MemoryPosition(offset);
     }
 
-    public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
-        long startOffset = getStartOffset((MemoryPosition) position, seek);
+    public Subscription subscribe(SubscribeRequest request) {
+        long startOffset = getStartOffset((MemoryPosition) request.getPosition(), request.getSeek());
         log.debug("Consuming from " + startOffset);
-        return new TopicSubscription(startOffset, callback);
+        return new TopicSubscription(startOffset, request.getCallback());
     }
 
     private long getStartOffset(MemoryPosition position, Seek seek) {
@@ -59,10 +60,8 @@ public class Topic {
         } else {
             if (seek == Seek.earliest) {
                 return this.journal.getFirstOffset();
-            } else if (seek == Seek.latest) {
-                return this.journal.getLastOffset() + 1;
             } else {
-                throw new IllegalArgumentException("Seek must not be null");
+                return this.journal.getLastOffset() + 1;
             }
         }
     }
index 62d88c06a89d816ec1760db1cb9f28f48f376dcd..aaf77e0e9baedda4967a4b7c4581cdcb9cb20704 100644 (file)
@@ -1,5 +1,6 @@
 package org.apache.aries.events.memory;
 
+import static org.apache.aries.events.api.SubscribeRequest.to;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
@@ -23,6 +24,7 @@ import org.apache.aries.events.api.Messaging;
 import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
 import org.junit.After;
 import org.junit.Before;
@@ -64,7 +66,7 @@ public class MessagingTest {
     
     @Test
     public void testSend() {
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback).seek(Seek.earliest));
         String content = "testcontent";
         send("test", content);
         verify(callback, timeout(1000)).accept(messageCaptor.capture());
@@ -75,22 +77,22 @@ public class MessagingTest {
         assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
     }
     
-    @Test(expected=IllegalArgumentException.class)
+    @Test(expected=NullPointerException.class)
     public void testInvalidSubscribe() {
-        messaging.subscribe("test", null, null, callback);
+        subscribe(to("test", callback).seek(null));
     }
     
     @Test
     public void testExceptionInHandler() {
         doThrow(new RuntimeException("Expected exception")).when(callback).accept(Mockito.any(Received.class));
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback));
         send("test", "testcontent");
         verify(callback, timeout(1000)).accept(messageCaptor.capture());
     }
 
     @Test
     public void testEarliestBefore() {
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback).seek(Seek.earliest));
         send("test", "testcontent");
         send("test", "testcontent2");
         verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
@@ -100,7 +102,7 @@ public class MessagingTest {
     @Test
     public void testEarliestAfter() {
         send("test", "testcontent");
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback).seek(Seek.earliest));
         send("test", "testcontent2");
         verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
         assertThat(messageContents(), contains("testcontent", "testcontent2"));
@@ -108,7 +110,7 @@ public class MessagingTest {
     
     @Test
     public void testLatestBefore() {
-        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        subscribe(to("test", callback));
         send("test", "testcontent");
         send("test", "testcontent2");
         verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
@@ -118,7 +120,7 @@ public class MessagingTest {
     @Test
     public void testLatest() {
         send("test", "testcontent");
-        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        subscribe(to("test", callback));
         send("test", "testcontent2");
         verify(callback, timeout(1000)).accept(messageCaptor.capture());
         assertThat(messageContents(), contains("testcontent2"));
@@ -128,10 +130,14 @@ public class MessagingTest {
     public void testFrom1() {
         send("test", "testcontent");
         send("test", "testcontent2");
-        subscriptions.add(messaging.subscribe("test", new MemoryPosition(1l), Seek.earliest, callback));
+        subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
         verify(callback, timeout(1000)).accept(messageCaptor.capture());
         assertThat(messageContents(), contains("testcontent2"));
     }
+    
+    private void subscribe(SubscribeRequest request) {
+        this.subscriptions.add(messaging.subscribe(request));
+    }
 
     private List<String> messageContents() {
         return messageCaptor.getAllValues().stream()
diff --git a/pom.xml b/pom.xml
index bf7e8b41c0e500684f9f3d39830007c2e65b444d..8010756ab2314ad9de9326690d684e45003b3b58 100644 (file)
--- a/pom.xml
+++ b/pom.xml
             <artifactId>slf4j-api</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>2.0.0</version>
+        </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>