AMQCLI-3 - Cleanup and add tests
authorChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Wed, 8 Mar 2017 18:48:25 +0000 (13:48 -0500)
committerChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Wed, 8 Mar 2017 18:48:25 +0000 (13:48 -0500)
activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java [moved from activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java with 85% similarity]
activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java
activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java [new file with mode: 0644]

index 0022d51..ecb7abd 100644 (file)
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
-import io.airlift.airline.Arguments;
 import io.airlift.airline.Cli;
 import io.airlift.airline.Cli.CliBuilder;
 import io.airlift.airline.Command;
index 93bd439..b17d505 100644 (file)
@@ -32,7 +32,7 @@ public class ArtemisXmlMessageRecoveryListener implements MessageRecoveryListene
     static final Logger LOG = LoggerFactory.getLogger(ArtemisXmlMessageRecoveryListener.class);
 
     private final ArtemisJournalMarshaller xmlMarshaller;
-    private final OpenWireMessageTypeConverter converter;
+    private final OpenWireCoreMessageTypeConverter converter;
 
     /**
      * @param file
@@ -41,7 +41,7 @@ public class ArtemisXmlMessageRecoveryListener implements MessageRecoveryListene
             final ArtemisJournalMarshaller xmlMarshaller) {
         super();
         this.xmlMarshaller = xmlMarshaller;
-        this.converter = new OpenWireMessageTypeConverter(store);
+        this.converter = new OpenWireCoreMessageTypeConverter(store);
     }
 
 
@@ -32,20 +32,27 @@ import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.apache.activemq.store.kahadb.KahaDBUtil;
 
-public class OpenWireMessageTypeConverter implements OpenWireExportConverter<MessageType> {
+/**
+ * Message Converter that first converts an OpenWire message to a Core Message and then uses
+ * the Core message to convert to an Artemis XML MessageType.
+ */
+public class OpenWireCoreMessageTypeConverter implements OpenWireExportConverter<MessageType> {
 
     private final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat());
     private final KahaDBStore store;
 
-
     /**
      * @param store
      */
-    public OpenWireMessageTypeConverter(KahaDBStore store) {
+    public OpenWireCoreMessageTypeConverter(KahaDBStore store) {
         super();
         this.store = store;
     }
 
+    public OpenWireCoreMessageTypeConverter() {
+        this(null);
+    }
+
     /* (non-Javadoc)
      * @see org.apache.activemq.cli.kahadb.exporter.MessageConverter#convert(org.apache.activemq.Message)
      */
@@ -68,7 +75,7 @@ public class OpenWireMessageTypeConverter implements OpenWireExportConverter<Mes
                 messageType.setProperties(propertiesType);
             }
 
-            messageType.setQueues(convertQueue(message));
+            messageType.setQueues(convertQueues(message));
             messageType.setBody(convertBody(serverMessage));
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -77,8 +84,16 @@ public class OpenWireMessageTypeConverter implements OpenWireExportConverter<Mes
         return messageType;
     }
 
-    private QueuesType convertQueue(final Message message) throws Exception {
-        if (message.getDestination().isQueue()) {
+    /**
+     * Determine the destinations associated with this message
+     * Will be one destination for a Queue message or 1 or more for a Topic
+     *
+     * @param message
+     * @return
+     * @throws Exception
+     */
+    private QueuesType convertQueues(final Message message) throws Exception {
+        if (store == null || message.getDestination().isQueue()) {
             return QueuesType.builder()
                     .withQueue(QueueType.builder()
                             .withName(message.getDestination().getPhysicalName()).build())
index a006cef..31bfc8f 100644 (file)
@@ -17,8 +17,6 @@
 package org.apache.activemq.cli.kahadb.exporter;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
@@ -54,7 +52,6 @@ public class MultiKahaDbExporterTest extends ExporterTest {
     @Override
     public void exportStore(File kahaDbDir, File xmlFile) throws Exception {
         Exporter.exportMultiKahaDbStore(kahaDbDir, xmlFile);
-
     }
 
 }
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java
new file mode 100644 (file)
index 0000000..8b327a6
--- /dev/null
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter.artemis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.activemq.artemis.cli.commands.tools.XmlDataConstants;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.cli.schema.MessageType;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class OpenWireCoreMessageTypeConverterTest {
+
+    @Rule
+    public TemporaryFolder storeFolder = new TemporaryFolder();
+
+    //Adapter used for durable subscription conversion to know which messages haven't been acked
+    protected KahaDBPersistenceAdapter adapter;
+    protected KahaDBStore store;
+    protected ConnectionContext context = new ConnectionContext();
+    protected IdGenerator id = new IdGenerator();
+
+    @Before
+    public void before() throws Exception {
+        adapter = new KahaDBPersistenceAdapter();
+        adapter.setJournalMaxFileLength(1024 * 1024);
+        adapter.setDirectory(storeFolder.getRoot());
+        adapter.start();
+        store = adapter.getStore();
+    }
+
+    @After
+    public void after() throws Exception {
+        adapter.stop();
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setText("test");
+        message.setDestination(new ActiveMQQueue("test.queue"));
+        message.setMessageId(new MessageId(id.generateId() + ":1", 0));
+
+        OpenWireCoreMessageTypeConverter c = new OpenWireCoreMessageTypeConverter();
+        MessageType messageType = c.convert(message);
+
+        assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType());
+        assertEquals("test.queue", messageType.getQueues().getQueue().get(0).getName());
+    }
+
+
+    @Test
+    public void testTopicNoStore() throws Exception {
+
+        String topicName = "test.topic";
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setText("test");
+        message.setDestination(new ActiveMQTopic(topicName));
+        message.setMessageId(new MessageId(id.generateId() + ":1", 0));
+
+        OpenWireCoreMessageTypeConverter c = new OpenWireCoreMessageTypeConverter();
+        MessageType messageType = c.convert(message);
+
+        assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType());
+        assertEquals(topicName, messageType.getQueues().getQueue().get(0).getName());
+    }
+
+    @Test
+    public void testTopicWithStoreNoSubscriptions() throws Exception {
+
+        String topicName = "test.topic";
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setText("test");
+        message.setDestination(new ActiveMQTopic(topicName));
+        message.setMessageId(new MessageId(id.generateId() + ":1", 0));
+
+        TopicMessageStore ms = adapter.createTopicMessageStore(new ActiveMQTopic(topicName));
+        ms.addMessage(context, message);
+
+        OpenWireCoreMessageTypeConverter c = new OpenWireCoreMessageTypeConverter(store);
+        MessageType messageType = c.convert(message);
+
+        assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType());
+        assertTrue(messageType.getQueues().getQueue().isEmpty());
+    }
+
+    @Test
+    public void testTopicWithStoreOneSub() throws Exception {
+
+        String topicName = "test.topic";
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setText("test");
+        message.setDestination(new ActiveMQTopic(topicName));
+        message.setMessageId(new MessageId(id.generateId() + ":1", 0));
+
+        TopicMessageStore ms = adapter.createTopicMessageStore(new ActiveMQTopic(topicName));
+        ms.addSubscription(new SubscriptionInfo("clientId", "subName"), false);
+        ms.addMessage(context, message);
+
+        OpenWireCoreMessageTypeConverter c = new OpenWireCoreMessageTypeConverter(store);
+        MessageType messageType = c.convert(message);
+
+        assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType());
+        assertEquals(ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId", "subName"),
+                messageType.getQueues().getQueue().get(0).getName());
+    }
+}