[KARAF-6009] Add filtering support in appenders 69/head
authorJean-Baptiste Onofré <jbonofre@apache.org>
Sat, 26 Jan 2019 14:51:17 +0000 (15:51 +0100)
committerJean-Baptiste Onofré <jb@nanthrax.net>
Sun, 27 Jan 2019 14:57:13 +0000 (15:57 +0100)
58 files changed:
appender/camel/pom.xml
appender/camel/src/main/java/org/apache/karaf/decanter/appender/camel/CamelAppender.java
appender/camel/src/test/java/org/apache/karaf/decanter/appender/camel/CamelAppenderTest.java [new file with mode: 0644]
appender/cassandra/pom.xml
appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
appender/elasticsearch-jest/pom.xml
appender/elasticsearch-jest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/jest/ElasticsearchAppender.java
appender/elasticsearch-jest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/jest/TestElasticsearchAppender.java
appender/elasticsearch-native-1.x/pom.xml
appender/elasticsearch-native-1.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
appender/elasticsearch-native-1.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
appender/elasticsearch-native-2.x/pom.xml
appender/elasticsearch-native-2.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
appender/elasticsearch-native-2.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
appender/elasticsearch-rest/pom.xml
appender/elasticsearch-rest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/rest/ElasticsearchAppender.java
appender/elasticsearch-rest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/rest/TestElasticsearchAppender.java
appender/file/pom.xml
appender/file/src/main/java/org/apache/karaf/decanter/appender/file/FileAppender.java
appender/file/src/test/java/org/apache/karaf/decanter/appender/file/TestFileAppender.java
appender/jdbc/pom.xml
appender/jdbc/src/main/java/org/apache/karaf/decanter/appender/jdbc/JdbcAppender.java
appender/jdbc/src/test/java/org/apache/karaf/decanter/appender/jdbc/TestJdbcAppender.java
appender/jms/pom.xml
appender/jms/src/main/java/org/apache/karaf/decanter/appender/jms/JmsAppender.java
appender/jms/src/test/java/org/apache/karaf/decanter/appender/jms/JmsAppenderTest.java
appender/kafka/pom.xml
appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedKafkaBroker.java [new file with mode: 0644]
appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedZooKeeper.java [new file with mode: 0644]
appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/KafkaAppenderTest.java [new file with mode: 0644]
appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/PortFinder.java [new file with mode: 0644]
appender/log/pom.xml
appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
appender/mongodb/pom.xml
appender/mongodb/src/main/java/org/apache/karaf/decanter/appender/mongodb/MongoDbAppender.java
appender/mqtt/pom.xml
appender/mqtt/src/main/java/org/apache/karaf/decanter/appender/mqtt/MqttAppender.java
appender/mqtt/src/test/java/org/apache/karaf/decanter/appender/mqtt/TestMqttAppender.java
appender/orientdb/pom.xml
appender/orientdb/src/main/java/org/apache/karaf/decanter/appender/orientdb/OrientDBAppender.java
appender/pom.xml
appender/redis/pom.xml
appender/redis/src/main/java/org/apache/karaf/decanter/appender/redis/RedisAppender.java
appender/rest/pom.xml
appender/rest/src/main/java/org/apache/karaf/decanter/appender/rest/RestAppender.java
appender/socket/pom.xml
appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
appender/timescaledb/pom.xml
appender/timescaledb/src/main/java/org/apache/karaf/decanter/appender/timescaledb/TimescaleDbAppender.java
appender/utils/pom.xml [new file with mode: 0644]
appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java [new file with mode: 0644]
appender/utils/src/test/java/org/apache/karaf/decanter/appender/utils/EventFilterTest.java [new file with mode: 0644]
appender/websocket-servlet/pom.xml
appender/websocket-servlet/src/main/java/org/apache/karaf/decanter/appender/websocket/DecanterWebSocketAppender.java
pom.xml

index 483e9eb..991ccf4 100644 (file)
             <artifactId>camel-core</artifactId>
             <version>${camel.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>
+                            *
+                        </Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.camel,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index 7e670b8..ffc0e3f 100644 (file)
@@ -19,6 +19,7 @@ package org.apache.karaf.decanter.appender.camel;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.cm.ConfigurationException;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
@@ -40,8 +41,10 @@ import java.util.HashMap;
 )
 public class CamelAppender implements EventHandler {
 
+    public static final String DESTINATION_URI_KEY = "destination.uri";
+
     private CamelContext camelContext;
-    private String destinationUri;
+    private Dictionary<String, Object> config;
 
     private final static Logger LOGGER = LoggerFactory.getLogger(CamelAppender.class);
 
@@ -52,24 +55,26 @@ public class CamelAppender implements EventHandler {
     }
     
     public void open(Dictionary<String, Object> config) throws ConfigurationException {
-        LOGGER.debug("Creating CamelContext, and use the {} URI", destinationUri);
-        this.camelContext = new DefaultCamelContext();
-        this.destinationUri = (String) config.get("destination.uri");
-        if (this.destinationUri == null) {
-            throw new ConfigurationException("destination.uri", "destination.uri is not defined");
+        this.config = config;
+        if (config.get(DESTINATION_URI_KEY) == null) {
+            throw new ConfigurationException(DESTINATION_URI_KEY, DESTINATION_URI_KEY + " is not defined");
         }
+        LOGGER.debug("Creating CamelContext, and use the {} URI", config.get(DESTINATION_URI_KEY));
+        this.camelContext = new DefaultCamelContext();
     }
 
     @Override
     public void handleEvent(Event event) {
-        HashMap<String, Object> data = new HashMap<>();
-        for (String name : event.getPropertyNames()) {
-            data.put(name, event.getProperty(name));
+        if (EventFilter.match(event, config)) {
+            HashMap<String, Object> data = new HashMap<>();
+            for (String name : event.getPropertyNames()) {
+                data.put(name, event.getProperty(name));
+            }
+            LOGGER.debug("Creating producer template");
+            ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
+            LOGGER.debug("Sending event data on {}", config.get(DESTINATION_URI_KEY));
+            producerTemplate.sendBody((String) config.get(DESTINATION_URI_KEY), data);
         }
-        LOGGER.debug("Creating producer template");
-        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
-        LOGGER.debug("Sending event data on {}", destinationUri);
-        producerTemplate.sendBody(destinationUri, data);
     }
 
     @Deactivate
diff --git a/appender/camel/src/test/java/org/apache/karaf/decanter/appender/camel/CamelAppenderTest.java b/appender/camel/src/test/java/org/apache/karaf/decanter/appender/camel/CamelAppenderTest.java
new file mode 100644 (file)
index 0000000..4fca767
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.camel;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+public class CamelAppenderTest {
+
+    private static final String TOPIC = "decanter/collect/jmx";
+    private static final long TIMESTAMP = 1454428780634L;
+
+    private DefaultCamelContext camelContext;
+
+    @Before
+    public void setup() throws Exception {
+        camelContext = new DefaultCamelContext();
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:decanter")
+                        .id("decanter-test")
+                        .log("Received ${body}")
+                        .to("mock:assert");
+            }
+        });
+        camelContext.start();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        camelContext.stop();
+    }
+
+    @Test
+    public void test() throws Exception {
+        CamelAppender appender = new CamelAppender();
+        Hashtable<String, Object> config = new Hashtable<>();
+        config.put(CamelAppender.DESTINATION_URI_KEY, "direct-vm:decanter");
+        appender.open(config);
+
+        Map<String, Object> data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("testKey", "testValue");
+        Event event = new Event(TOPIC, data);
+
+        appender.handleEvent(event);
+
+        Map<String, Object> expected = new HashMap<>();
+        expected.put("event.topics", "decanter/collect/jmx");
+        expected.putAll(data);
+        MockEndpoint mock = (MockEndpoint) camelContext.getEndpoint("mock:assert");
+        mock.expectedMessageCount(1);
+        mock.message(0).body().isEqualTo(expected);
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testWithFilter() throws Exception {
+        CamelAppender appender = new CamelAppender();
+        Hashtable<String, Object> config = new Hashtable<>();
+        config.put(CamelAppender.DESTINATION_URI_KEY, "direct-vm:decanter");
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+        appender.open(config);
+
+        Map<String, Object> data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("this is refused property name", "testValue");
+        data.put("key", "value");
+        Event event = new Event(TOPIC, data);
+        appender.handleEvent(event);
+
+        data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("property", "this is a refused value");
+        data.put("key", "value");
+        event = new Event(TOPIC, data);
+        appender.handleEvent(event);
+
+        data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("accepted", "value");
+        event = new Event(TOPIC, data);
+        appender.handleEvent(event);
+
+        Map<String, Object> expected = new HashMap<>();
+        expected.put("event.topics", "decanter/collect/jmx");
+        expected.putAll(data);
+        MockEndpoint mock = (MockEndpoint) camelContext.getEndpoint("mock:assert");
+        mock.expectedMessageCount(1);
+        mock.message(0).body().isEqualTo(expected);
+        mock.assertIsSatisfied();
+    }
+
+}
index 9fe131d..da62f8e 100644 (file)
        <name>Apache Karaf :: Decanter :: Appender :: Cassandra</name>
 
     <dependencies>
-
         <dependency>
             <groupId>org.apache.karaf.decanter</groupId>
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
+               <dependency>
+                       <groupId>org.apache.karaf.decanter.appender</groupId>
+                       <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+               </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
 
     <build>
         <plugins>
+                       <plugin>
+                               <groupId>org.apache.felix</groupId>
+                               <artifactId>maven-bundle-plugin</artifactId>
+                               <inherited>true</inherited>
+                               <extensions>true</extensions>
+                               <configuration>
+                                       <obrRepository>NONE</obrRepository>
+                                       <instructions>
+                                               <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                                               <Export-Package>!*</Export-Package>
+                                               <Import-Package>
+                                                       *
+                                               </Import-Package>
+                                               <Private-Package>
+                                                       org.apache.karaf.decanter.appender.cassandra,
+                                                       org.apache.karaf.decanter.appender.utils
+                                               </Private-Package>
+                                               <_dsannotations>*</_dsannotations>
+                                       </instructions>
+                               </configuration>
+                       </plugin>
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
index 7c5d159..e6ad035 100644 (file)
@@ -20,6 +20,7 @@ import java.util.Dictionary;
 import java.util.List;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -45,10 +46,19 @@ import com.datastax.driver.core.exceptions.InvalidQueryException;
 )
 public class CassandraAppender implements EventHandler {
 
+    public static String KEYSPACE_PROPERTY = "keyspace.name";
+    public static String TABLE_PROPERTY = "table.name";
+    public static String CASSANDRA_HOST_PROPERTY = "cassandra.host";
+    public static String CASSANDRA_PORT_PROPERTY = "cassandra.port";
+
+    public static String KEYSPACE_DEFAULT = "decanter";
+    public static String TABLE_DEFAULT = "decanter";
+    public static String CASSANDRA_HOST_DEFAULT = "localhost";
+    public static String CASSANDRA_PORT_DEFAULT = "9042";
+
     private final static Logger LOGGER = LoggerFactory.getLogger(CassandraAppender.class);
 
-    private String keyspace;
-    private String tableName;
+    private Dictionary<String, Object> config;
 
     @Reference
     public Marshaller marshaller;
@@ -70,10 +80,9 @@ public class CassandraAppender implements EventHandler {
     }
 
     void activate(Dictionary<String, Object> config) {
-        this.keyspace = getValue(config, "keyspace.name", "decanter");
-        this.tableName = getValue(config, "table.name", "decanter");
-        String host = getValue(config, "cassandra.host", "localhost");
-        Integer port = Integer.parseInt(getValue(config, "cassandra.port", "9042"));
+        this.config = config;
+        String host = getValue(config, CASSANDRA_HOST_PROPERTY, CASSANDRA_HOST_DEFAULT);
+        Integer port = Integer.parseInt(getValue(config, CASSANDRA_PORT_PROPERTY, CASSANDRA_PORT_DEFAULT));
         Builder clusterBuilder = Cluster.builder().addContactPoint(host);
         if (port != null) {
             clusterBuilder.withPort(port);
@@ -93,21 +102,25 @@ public class CassandraAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        LOGGER.trace("Looking for the Cassandra datasource");
-        try (Session session = cluster.connect()){
-            useKeyspace(session, keyspace);
-            createTable(session, keyspace, tableName);
-
-            Long timestamp = (Long) event.getProperty("timestamp");
-            if (timestamp == null) {
-                timestamp = System.currentTimeMillis();
+        if (EventFilter.match(event, config)) {
+            LOGGER.trace("Looking for the Cassandra datasource");
+            try (Session session = cluster.connect()) {
+                String keyspace = getValue(config, KEYSPACE_PROPERTY, KEYSPACE_DEFAULT);
+                String tableName = getValue(config, TABLE_PROPERTY, TABLE_DEFAULT);
+                useKeyspace(session, keyspace);
+                createTable(session, keyspace, tableName);
+
+                Long timestamp = (Long) event.getProperty("timestamp");
+                if (timestamp == null) {
+                    timestamp = System.currentTimeMillis();
+                }
+                String jsonSt = marshaller.marshal(event);
+                session.execute(String.format(insertTemplate, tableName), timestamp, jsonSt);
+
+                LOGGER.trace("Data inserted into {} table", tableName);
+            } catch (Exception e) {
+                LOGGER.error("Can't store in the database", e);
             }
-            String jsonSt = marshaller.marshal(event);
-            session.execute(String.format(insertTemplate, tableName), timestamp, jsonSt);
-
-            LOGGER.trace("Data inserted into {} table", tableName);
-        } catch (Exception e) {
-            LOGGER.error("Can't store in the database", e);
         }
     }
 
index d4af92f..9f4bc37 100644 (file)
@@ -30,8 +30,10 @@ import java.util.Map;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.osgi.service.event.Event;
@@ -84,14 +86,14 @@ public class CassandraAppenderTest {
     }
 
     @Test
-    public void testHandleEvent() throws Exception {
+    public void test() throws Exception {
         Marshaller marshaller = new JsonMarshaller();
         CassandraAppender appender = new CassandraAppender();
         Dictionary<String, Object> config = new Hashtable<String, Object>();
-        config.put("cassandra.host", CASSANDRA_HOST);
-        config.put("cassandra.port", CASSANDRA_PORT);
-        config.put("keyspace.name", KEYSPACE);
-        config.put("table.name", TABLE_NAME);
+        config.put(CassandraAppender.CASSANDRA_PORT_PROPERTY, CASSANDRA_HOST);
+        config.put(CassandraAppender.CASSANDRA_PORT_PROPERTY, CASSANDRA_PORT);
+        config.put(CassandraAppender.KEYSPACE_PROPERTY, KEYSPACE);
+        config.put(CassandraAppender.TABLE_PROPERTY, TABLE_NAME);
         appender.marshaller = marshaller;
         appender.activate(config);
         
@@ -101,10 +103,11 @@ public class CassandraAppenderTest {
         
         appender.handleEvent(event);
         
-        Session session = getSesion();
+        Session session = getSession();
         
         ResultSet execute = session.execute("SELECT * FROM "+ KEYSPACE+"."+TABLE_NAME+";");
         List<Row> all = execute.all();
+        Assert.assertEquals(1, all.size());
         assertThat(all, not(nullValue()));
         
         assertThat(all.get(0).getTimestamp("timeStamp").getTime(), is(TIMESTAMP));
@@ -112,7 +115,54 @@ public class CassandraAppenderTest {
         session.close();
     }
 
-    private Session getSesion() {
+    @Test
+    public void testWithFilter() throws Exception {
+        Marshaller marshaller = new JsonMarshaller();
+        CassandraAppender appender = new CassandraAppender();
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put(CassandraAppender.CASSANDRA_PORT_PROPERTY, CASSANDRA_HOST);
+        config.put(CassandraAppender.CASSANDRA_PORT_PROPERTY, CASSANDRA_PORT);
+        config.put(CassandraAppender.KEYSPACE_PROPERTY, KEYSPACE);
+        config.put(CassandraAppender.TABLE_PROPERTY, TABLE_NAME);
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+        appender.marshaller = marshaller;
+        appender.activate(config);
+
+        Map<String, Object> data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("this is a refused property", "value");
+        Event event = new Event(TOPIC, data);
+
+        appender.handleEvent(event);
+
+        data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("property", "this is a refused value");
+        event = new Event(TOPIC, data);
+
+        appender.handleEvent(event);
+
+        data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("accepted", "accepted");
+        event = new Event(TOPIC, data);
+
+        appender.handleEvent(event);
+
+        Session session = getSession();
+
+        ResultSet execute = session.execute("SELECT * FROM "+ KEYSPACE+"."+TABLE_NAME+";");
+        List<Row> all = execute.all();
+        Assert.assertEquals(1, all.size());
+        assertThat(all, not(nullValue()));
+
+        assertThat(all.get(0).getTimestamp("timeStamp").getTime(), is(TIMESTAMP));
+
+        session.close();
+    }
+
+    private Session getSession() {
         Builder clusterBuilder = Cluster.builder().addContactPoint(CASSANDRA_HOST);
         clusterBuilder.withPort(Integer.valueOf(CASSANDRA_PORT));
 
index 5f05dde..af6f48f 100644 (file)
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.searchbox</groupId>
             <artifactId>jest</artifactId>
             <version>2.0.2</version>
             <plugin>
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
                 <configuration>
+                    <obrRepository>NONE</obrRepository>
                     <instructions>
-                        <Import-Package>!javax.servlet*, org.apache.log;resolution:=optional,*</Import-Package>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>
+                            !javax.servlet*,
+                            org.apache.log;resolution:=optional,
+                            *
+                        </Import-Package>
                         <Private-Package>
                             org.apache.karaf.decanter.appender.elasticsearch.jest,
+                            org.apache.karaf.decanter.appender.utils,
                             org.apache.http*,
                             org.apache.commons*,
                             io.searchbox*,
                             com.google*
                         </Private-Package>
+                        <_dsannotations>*</_dsannotations>
                     </instructions>
                 </configuration>
             </plugin>
index 9b8c5f2..cd0266e 100644 (file)
@@ -41,6 +41,7 @@ import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.TrustStrategy;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -70,10 +71,23 @@ public class ElasticsearchAppender implements EventHandler {
     @Reference
     public Marshaller marshaller;
 
+    public static String ADDRESS_PROPERTY = "address";
+    public static String USERNAME_PROPERTY = "username";
+    public static String PASSWORD_PROPERTY = "password";
+    public static String INDEX_PREFIX_PROPERTY = "index.prefix";
+    public static String INDEX_TYPE_PROPERTY = "index.type";
+    public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+
+    public static String ADDRESS_DEFAULT = "http://localhost:9200";
+    public static String USERNAME_DEFAULT = null;
+    public static String PASSWORD_DEFAULT = null;
+    public static String INDEX_PREFIX_DEFAULT = "karaf";
+    public static String INDEX_TYPE_DEFAULT = "decanter";
+    public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+
+    private Dictionary<String, Object> config;
+
     private JestClient client;
-    private String indexPrefix;
-    private boolean indexTimestamped;
-    private String indexType;
 
     private final SimpleDateFormat tsFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss,SSS'Z'");
     private final SimpleDateFormat indexDateFormat = new SimpleDateFormat("yyyy.MM.dd");
@@ -87,10 +101,11 @@ public class ElasticsearchAppender implements EventHandler {
     }
     
     public void open(Dictionary<String, Object> config) {
-        String addressesString = getValue(config, "address", "http://localhost:9200");
+        this.config = config;
+        String addressesString = getValue(config, ADDRESS_PROPERTY, ADDRESS_DEFAULT);
         Set<String> addresses = new HashSet<String>(Arrays.asList(addressesString.split(";")));
-        String username = getValue(config, "username", null);
-        String password = getValue(config, "password", null);
+        String username = getValue(config, USERNAME_PROPERTY, USERNAME_DEFAULT);
+        String password = getValue(config, PASSWORD_PROPERTY, PASSWORD_DEFAULT);
         Builder builder = new HttpClientConfig.Builder(addresses).readTimeout(10000)
             .multiThreaded(true);
         
@@ -129,10 +144,6 @@ public class ElasticsearchAppender implements EventHandler {
         TimeZone tz = TimeZone.getTimeZone( "UTC" );
         tsFormat.setTimeZone(tz);
         indexDateFormat.setTimeZone(tz);
-
-        indexPrefix = getValue(config, "index.prefix", "karaf");
-        indexTimestamped = Boolean.parseBoolean(getValue(config, "index.event.timestamped", "true"));
-        indexType = getValue(config, "index.type", "decanter");
     }
     
     private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
@@ -147,18 +158,20 @@ public class ElasticsearchAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try {
-            send(event);
-        } catch (Exception e) {
-            LOGGER.warn("Can't append into Elasticsearch", e);
+        if (EventFilter.match(event, config)) {
+            try {
+                send(event);
+            } catch (Exception e) {
+                LOGGER.warn("Can't append into Elasticsearch", e);
+            }
         }
     }
 
     private void send(Event event) throws Exception {
-        String indexName = getIndexName(indexPrefix, getDate(event));
+        String indexName = getIndexName(getValue(config, INDEX_PREFIX_PROPERTY, INDEX_PREFIX_DEFAULT), getDate(event));
         String jsonSt = marshaller.marshal(event);
 
-        JestResult result = client.execute(new Index.Builder(jsonSt).index(indexName).type(indexType).build());
+        JestResult result = client.execute(new Index.Builder(jsonSt).index(indexName).type(getValue(config, INDEX_TYPE_PROPERTY, INDEX_TYPE_DEFAULT)).build());
 
         if (!result.isSucceeded()) {
             throw new IllegalStateException(result.getErrorMessage());
@@ -172,6 +185,7 @@ public class ElasticsearchAppender implements EventHandler {
     }
 
     private String getIndexName(String prefix, Date date) {
+        boolean indexTimestamped = Boolean.parseBoolean(getValue(config, INDEX_EVENT_TIMESTAMPED_PROPERTY, INDEX_EVENT_TIMESTAMPED_DEFAULT));
         if (indexTimestamped) {
             return prefix + "-" + indexDateFormat.format(date);
         } else {
index 41b7a22..4636c93 100644 (file)
@@ -18,11 +18,14 @@ package org.apache.karaf.decanter.appender.elasticsearch.jest;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.node.Node;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.osgi.service.event.Event;
 
@@ -30,6 +33,7 @@ import static org.elasticsearch.node.NodeBuilder.*;
 
 import java.util.Dictionary;
 import java.util.Hashtable;
+import java.util.Map;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
 import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
@@ -41,9 +45,10 @@ public class TestElasticsearchAppender {
     private static final int PORT = 9301;
     private static final int HTTP_PORT = 9201;
 
-    @Test
-    public void testAppender() throws Exception {
+    private Node node;
 
+    @Before
+    public void setup() throws Exception {
         Settings settings = settingsBuilder()
                 .put("cluster.name", CLUSTER_NAME)
                 .put("http.enabled", "true")
@@ -57,14 +62,22 @@ public class TestElasticsearchAppender {
                 .put("path.plugins", "target/plugins")
                 .build();
 
-        Node node = nodeBuilder().settings(settings).node();
+        node = nodeBuilder().settings(settings).node();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        node.close();
+    }
 
+    @Test
+    public void test() throws Exception {
         Marshaller marshaller = new JsonMarshaller();
         ElasticsearchAppender appender = new ElasticsearchAppender();
         appender.marshaller = marshaller;
         Dictionary<String, Object> config = new Hashtable<>();
-        config.put("address", "http://" + HOST + ":" + HTTP_PORT);
-        appender.open(config );
+        config.put(ElasticsearchAppender.ADDRESS_PROPERTY, "http://" + HOST + ":" + HTTP_PORT);
+        appender.open(config);
         appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
         appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
         appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
@@ -76,7 +89,37 @@ public class TestElasticsearchAppender {
         }
 
         Assert.assertEquals(3L, node.client().count(Requests.countRequest()).actionGet().getCount());
-        node.close();
+    }
+
+    @Test
+    public void testWithFilter() throws Exception {
+        Marshaller marshaller = new JsonMarshaller();
+        ElasticsearchAppender appender = new ElasticsearchAppender();
+        appender.marshaller = marshaller;
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put(ElasticsearchAppender.ADDRESS_PROPERTY, "http://" + HOST + ":" + HTTP_PORT);
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+        appender.open(config);
+
+        Map<String, String> data = MapBuilder.<String, String>newMapBuilder().put("refused_property", "test").map();
+        Event event = new Event("testTopic", data);
+        appender.handleEvent(event);
+
+        data = MapBuilder.<String, String>newMapBuilder().put("property", "refused_value").map();
+        event = new Event("testTopic", data);
+        appender.handleEvent(event);
+
+        data = MapBuilder.<String, String>newMapBuilder().put("foo", "bar").map();
+        event = new Event("testTopic", data);
+        appender.handleEvent(event);
+
+        int maxTryCount = 10;
+        for(int i=0; node.client().count(Requests.countRequest()).actionGet().getCount() == 0 && i< maxTryCount; i++) {
+            Thread.sleep(500);
+        }
+
+        Assert.assertEquals(1L, node.client().count(Requests.countRequest()).actionGet().getCount());
     }
 
 }
index d6d6208..1177756 100644 (file)
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.json</groupId>
             <artifactId>javax.json-api</artifactId>
         </dependency>
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.elasticsearch,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index d47e34c..87fc459 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Dictionary;
 import java.util.TimeZone;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.transport.TransportClient;
@@ -52,6 +53,18 @@ import org.slf4j.LoggerFactory;
 )
 public class ElasticsearchAppender implements EventHandler {
 
+    public static String HOST_PROPERTY = "host";
+    public static String PORT_PROPERTY = "port";
+    public static String CLUSTER_NAME_PROPERTY = "clusterName";
+    public static String INDEX_PREFIX_PROPERTY = "index.prefix";
+    public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+
+    public static String HOST_DEFAULT = "localhost";
+    public static String PORT_DEFAULT = "9300";
+    public static String CLUSTER_NAME_DEFAULT = "elasticsearch";
+    public static String INDEX_PREFIX_DEFAULT = "karaf";
+    public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+
     final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class);
 
     @Reference
@@ -66,8 +79,7 @@ public class ElasticsearchAppender implements EventHandler {
 
     private WorkFinishedListener listener;
 
-    private String indexPrefix;
-    private boolean indexTimestamped;
+    private Dictionary<String, Object> config;
 
     @SuppressWarnings("unchecked")
     @Activate
@@ -76,12 +88,11 @@ public class ElasticsearchAppender implements EventHandler {
     }
     
     public void open(Dictionary<String, Object> config) {
+        this.config = config;
         try {
-            String host = getValue(config, "host", "localhost");
-            int port = Integer.parseInt(getValue(config, "port", "9300"));
-            String cluster = getValue(config, "clusterName", "elasticsearch");
-            indexPrefix = getValue(config, "index.prefix", "karaf");
-            indexTimestamped = Boolean.parseBoolean(getValue(config, "index.event.timestamped", "true"));
+            String host = getValue(config, HOST_PROPERTY, HOST_DEFAULT);
+            int port = Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT));
+            String cluster = getValue(config, CLUSTER_NAME_PROPERTY, CLUSTER_NAME_DEFAULT);
             TimeZone tz = TimeZone.getTimeZone( "UTC" );
             tsFormat.setTimeZone(tz);
             indexDateFormat.setTimeZone(tz);
@@ -130,15 +141,17 @@ public class ElasticsearchAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try {
-            send(event);
-        } catch (Exception e) {
-            LOGGER.warn("Can't append into Elasticsearch", e);
+        if (EventFilter.match(event, config)) {
+            try {
+                send(event);
+            } catch (Exception e) {
+                LOGGER.warn("Can't append into Elasticsearch", e);
+            }
         }
     }
 
     private void send(Event event) {
-        String indexName = getIndexName(indexPrefix, getDate(event));
+        String indexName = getIndexName(getValue(config, INDEX_PREFIX_PROPERTY, INDEX_PREFIX_DEFAULT), getDate(event));
         String jsonSt = marshaller.marshal(event);
         LOGGER.debug("Sending event to elastic search with content: {}", jsonSt);
         bulkProcessor.add(new IndexRequest(indexName, getType(event)).source(jsonSt));
@@ -156,6 +169,7 @@ public class ElasticsearchAppender implements EventHandler {
     }
 
     private String getIndexName(String prefix, Date date) {
+        boolean indexTimestamped = Boolean.parseBoolean(getValue(config, INDEX_EVENT_TIMESTAMPED_PROPERTY, INDEX_EVENT_TIMESTAMPED_DEFAULT));
         if (indexTimestamped) {
             return prefix + "-" + indexDateFormat.format(date);
         } else {
index f966e9d..255d2e5 100644 (file)
@@ -18,11 +18,14 @@ package org.apache.karaf.decanter.appender.elasticsearch;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.node.Node;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.osgi.service.event.Event;
 
@@ -36,53 +39,91 @@ import org.apache.karaf.decanter.api.marshaller.Marshaller;
 import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
 
 public class TestElasticsearchAppender {
+
     private static final String HOST = "127.0.0.1";
     private static final String CLUSTER_NAME = "elasticsearch-test";
     private static final int PORT = 9300;
     private static final int MAX_TRIES = 10;
 
-   @Test
-   public void testAppender() throws Exception {
-       
-       Settings settings = settingsBuilder()
-               .put("cluster.name", CLUSTER_NAME)
-               .put("http.enabled", "true")
-               .put("node.data", true)
-               .put("path.data", "target/data")
-               .put("network.host", HOST)
-               .put("port", PORT)
-               .put("index.store.type", "memory")
-               .put("index.store.fs.memory.enabled", "true")
-               .put("path.plugins", "target/plugins")
-               .build();
-       
-       Node node = nodeBuilder().settings(settings).node();
-       Marshaller marshaller = new JsonMarshaller();
-       ElasticsearchAppender appender = new ElasticsearchAppender();
-       appender.marshaller = marshaller;
-       Dictionary<String, Object> config = new Hashtable<>();
-       config.put("clusterName", CLUSTER_NAME);
-       config.put("port", "" + PORT);
-       appender.open(config);
-       appender.handleEvent(new Event("testTopic", dummyMap()));
-       appender.handleEvent(new Event("testTopic", dummyMap()));
-       appender.handleEvent(new Event("testTopic", dummyMap()));
-       appender.close();
-
-       long currentCount = 0;
-       int c = 0; 
-       while (c < MAX_TRIES && currentCount != 3) {
-           currentCount = node.client().count(Requests.countRequest()).actionGet().getCount();
-           Thread.sleep(500);
-           c++;
-       }
-       
-       Assert.assertEquals(3L, currentCount);
-       node.close();
-   }
-
-   private Map<String, String> dummyMap() {
-       return MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map();
-   }
+    private Node node;
+
+    @Before
+    public void setup() throws Exception {
+        Settings settings = settingsBuilder()
+                .put("cluster.name", CLUSTER_NAME)
+                .put("http.enabled", "true")
+                .put("node.data", true)
+                .put("path.data", "target/data")
+                .put("network.host", HOST)
+                .put("port", PORT)
+                .put("index.store.type", "memory")
+                .put("index.store.fs.memory.enabled", "true")
+                .put("path.plugins", "target/plugins")
+                .build();
+
+        node = nodeBuilder().settings(settings).node();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        node.close();
+    }
+
+    @Test
+    public void test() throws Exception {
+        Marshaller marshaller = new JsonMarshaller();
+        ElasticsearchAppender appender = new ElasticsearchAppender();
+        appender.marshaller = marshaller;
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put(ElasticsearchAppender.CLUSTER_NAME_PROPERTY, CLUSTER_NAME);
+        config.put(ElasticsearchAppender.PORT_PROPERTY, "" + PORT);
+        appender.open(config);
+        appender.handleEvent(new Event("testTopic", dummyMap()));
+        appender.handleEvent(new Event("testTopic", dummyMap()));
+        appender.handleEvent(new Event("testTopic", dummyMap()));
+        appender.close();
+
+        long currentCount = 0;
+        int c = 0;
+        while (c < MAX_TRIES && currentCount != 3) {
+            currentCount = node.client().count(Requests.countRequest()).actionGet().getCount();
+            Thread.sleep(500);
+            c++;
+        }
+
+        Assert.assertEquals(3L, currentCount);
+    }
+
+    @Test
+    public void testWithFilter() throws Exception {
+        Marshaller marshaller = new JsonMarshaller();
+        ElasticsearchAppender appender = new ElasticsearchAppender();
+        appender.marshaller = marshaller;
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put(ElasticsearchAppender.CLUSTER_NAME_PROPERTY, CLUSTER_NAME);
+        config.put(ElasticsearchAppender.PORT_PROPERTY, "" + PORT);
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+        appender.open(config);
+
+        Map<String, String> data = MapBuilder.<String, String>newMapBuilder().put("refused_property", "test").map();
+        Event event = new Event("testTopic", data);
+        appender.handleEvent(event);
+
+        data = MapBuilder.<String, String>newMapBuilder().put("property", "refused_value").map();
+        event = new Event("testTopic", data);
+        appender.handleEvent(event);
+
+        int maxTryCount = 10;
+        for(int i=0; node.client().count(Requests.countRequest()).actionGet().getCount() == 0 && i< maxTryCount; i++) {
+            Thread.sleep(500);
+        }
+
+        Assert.assertEquals(0L, node.client().count(Requests.countRequest()).actionGet().getCount());
+    }
+
+    private Map<String, String> dummyMap() {
+        return MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map();
+    }
 
 }
index a08a018..ee0d9c2 100644 (file)
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.json</groupId>
             <artifactId>javax.json-api</artifactId>
         </dependency>
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.elasticsearch,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index ff38d9e..9149a92 100644 (file)
@@ -23,6 +23,7 @@ import java.util.Dictionary;
 import java.util.TimeZone;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
@@ -51,6 +52,18 @@ import org.slf4j.LoggerFactory;
 )
 public class ElasticsearchAppender implements EventHandler {
 
+    public static String HOST_PROPERTY = "host";
+    public static String PORT_PROPERTY = "port";
+    public static String CLUSTER_NAME_PROPERTY = "clusterName";
+    public static String INDEX_PREFIX_PROPERTY = "index.prefix";
+    public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+
+    public static String HOST_DEFAULT = "localhost";
+    public static String PORT_DEFAULT = "9300";
+    public static String CLUSTER_NAME_DEFAULT = "elasticsearch";
+    public static String INDEX_PREFIX_DEFAULT = "karaf";
+    public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+
     final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class);
 
     @Reference
@@ -64,8 +77,7 @@ public class ElasticsearchAppender implements EventHandler {
     Client client;
     private WorkFinishedListener listener;
 
-    private String indexPrefix;
-    private boolean indexTimestamped;
+    private Dictionary<String, Object> config;
 
     @SuppressWarnings("unchecked")
     @Activate
@@ -74,12 +86,11 @@ public class ElasticsearchAppender implements EventHandler {
     }
     
     public void open(Dictionary<String, Object> config) {
+        this.config = config;
         try {
-            String host = getValue(config, "host", "localhost");
-            int port = Integer.parseInt(getValue(config, "port", "9300"));
-            String cluster = getValue(config, "clusterName", "elasticsearch");
-            indexPrefix = getValue(config, "index.prefix", "karaf");
-            indexTimestamped = Boolean.parseBoolean(getValue(config, "index.event.timestamped", "true"));
+            String host = getValue(config, HOST_PROPERTY, HOST_DEFAULT);
+            int port = Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT));
+            String cluster = getValue(config, CLUSTER_NAME_PROPERTY, CLUSTER_NAME_DEFAULT);
             TimeZone tz = TimeZone.getTimeZone( "UTC" );
             tsFormat.setTimeZone(tz);
             indexDateFormat.setTimeZone(tz);
@@ -123,15 +134,17 @@ public class ElasticsearchAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try {
-            send(event);
-        } catch (Exception e) {
-            LOGGER.warn("Can't append into Elasticsearch", e);
+        if (EventFilter.match(event, config)) {
+            try {
+                send(event);
+            } catch (Exception e) {
+                LOGGER.warn("Can't append into Elasticsearch", e);
+            }
         }
     }
 
     private void send(Event event) {
-        String indexName = getIndexName(indexPrefix, getDate(event));
+        String indexName = getIndexName(getValue(config, INDEX_PREFIX_PROPERTY, INDEX_PREFIX_DEFAULT), getDate(event));
         String jsonSt = marshaller.marshal(event);
         LOGGER.debug("Sending event to elastic search with content: {}", jsonSt);
         bulkProcessor.add(new IndexRequest(indexName, getType(event)).source(jsonSt));
@@ -149,6 +162,7 @@ public class ElasticsearchAppender implements EventHandler {
     }
 
     private String getIndexName(String prefix, Date date) {
+        boolean indexTimestamped = Boolean.parseBoolean(getValue(config, INDEX_EVENT_TIMESTAMPED_PROPERTY, INDEX_EVENT_TIMESTAMPED_DEFAULT));
         if (indexTimestamped) {
             return prefix + "-" + indexDateFormat.format(date);
         } else {
index 9d8851e..b01d760 100644 (file)
@@ -24,49 +24,62 @@ import java.util.Hashtable;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
 import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.node.Node;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.osgi.service.event.Event;
 
 public class TestElasticsearchAppender {
+
     private static final String CLUSTER_NAME = "elasticsearch-test";
     private static final int PORT = 9300;
+    private static final int MAX_TRIES = 10;
+
+    private Node node;
+
+    @Before
+    public void setup() throws Exception {
+        Settings settings = Settings.settingsBuilder()
+                .put("cluster.name", "elasticsearch")
+                .put("http.enabled", "true")
+                .put("node.data", true)
+                .put("path.home", "target")
+                .put("path.data", "target/data")
+                .put("network.host", "127.0.0.1")
+                .put("index.store.type", "memory")
+                .put("index.store.fs.memory.enabled", "true")
+                .put("path.plugins", "target/plugins")
+                .build();
+
+        node = nodeBuilder().settings(settings).node();
+    }
 
-   @Test
-   public void testAppender() throws Exception {
-       
-       Settings settings = Settings.settingsBuilder()
-               .put("cluster.name", "elasticsearch")
-               .put("http.enabled", "true")
-               .put("node.data", true)
-               .put("path.home", "target")
-               .put("path.data", "target/data")
-               .put("network.host", "127.0.0.1")
-               .put("index.store.type", "memory")
-               .put("index.store.fs.memory.enabled", "true")
-               .put("path.plugins", "target/plugins")
-               .build();
-       
-       Node node = nodeBuilder().settings(settings).node();
-       
-       Marshaller marshaller = new JsonMarshaller();
-       ElasticsearchAppender appender = new ElasticsearchAppender();
-       appender.marshaller = marshaller;
-       Dictionary<String, Object> config = new Hashtable<>();
-       config.put("clusterName", CLUSTER_NAME);
-       config.put("port", "" + PORT);
-       appender.open(config);
-       appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
-       appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
-       appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
-       appender.close();
+    @After
+    public void teardown() throws Exception {
+        node.close();
+    }
 
-       SearchResponse response = node.client().prepareSearch().execute().actionGet();
-       System.out.println(response.toString());
+    @Test
+    public void test() throws Exception {
+        Marshaller marshaller = new JsonMarshaller();
+        ElasticsearchAppender appender = new ElasticsearchAppender();
+        appender.marshaller = marshaller;
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put(ElasticsearchAppender.CLUSTER_NAME_PROPERTY, CLUSTER_NAME);
+        config.put(ElasticsearchAppender.PORT_PROPERTY, "" + PORT);
+        appender.open(config);
+        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
+        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
+        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
+        appender.close();
 
-       node.close();
-   }
+        SearchResponse response = node.client().prepareSearch().execute().actionGet();
+        System.out.println(response.toString());
+    }
 
 }
index 373e7e9..d1aaf93 100644 (file)
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
             <version>${elasticsearch6.version}</version>
             <plugin>
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
                 <configuration>
+                    <obrRepository>NONE</obrRepository>
                     <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
                         <Import-Package>
                             org.apache.log;resolution:=optional,
                             *
                         </Import-Package>
                         <Private-Package>
                             org.apache.karaf.decanter.appender.elasticsearch.rest,
+                            org.apache.karaf.decanter.appender.utils,
                             org.apache.http*,
                             org.apache.commons*,
                             org.elasticsearch*
                         </Private-Package>
+                        <_dsannotations>*</_dsannotations>
                     </instructions>
                 </configuration>
             </plugin>
index 901ed37..1d3c359 100644 (file)
@@ -28,6 +28,7 @@ import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.http.nio.entity.NStringEntity;
 import org.apache.http.util.EntityUtils;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
@@ -51,13 +52,26 @@ import java.util.*;
 )
 public class ElasticsearchAppender implements EventHandler {
 
+    public static String ADDRESSES_PROPERTY = "addresses";
+    public static String USERNAME_PROPERTY = "username";
+    public static String PASSWORD_PROPERTY = "password";
+    public static String INDEX_PREFIX_PROPERTY = "index.prefix";
+    public static String INDEX_TYPE_PROPERTY = "index.type";
+    public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+
+    public static String ADDRESSES_DEFAULT = "http://localhost:9200";
+    public static String USERNAME_DEFAULT = null;
+    public static String PASSWORD_DEFAULT = null;
+    public static String INDEX_PREFIX_DEFAULT = "karaf";
+    public static String INDEX_TYPE_DEFAULT = "decanter";
+    public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+
     @Reference
     public Marshaller marshaller;
 
     private RestClient client;
-    private String indexPrefix;
-    private boolean indexTimestamped;
-    private String indexType;
+
+    private Dictionary<String, Object> config;
 
     private final SimpleDateFormat tsFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss,SSS'Z'");
     private final SimpleDateFormat indexDateFormat = new SimpleDateFormat("yyyy.MM.dd");
@@ -71,9 +85,11 @@ public class ElasticsearchAppender implements EventHandler {
     }
     
     public void open(Dictionary<String, Object> config) {
-        String addressesString = getValue(config, "addresses", "http://localhost:9200");
-        String username = getValue(config, "username", null);
-        String password = getValue(config, "password", null);
+        this.config = config;
+
+        String addressesString = getValue(config, ADDRESSES_PROPERTY, ADDRESSES_DEFAULT);
+        String username = getValue(config, USERNAME_PROPERTY, USERNAME_DEFAULT);
+        String password = getValue(config, PASSWORD_PROPERTY, PASSWORD_DEFAULT);
 
         Set<String> addresses = new HashSet<String>(Arrays.asList(addressesString.split(",")));
 
@@ -116,10 +132,6 @@ public class ElasticsearchAppender implements EventHandler {
         TimeZone tz = TimeZone.getTimeZone( "UTC" );
         tsFormat.setTimeZone(tz);
         indexDateFormat.setTimeZone(tz);
-
-        indexPrefix = getValue(config, "index.prefix", "karaf");
-        indexTimestamped = Boolean.parseBoolean(getValue(config, "index.event.timestamped", "true"));
-        indexType = getValue(config, "index.type", "decanter");
     }
     
     private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
@@ -138,19 +150,21 @@ public class ElasticsearchAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try {
-            send(event);
-        } catch (Exception e) {
-            LOGGER.warn("Can't append into Elasticsearch", e);
+        if (EventFilter.match(event, config)) {
+            try {
+                send(event);
+            } catch (Exception e) {
+                LOGGER.warn("Can't append into Elasticsearch", e);
+            }
         }
     }
 
     private void send(Event event) throws Exception {
-        String indexName = getIndexName(indexPrefix, getDate(event));
+        String indexName = getIndexName(getValue(config, INDEX_PREFIX_PROPERTY, INDEX_PREFIX_DEFAULT), getDate(event));
         String jsonSt = marshaller.marshal(event);
 
         // elasticsearch 6.x only allows one type per index mapping, the _type is part of the document
-        String endpoint = String.format("/%s/%s", indexName, indexType);
+        String endpoint = String.format("/%s/%s", indexName, getValue(config, INDEX_TYPE_PROPERTY, INDEX_TYPE_DEFAULT));
         HttpEntity request = new NStringEntity(jsonSt, ContentType.APPLICATION_JSON);
 
         client.performRequest("POST", endpoint, Collections.singletonMap("refresh", "true"), request);
@@ -163,6 +177,7 @@ public class ElasticsearchAppender implements EventHandler {
     }
 
     private String getIndexName(String prefix, Date date) {
+        boolean indexTimestamped = Boolean.parseBoolean(getValue(config, INDEX_EVENT_TIMESTAMPED_PROPERTY, INDEX_EVENT_TIMESTAMPED_DEFAULT));
         if (indexTimestamped) {
             return prefix + "-" + indexDateFormat.format(date);
         } else {
index 09abc7e..2199771 100644 (file)
@@ -22,6 +22,7 @@ import org.apache.http.nio.entity.NStringEntity;
 import org.apache.http.protocol.HTTP;
 import org.apache.http.util.EntityUtils;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
@@ -43,7 +44,9 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.transport.Netty4Plugin;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.osgi.service.event.Event;
 
@@ -56,9 +59,10 @@ public class TestElasticsearchAppender {
     private static final String HOST = "127.0.0.1";
     private static final int HTTP_PORT = 9201;
 
-    @Test
-    public void testAppender() throws Exception {
+    private Node node;
 
+    @Before
+    public void setup() throws Exception {
         Settings settings = Settings.builder()
                 .put("cluster.name", CLUSTER_NAME)
                 .put("node.name", "test")
@@ -71,19 +75,31 @@ public class TestElasticsearchAppender {
                 .build();
 
         Collection plugins = Arrays.asList(Netty4Plugin.class);
-        Node node = new PluginConfigurableNode(settings, plugins);
+        node = new PluginConfigurableNode(settings, plugins);
 
         node.start();
+    }
 
+    @After
+    public void teardown() throws Exception {
+        node.close();
+    }
+
+    @Test
+    public void test() throws Exception {
         Marshaller marshaller = new JsonMarshaller();
         ElasticsearchAppender appender = new ElasticsearchAppender();
         appender.marshaller = marshaller;
         Dictionary<String, Object> config = new Hashtable<>();
-        config.put("addresses", "http://" + HOST + ":" + HTTP_PORT);
-        appender.open(config );
+        config.put(ElasticsearchAppender.ADDRESSES_PROPERTY, "http://" + HOST + ":" + HTTP_PORT);
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+        appender.open(config);
         appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
         appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
         appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
+        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("refused", "b").put("c", "d").map()));
+        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "refused").put("c", "d").map()));
         appender.close();
 
         HttpHost host = new HttpHost(HOST, HTTP_PORT, "http");
index 964d6f1..18899e0 100644 (file)
             <groupId>org.apache.karaf.decanter</groupId>
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.karaf.decanter.marshaller</groupId>
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.file,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index 713d9fa..4a2778e 100644 (file)
@@ -17,6 +17,7 @@
 package org.apache.karaf.decanter.appender.file;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -38,22 +39,28 @@ import java.util.Dictionary;
 )
 public class FileAppender implements EventHandler {
 
+    public static String FILENAME_PROPERTY = "filename";
+    public static String APPEND_PROPERTY = "append";
+
     @Reference
     public Marshaller marshaller;
 
     private BufferedWriter writer;
 
-    private boolean append;
+    private Dictionary<String, Object> config;
 
     @Activate
     public void activate(ComponentContext componentContext) throws Exception {
         Dictionary<String, Object> config = componentContext.getProperties();
-        String filename = (config.get("filename") != null) ? (String) config.get("filename") : System.getProperty("karaf.data") + File.separator + "decanter";
-        append = (config.get("append") != null) ? Boolean.parseBoolean((String) config.get("append")) : true;
-        open(filename);
+        open(config);
     }
 
-    public void open(String filename) throws Exception {
+    public void open(Dictionary<String, Object> config) throws Exception {
+        this.config = config;
+
+        String filename = (config.get(FILENAME_PROPERTY) != null) ? (String) config.get(FILENAME_PROPERTY) : System.getProperty("karaf.data") + File.separator + "decanter";
+        boolean append = (config.get(APPEND_PROPERTY) != null) ? Boolean.parseBoolean((String) config.get(APPEND_PROPERTY)) : true;
+
         File file = new File(filename);
         file.getParentFile().mkdirs();
         file.createNewFile();
@@ -62,13 +69,15 @@ public class FileAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try {
-            String marshalled = marshaller.marshal(event);
-            writer.write(marshalled);
-            writer.newLine();
-            writer.flush();
-        } catch (Exception e) {
-            // nothing to do
+        if (EventFilter.match(event, config)) {
+            try {
+                String marshalled = marshaller.marshal(event);
+                writer.write(marshalled);
+                writer.newLine();
+                writer.flush();
+            } catch (Exception e) {
+                // nothing to do
+            }
         }
     }
 
index 255327b..bd10f43 100644 (file)
@@ -16,6 +16,7 @@
  */
 package org.apache.karaf.decanter.appender.file;
 
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.apache.karaf.decanter.marshaller.csv.CsvMarshaller;
 import org.junit.Assert;
 import org.junit.Test;
@@ -24,31 +25,73 @@ import org.osgi.service.event.Event;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.util.Dictionary;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.Map;
 
 public class TestFileAppender {
 
     @Test
-    public void testAppender() throws Exception {
+    public void test() throws Exception {
         FileAppender fileAppender = new FileAppender();
         fileAppender.marshaller = new CsvMarshaller();
-        fileAppender.open("target/test-classes/decanter");
-        Map<String, String> map = new HashMap<>();
-        map.put("a", "b");
-        map.put("c", "d");
-        fileAppender.handleEvent(new Event("testTopic", map));
-        fileAppender.handleEvent(new Event("testTopic", map));
-        fileAppender.handleEvent(new Event("testTopic", map));
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put(FileAppender.FILENAME_PROPERTY, "target/test-classes/decanter");
+        fileAppender.open(config);
+
+        Map<String, String> data = new HashMap<>();
+        data.put("a", "b");
+        data.put("c", "d");
+        fileAppender.handleEvent(new Event("testTopic", data));
+        fileAppender.handleEvent(new Event("testTopic", data));
+        fileAppender.handleEvent(new Event("testTopic", data));
         fileAppender.deactivate();
 
         File file = new File("target/test-classes/decanter");
+        int lineCount = 0;
         try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
             String line;
             while ((line = reader.readLine()) != null) {
+                lineCount++;
                 Assert.assertEquals("a=b,c=d,event.topics=testTopic", line);
             }
         }
+        Assert.assertEquals(3, lineCount);
+    }
+
+    @Test
+    public void testWithFilter() throws Exception {
+        FileAppender fileAppender = new FileAppender();
+        fileAppender.marshaller = new CsvMarshaller();
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put(FileAppender.FILENAME_PROPERTY, "target/test-classes/filtered");
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+        fileAppender.open(config);
+
+        Map<String, String> data = new HashMap<>();
+        data.put("refused_property", "test");
+        fileAppender.handleEvent(new Event("testTopic", data));
+
+        data = new HashMap<>();
+        data.put("property", "refused_value");
+        fileAppender.handleEvent(new Event("testTopic", data));
+
+        data = new HashMap<>();
+        data.put("a", "b");
+        fileAppender.handleEvent(new Event("testTopic", data));
+
+        File file = new File("target/test-classes/filtered");
+        int lineCount = 0;
+        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lineCount++;
+                Assert.assertEquals("a=b,event.topics=testTopic", line);
+            }
+        }
+        Assert.assertEquals(1, lineCount);
     }
 
 }
index c31b804..1638706 100644 (file)
             <groupId>org.apache.karaf.decanter</groupId>
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
         
         <dependency>
             <groupId>org.apache.derby</groupId>
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.jdbc,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index 3674f37..63b627f 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Dictionary;
 import javax.sql.DataSource;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -42,6 +43,12 @@ import org.slf4j.LoggerFactory;
 )
 public class JdbcAppender implements EventHandler {
 
+    public static String TABLE_NAME_PROPERTY = "table.name";
+    public static String DIALECT_PROPERTY = "dialect";
+
+    public static String TABLE_NAME_DEFAULT = "decanter";
+    public static String DIALECT_DEFAULT = "generic";
+
     @Reference
     public Marshaller marshaller;
 
@@ -60,8 +67,7 @@ public class JdbcAppender implements EventHandler {
     private final static String insertQueryTemplate =
             "INSERT INTO TABLENAME(timestamp, content) VALUES(?,?)";
 
-    String tableName;
-    String dialect;
+    private Dictionary<String, Object> config;
     
     @SuppressWarnings("unchecked")
     @Activate
@@ -70,12 +76,11 @@ public class JdbcAppender implements EventHandler {
     }
     
     public void open(Dictionary<String, Object> config) {
-        this.tableName = getValue(config, "table.name", "decanter");
-        this.dialect = getValue(config, "dialect", "generic");
+        this.config = config;
         try (Connection connection = dataSource.getConnection()) {
             createTable(connection);
         } catch (Exception e) {
-            LOGGER.debug("Error creating table " + tableName, e);
+            LOGGER.debug("Error creating table " + getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT), e);
         } 
     }
     
@@ -86,37 +91,39 @@ public class JdbcAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try (Connection connection = dataSource.getConnection()) {
-            String jsonSt = marshaller.marshal(event);
-            String insertQuery = insertQueryTemplate.replaceAll("TABLENAME", tableName);
-            Long timestamp = (Long)event.getProperty(EventConstants.TIMESTAMP);
-            if (timestamp == null) {
-                timestamp = System.currentTimeMillis();
+        if (EventFilter.match(event, config)) {
+            try (Connection connection = dataSource.getConnection()) {
+                String jsonSt = marshaller.marshal(event);
+                String insertQuery = insertQueryTemplate.replaceAll("TABLENAME", getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT));
+                Long timestamp = (Long) event.getProperty(EventConstants.TIMESTAMP);
+                if (timestamp == null) {
+                    timestamp = System.currentTimeMillis();
+                }
+                try (PreparedStatement insertStatement = connection.prepareStatement(insertQuery)) {
+                    insertStatement.setLong(1, timestamp);
+                    insertStatement.setString(2, jsonSt);
+                    insertStatement.executeUpdate();
+                    LOGGER.trace("Data inserted into {} table", getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT));
+                }
+            } catch (Exception e) {
+                LOGGER.error("Can't store in the database", e);
             }
-            try (PreparedStatement insertStatement = connection.prepareStatement(insertQuery)) {
-                insertStatement.setLong(1, timestamp);
-                insertStatement.setString(2, jsonSt);
-                insertStatement.executeUpdate();
-                LOGGER.trace("Data inserted into {} table", tableName);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Can't store in the database", e);
         }
     }
 
     private void createTable(Connection connection) {
         String createTemplate = null;
-        if (dialect.equalsIgnoreCase("mysql")) {
+        if (getValue(config, DIALECT_PROPERTY, DIALECT_DEFAULT).equalsIgnoreCase("mysql")) {
             createTemplate = createTableQueryMySQLTemplate;
-        } else if (dialect.equalsIgnoreCase("derby")) {
+        } else if (getValue(config, DIALECT_PROPERTY, DIALECT_DEFAULT).equalsIgnoreCase("derby")) {
             createTemplate = createTableQueryDerbyTemplate;
         } else {
             createTemplate = createTableQueryGenericTemplate;
         }
-        String createTableQuery = createTemplate.replaceAll("TABLENAME", tableName);
+        String createTableQuery = createTemplate.replaceAll("TABLENAME", getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT));
         try (Statement createStatement = connection.createStatement()) {
             createStatement.executeUpdate(createTableQuery);
-            LOGGER.debug("Table {} has been created", tableName);
+            LOGGER.debug("Table {} has been created", getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT));
         } catch (SQLException e) {
             LOGGER.trace("Can't create table {}", e);
         }
index 6309959..b61a436 100644 (file)
@@ -32,6 +32,7 @@ import javax.json.JsonReader;
 
 import org.apache.derby.jdbc.EmbeddedDataSource;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,12 +40,13 @@ import org.osgi.service.event.Event;
 import org.osgi.service.event.EventConstants;
 
 public class TestJdbcAppender {
+
     private static final String TABLE_NAME = "decanter";
     private static final String TOPIC = "decanter/collect/jmx";
     private static final long TIMESTAMP = 1454428780634L;
 
     @Test
-    public void testHandleEvent() throws SQLException {
+    public void test() throws SQLException {
         System.setProperty("derby.stream.error.file", "target/derby.log");
         Marshaller marshaller = new JsonMarshaller();
         EmbeddedDataSource dataSource = new EmbeddedDataSource();
@@ -60,9 +62,60 @@ public class TestJdbcAppender {
         config.put("dialect", "derby");
         appender.open(config);
         
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(EventConstants.TIMESTAMP, TIMESTAMP);
-        Event event = new Event(TOPIC, properties);
+        Map<String, Object> data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        Event event = new Event(TOPIC, data);
+        appender.handleEvent(event);
+
+        try (Connection con = dataSource.getConnection(); Statement statement = con.createStatement();) {
+            ResultSet res = statement.executeQuery("select timestamp, content from " + TABLE_NAME);
+            res.next();
+            long dbTimeStamp = res.getLong(1);
+            String json = res.getString(2);
+            JsonReader reader = Json.createReader(new StringReader(json));
+            JsonObject jsonO = reader.readObject();
+            Assert.assertEquals("Timestamp db", TIMESTAMP, dbTimeStamp);
+            Assert.assertEquals("Timestamp string", "2016-02-02T15:59:40,634Z",jsonO.getString("@timestamp"));
+            Assert.assertEquals("timestamp long", TIMESTAMP, jsonO.getJsonNumber(EventConstants.TIMESTAMP).longValue());
+            Assert.assertEquals("Topic", TOPIC, jsonO.getString(EventConstants.EVENT_TOPIC.replace('.','_')));
+            Assert.assertFalse(res.next());
+        }
+    }
+
+    @Test
+    public void testWithFilter() throws SQLException {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+        Marshaller marshaller = new JsonMarshaller();
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("target/testFilterDB");
+        dataSource.setCreateDatabase("create");
+
+        deleteTable(dataSource);
+
+        JdbcAppender appender = new JdbcAppender();
+        appender.marshaller = marshaller;
+        appender.dataSource = dataSource;
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put("dialect", "derby");
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+        appender.open(config);
+
+        Map<String, Object> data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("this_refused", "data");
+        Event event = new Event(TOPIC, data);
+        appender.handleEvent(event);
+
+        data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("property", "this_refused");
+        event = new Event(TOPIC, data);
+        appender.handleEvent(event);
+
+        data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        event = new Event(TOPIC, data);
         appender.handleEvent(event);
 
         try (Connection con = dataSource.getConnection(); Statement statement = con.createStatement();) {
index 86c9b42..416522f 100644 (file)
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.geronimo.specs</groupId>
             <artifactId>geronimo-jms_1.1_spec</artifactId>
             <version>1.1.1</version>
@@ -71,7 +75,8 @@
                             *
                         </Import-Package>
                         <Private-Package>
-                            org.apache.karaf.decanter.appender.jms
+                            org.apache.karaf.decanter.appender.jms,
+                            org.apache.karaf.decanter.appender.utils
                         </Private-Package>
                         <_dsannotations>*</_dsannotations>
                         <_dsannotations-options>nocapabilities,norequirements</_dsannotations-options>
index c328b19..ae1cb00 100644 (file)
@@ -22,6 +22,7 @@ import java.util.Map;
 import javax.jms.*;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -39,6 +40,18 @@ import org.slf4j.LoggerFactory;
 )
 public class JmsAppender implements EventHandler {
 
+    public static String USERNAME_PROPERTY = "username";
+    public static String PASSWORD_PROPERTY = "password";
+    public static String DESTINATION_NAME_PROPERTY = "destination.name";
+    public static String DESTINATION_TYPE_PROPERTY = "destination.type";
+    public static String MESSAGE_TYPE_PROPERTY = "message.type";
+
+    public static String USERNAME_DEFAULT = null;
+    public static String PASSWORD_DEFAULT = null;
+    public static String DESTINATION_NAME_DEFAULT = "decanter";
+    public static String DESTINATION_TYPE_DEFAULT = "queue";
+    public static String MESSAGE_TYPE_DEFAULT = "text";
+
     @Reference
     public ConnectionFactory connectionFactory;
 
@@ -47,11 +60,8 @@ public class JmsAppender implements EventHandler {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(JmsAppender.class);
 
-    private String username;
-    private String password;
-    private String destinationName;
-    private String destinationType;
-    private String messageType;
+    private Dictionary<String, Object> config;
+
 
     @SuppressWarnings("unchecked")
     @Activate
@@ -60,45 +70,42 @@ public class JmsAppender implements EventHandler {
     }
     
     void activate(Dictionary<String, Object> config) {
-        username = getProperty(config, "username", null);
-        password = getProperty(config, "password", null);
-        destinationName = getProperty(config, "destination.name", "decanter");
-        destinationType = getProperty(config, "destination.type", "queue");
-        messageType = getProperty(config, "message.type", "text");
-        LOGGER.info("Decanter JMS Appender started sending to {} {}",destinationType, destinationName);
+        this.config = config;
+        LOGGER.info("Decanter JMS Appender started sending to {} {}", getValue(config, DESTINATION_TYPE_PROPERTY, DESTINATION_TYPE_DEFAULT), getValue(config, DESTINATION_NAME_PROPERTY, DESTINATION_NAME_DEFAULT));
     }
 
-    private String getProperty(Dictionary<String, Object> properties, String key, String defaultValue) {
+    private String getValue(Dictionary<String, Object> properties, String key, String defaultValue) {
         return (properties.get(key) != null) ? (String) properties.get(key) : defaultValue;
     }
 
     @Override
     public void handleEvent(Event event) {
-        Connection connection = null;
-        Session session = null;
-        try {
-            connection = createConnection();
-            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Destination destination = createDestination(session);
-            MessageProducer producer = session.createProducer(destination);
-            if (messageType.equalsIgnoreCase("text")) {
-                TextMessage message = session.createTextMessage(marshaller.marshal(event));
-                producer.send(message);
-            } else {
-                MapMessage message = session.createMapMessage();
-                for (String name : event.getPropertyNames()) {
-                    Object value = event.getProperty(name);
-                    setProperty(message, name, value);
+        if (EventFilter.match(event, config)) {
+            Connection connection = null;
+            Session session = null;
+            try {
+                connection = createConnection();
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Destination destination = createDestination(session);
+                MessageProducer producer = session.createProducer(destination);
+                if (getValue(config, MESSAGE_TYPE_PROPERTY, MESSAGE_TYPE_DEFAULT).equalsIgnoreCase("text")) {
+                    TextMessage message = session.createTextMessage(marshaller.marshal(event));
+                    producer.send(message);
+                } else {
+                    MapMessage message = session.createMapMessage();
+                    for (String name : event.getPropertyNames()) {
+                        Object value = event.getProperty(name);
+                        setProperty(message, name, value);
+                    }
+                    producer.send(message);
                 }
-                producer.send(message);
+                producer.close();
+            } catch (Exception e) {
+                LOGGER.warn("Can't send to JMS broker", e);
+            } finally {
+                safeClose(session);
+                safeClose(connection);
             }
-            producer.close();
-        } catch (Exception e) {
-            LOGGER.warn("Can't send to JMS broker", e);
-        }
-        finally {
-            safeClose(session);
-            safeClose(connection);
         }
     }
 
@@ -125,12 +132,16 @@ public class JmsAppender implements EventHandler {
     }
 
     private Destination createDestination(Session session) throws JMSException {
+        String destinationType = getValue(config, DESTINATION_TYPE_PROPERTY, DESTINATION_TYPE_DEFAULT);
+        String destinationName = getValue(config, DESTINATION_NAME_PROPERTY, DESTINATION_NAME_DEFAULT);
         return (destinationType.equalsIgnoreCase("topic"))
             ? session.createTopic(destinationName)
             : session.createQueue(destinationName);
     }
 
     private Connection createConnection() throws JMSException {
+        String username = getValue(config, USERNAME_PROPERTY, USERNAME_DEFAULT);
+        String password = getValue(config, PASSWORD_PROPERTY, PASSWORD_DEFAULT);
         return (username != null) 
             ? connectionFactory.createConnection(username, password)
             : connectionFactory.createConnection();
index 36ac273..aa2e24f 100644 (file)
@@ -28,6 +28,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.junit.Assert;
 import org.junit.Test;
 import org.osgi.service.event.Event;
@@ -35,7 +36,7 @@ import org.osgi.service.event.Event;
 public class JmsAppenderTest {
 
     @Test
-    public void testHandleEvent() throws JMSException {
+    public void test() throws JMSException {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         JmsAppender appender = new JmsAppender();
         appender.connectionFactory = cf;
@@ -70,4 +71,52 @@ public class JmsAppenderTest {
         Object map = message.getObject("map");
         Assert.assertTrue(map instanceof Map);
     }
+
+    @Test
+    public void testWithFilter() throws JMSException {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        JmsAppender appender = new JmsAppender();
+        appender.connectionFactory = cf;
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put("message.type", "map");
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+        appender.activate(config);
+
+        Connection con = cf.createConnection();
+        con.start();
+        Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = sess.createConsumer(sess.createQueue("decanter"));
+
+        Map<String, Object> data = new HashMap<String, Object>();
+        data.put("timestamp", 1l);
+        data.put("string", "test");
+        data.put("boolean", true);
+        data.put("integer", 1);
+        data.put("testnull", null);
+        data.put("map", new HashMap<String, String>());
+        appender.handleEvent(new Event("decanter/collect", data));
+
+        data = new HashMap<>();
+        data.put("refused_property", "value");
+        appender.handleEvent(new Event("decanter/collect", data));
+
+        data = new HashMap<>();
+        data.put("property", "refused_value");
+        appender.handleEvent(new Event("decanter/collect", data));
+
+        MapMessage message = (MapMessage)consumer.receive(1000);
+        consumer.close();
+        sess.close();
+        con.close();
+
+        Assert.assertEquals(1l, message.getObject("timestamp"));
+        Assert.assertEquals("test", message.getObject("string"));
+        Assert.assertEquals(true, message.getObject("boolean"));
+        Assert.assertEquals(1, message.getObject("integer"));
+        Object map = message.getObject("map");
+        Assert.assertTrue(map instanceof Map);
+    }
+
 }
index 61e2f02..8eaf002 100644 (file)
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${kafka.version}</version>
         </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>3.4.10</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-simple</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>1.7.25</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.kafka,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index f62cac0..5d6e775 100644 (file)
@@ -62,7 +62,7 @@ public class ConfigMapper {
     }
 
     private void process(String key, String defaultValue) {
-        String value = (String)confSource.get(key);
+        String value = (String) confSource.get(key);
         String usedValue = (value != null) ? value : defaultValue;
         if (usedValue != null) {
             config.put(key, usedValue);
index c09dd66..9bb2c48 100644 (file)
@@ -16,6 +16,7 @@
  */
 package org.apache.karaf.decanter.appender.kafka;
 
+import java.util.Dictionary;
 import java.util.Properties;
 
 import org.apache.kafka.clients.producer.Callback;
@@ -23,6 +24,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -48,6 +50,7 @@ public class KafkaAppender implements EventHandler {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(KafkaAppender.class);
 
+    private Dictionary<String, Object> config;
     private Properties properties;
     private String topic;
     private KafkaProducer<String, String> producer;
@@ -55,7 +58,12 @@ public class KafkaAppender implements EventHandler {
     @Activate
     @SuppressWarnings("unchecked")
     public void activate(ComponentContext context) {
-        this.properties = ConfigMapper.map(context.getProperties());
+        activate(context.getProperties());
+    }
+
+    public void activate(Dictionary<String, Object> config) {
+        this.config = config;
+        this.properties = ConfigMapper.map(config);
         this.topic = properties.getProperty("topic");
         properties.remove("topic");
 
@@ -71,20 +79,22 @@ public class KafkaAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try {
-            String type = (String)event.getProperty("type");
-            String data = marshaller.marshal(event);
-            producer.send(new ProducerRecord<>(topic, type, data), new Callback() {
-                @Override
-                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                    if (e != null) {
-                        LOGGER.warn("Can't send event to Kafka broker", e);
+        if (EventFilter.match(event, config)) {
+            try {
+                String type = (String) event.getProperty("type");
+                String data = marshaller.marshal(event);
+                producer.send(new ProducerRecord<>(topic, type, data), new Callback() {
+                    @Override
+                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+                        if (e != null) {
+                            LOGGER.warn("Can't send event to Kafka broker", e);
+                        }
                     }
-                }
-            }).get();
-            producer.flush();
-        } catch (Exception e) {
-            LOGGER.warn("Error sending event to kafka", e);
+                }).get();
+                producer.flush();
+            } catch (Exception e) {
+                LOGGER.warn("Error sending event to kafka", e);
+            }
         }
     }
     
diff --git a/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedKafkaBroker.java b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedKafkaBroker.java
new file mode 100644 (file)
index 0000000..4de295a
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZkUtils;
+import org.apache.kafka.common.utils.SystemTime;
+import org.junit.rules.ExternalResource;
+import scala.Option;
+import scala.collection.mutable.Buffer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class EmbeddedKafkaBroker extends ExternalResource {
+
+    private final Integer brokerId;
+    private final Integer port;
+    private final String zkConnection;
+    private final Properties baseProperties;
+
+    private final String brokerList;
+
+    private KafkaServer kafkaServer;
+    private File logDir;
+    private ZkUtils zkUtils;
+
+    public EmbeddedKafkaBroker(int brokerId, int port, String zkConnection, Properties baseProperties) {
+        this.brokerId = brokerId;
+        this.port = port;
+        this.zkConnection = zkConnection;
+        this.baseProperties = baseProperties;
+        this.brokerList = "localhost:" + this.port;
+    }
+
+    @Override
+    public void before() {
+        logDir = new File("target/test-classes/kafka-log");
+        logDir.mkdirs();
+
+        Properties properties = new Properties();
+        properties.putAll(baseProperties);
+        properties.setProperty("zookeeper.connect", zkConnection);
+        properties.setProperty("broker.id", brokerId.toString());
+        properties.setProperty("host.name", "localhost");
+        properties.setProperty("port", Integer.toString(port));
+        properties.setProperty("log.dir", logDir.getAbsolutePath());
+        properties.setProperty("num.partitions", String.valueOf(1));
+        properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
+        properties.setProperty("log.flush.interval.messages", String.valueOf(1));
+        properties.setProperty("offsets.topic.replication.factor", String.valueOf(1));
+
+        kafkaServer = startBroker(properties);
+    }
+
+    private KafkaServer startBroker(Properties props) {
+        zkUtils = ZkUtils.apply(
+                zkConnection,
+                30000,
+                30000,
+                false);
+        List<KafkaMetricsReporter> kmrList = new ArrayList<>();
+        Buffer<KafkaMetricsReporter> metricsList = scala.collection.JavaConversions.asScalaBuffer(kmrList);
+        KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime(), Option.<String>empty(), metricsList);
+        server.startup();
+        return server;
+    }
+
+    public String getBrokerList() {
+        return brokerList;
+    }
+
+    public Integer getPort() {
+        return port;
+    }
+
+    public void after() {
+        kafkaServer.shutdown();
+        logDir.delete();
+    }
+
+}
diff --git a/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedZooKeeper.java b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedZooKeeper.java
new file mode 100644 (file)
index 0000000..2425bf8
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.rules.ExternalResource;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class EmbeddedZooKeeper extends ExternalResource {
+
+    private int port = -1;
+    private int tickTime = 500;
+
+    private ServerCnxnFactory cnxnFactory;
+    private File snapshotDir;
+    private File logDir;
+    private ZooKeeperServer zooKeeperServer;
+
+    public EmbeddedZooKeeper(int port) {
+        this.port = port;
+    }
+
+    @Override
+    public void before() throws IOException {
+        snapshotDir = new File("target/test-classes/zk-snapshot");
+        snapshotDir.mkdirs();
+        logDir = new File("target/test-classes/zk-log");
+        logDir.mkdirs();
+
+        try {
+            zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime);
+            cnxnFactory = new NIOServerCnxnFactory();
+            cnxnFactory.configure(new InetSocketAddress("localhost", port), 1024);
+            cnxnFactory.startup(zooKeeperServer);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void after() {
+        cnxnFactory.shutdown();
+        zooKeeperServer.shutdown();
+
+        logDir.delete();
+        snapshotDir.delete();
+    }
+
+    public String getConnection() {
+        return "localhost:" + port;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+}
diff --git a/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/KafkaAppenderTest.java b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/KafkaAppenderTest.java
new file mode 100644 (file)
index 0000000..7172502
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.*;
+import org.osgi.service.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class KafkaAppenderTest {
+
+    @ClassRule
+    public static EmbeddedZooKeeper zookeeper = new EmbeddedZooKeeper(PortFinder.getNextAvailable(23000));
+
+    @ClassRule
+    public static EmbeddedKafkaBroker kafkaBroker =
+            new EmbeddedKafkaBroker(0,
+                    PortFinder.getNextAvailable(24000),
+                    zookeeper.getConnection(),
+                    new Properties());
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class);
+
+    @BeforeClass
+    public static void beforeClass() {
+        LOG.info("Embedded Zookeeper connection: " + zookeeper.getConnection());
+        LOG.info("Embedded Kafka cluster broker list: " + kafkaBroker.getBrokerList());
+    }
+
+    @Test
+    @Ignore
+    public void test() throws Exception {
+        KafkaAppender appender = new KafkaAppender();
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put("topic", "test");
+        config.put("bootstrap.servers", kafkaBroker.getBrokerList());
+        appender.activate(config);
+
+        Map<String, Object> data = new HashMap<>();
+        data.put("foo", "bar");
+        Event event = new Event("decanter/collect", data);
+        appender.handleEvent(event);
+
+        Properties kafkaConfig = new Properties();
+        kafkaConfig.put("topic", "test");
+        kafkaConfig.put("bootstrap.servers", kafkaBroker.getBrokerList());
+        KafkaConsumer consumer = new KafkaConsumer<String, String>(kafkaConfig);
+        consumer.subscribe(Arrays.asList("test"));
+        ConsumerRecords<String, String> records = consumer.poll(1000);
+        Assert.assertFalse(records.isEmpty());
+        Assert.assertEquals(1, records.count());
+    }
+
+}
diff --git a/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/PortFinder.java b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/PortFinder.java
new file mode 100644 (file)
index 0000000..df83631
--- /dev/null
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PortFinder {
+
+    /**
+     * The minimum server currentMinPort number for IPv4.
+     * Set at 1100 to avoid returning privileged currentMinPort numbers.
+     */
+    public static final int MIN_PORT_NUMBER = 1100;
+
+    /**
+     * The maximum server currentMinPort number for IPv4.
+     */
+    public static final int MAX_PORT_NUMBER = 65535;
+
+    private static final Logger LOG = LoggerFactory.getLogger(PortFinder.class);
+
+    /**
+     * We'll hold open the lowest port in this process
+     * so parallel processes won't use the same block
+     * of ports.   They'll go up to the next block.
+     */
+    private static final ServerSocket LOCK;
+
+    /**
+     * Incremented to the next lowest available port when getNextAvailable() is called.
+     */
+    private static AtomicInteger currentMinPort = new AtomicInteger(MIN_PORT_NUMBER);
+
+    /**
+     * Creates a new instance.
+     */
+    private PortFinder() {
+        // Do nothing
+    }
+
+    static {
+        int port = MIN_PORT_NUMBER;
+        ServerSocket ss = null;
+
+        while (ss == null) {
+            try {
+                ss = new ServerSocket(port);
+            } catch (Exception e) {
+                ss = null;
+                port += 200;
+            }
+        }
+        LOCK = ss;
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                try {
+                    LOCK.close();
+                } catch (Exception ex) {
+                    //ignore
+                }
+            }
+        });
+        currentMinPort.set(port + 1);
+    }
+
+    /**
+     * Gets the next available port starting at the lowest number. This is the preferred
+     * method to use. The port return is immediately marked in use and doesn't rely on the caller actually opening
+     * the port.
+     *
+     * @throws IllegalArgumentException is thrown if the port number is out of range
+     * @throws NoSuchElementException if there are no ports available
+     * @return the available port
+     */
+    public static synchronized int getNextAvailable() {
+        int next = getNextAvailable(currentMinPort.get());
+        currentMinPort.set(next + 1);
+        return next;
+    }
+
+    /**
+     * Gets the next available port starting at a given from port.
+     *
+     * @param fromPort the from port to scan for availability
+     * @throws IllegalArgumentException is thrown if the port number is out of range
+     * @throws NoSuchElementException if there are no ports available
+     * @return the available port
+     */
+    public static synchronized int getNextAvailable(int fromPort) {
+        if (fromPort < currentMinPort.get() || fromPort > MAX_PORT_NUMBER) {
+            throw new IllegalArgumentException("From port number not in valid range: " + fromPort);
+        }
+
+        for (int i = fromPort; i <= MAX_PORT_NUMBER; i++) {
+            if (available(i)) {
+                LOG.info("getNextAvailable({}) -> {}", fromPort, i);
+                return i;
+            }
+        }
+
+        throw new NoSuchElementException("Could not find an available port above " + fromPort);
+    }
+
+    /**
+     * Checks to see if a specific port is available.
+     *
+     * @param port the port number to check for availability
+     * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
+     * @throws IllegalArgumentException is thrown if the port number is out of range
+     */
+    public static boolean available(int port) throws IllegalArgumentException {
+        if (port < currentMinPort.get() || port > MAX_PORT_NUMBER) {
+            throw new IllegalArgumentException("Invalid start currentMinPort: " + port);
+        }
+
+        ServerSocket ss = null;
+        DatagramSocket ds = null;
+        try {
+            ss = new ServerSocket(port);
+            ss.setReuseAddress(true);
+            ds = new DatagramSocket(port);
+            ds.setReuseAddress(true);
+            return true;
+        } catch (IOException e) {
+            // Do nothing
+        } finally {
+            if (ds != null) {
+                ds.close();
+            }
+
+            if (ss != null) {
+                try {
+                    ss.close();
+                } catch (IOException e) {
+                    /* should not be thrown */
+                }
+            }
+        }
+
+        return false;
+    }
+
+}
index eb7c90d..0986ddc 100644 (file)
             <groupId>org.apache.karaf.decanter</groupId>
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.log,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index 75c62c3..1bae119 100644 (file)
@@ -17,6 +17,9 @@
 package org.apache.karaf.decanter.appender.log;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
@@ -26,6 +29,8 @@ import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Dictionary;
+
 /**
  * Karaf Decanter Log Appender
  * Listens on EventAdmin and writes to the slf4j logger.
@@ -43,16 +48,25 @@ public class LogAppender implements EventHandler {
     @Reference(cardinality = ReferenceCardinality.OPTIONAL)
     public Marshaller marshaller;
 
+    private Dictionary<String, Object> config;
+
+    @Activate
+    public void activate(ComponentContext componentContext) {
+        this.config = componentContext.getProperties();
+    }
+
     @Override
     public void handleEvent(Event event) {
-        if (marshaller != null) {
-            LOGGER.info(marshaller.marshal(event));
-        } else {
-            StringBuilder builder = new StringBuilder();
-            for (String innerKey : event.getPropertyNames()) {
-                builder.append(innerKey).append(":").append(toString(event.getProperty(innerKey))).append(" | ");
+        if (EventFilter.match(event, config)) {
+            if (marshaller != null) {
+                LOGGER.info(marshaller.marshal(event));
+            } else {
+                StringBuilder builder = new StringBuilder();
+                for (String innerKey : event.getPropertyNames()) {
+                    builder.append(innerKey).append(":").append(toString(event.getProperty(innerKey))).append(" | ");
+                }
+                LOGGER.info(builder.toString());
             }
-            LOGGER.info(builder.toString());
         }
     }
 
index 7f5c7b8..0366157 100644 (file)
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.mongodb</groupId>
             <artifactId>mongo-java-driver</artifactId>
             <version>3.2.2</version>
@@ -51,7 +55,9 @@
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
                 <configuration>
+                    <obrRepository>NONE</obrRepository>
                     <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
                         <Import-Package>
                             !io.netty*,
                             org.slf4j;resolution:=optional,
                         </Import-Package>
                         <Private-Package>
                             org.apache.karaf.decanter.appender.mongodb,
+                            org.apache.karaf.decanter.appender.utils,
                             com.mongodb*,
                             org.bson*,
                             io.netty*
                         </Private-Package>
+                        <_dsannotations>*</_dsannotations>
                     </instructions>
                 </configuration>
             </plugin>
index d97edff..16fef55 100644 (file)
@@ -21,6 +21,7 @@ import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.bson.Document;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.*;
@@ -40,6 +41,14 @@ import java.util.Dictionary;
 )
 public class MongoDbAppender implements EventHandler {
 
+    public static String URI_PROPERTY = "uri";
+    public static String DATABASE_PROPERTY = "database";
+    public static String COLLECTION_PROPERTY = "collection";
+
+    public static String URI_DEFAULT = "mongodb://localhost";
+    public static String DATABASE_DEFAULT = "decanter";
+    public static String COLLECTION_DEFAULT = "decanter";
+
     @Reference
     public Marshaller marshaller;
 
@@ -49,13 +58,15 @@ public class MongoDbAppender implements EventHandler {
     private MongoDatabase mongoDatabase;
     private MongoCollection mongoCollection;
 
+    private Dictionary<String, Object> config;
+
     @Activate
     public void activate(ComponentContext componentContext) {
-        Dictionary<String, Object> config = componentContext.getProperties();
+        config = componentContext.getProperties();
 
-        String uri = getValue(config, "uri", "mongodb://localhost");
-        String database = getValue(config, "database", "decanter");
-        String collection = getValue(config, "collection", "decanter");
+        String uri = getValue(config, URI_PROPERTY, URI_DEFAULT);
+        String database = getValue(config, DATABASE_PROPERTY, DATABASE_DEFAULT);
+        String collection = getValue(config, COLLECTION_PROPERTY, COLLECTION_DEFAULT);
 
         mongoClient = new MongoClient(new MongoClientURI(uri));
         mongoDatabase = mongoClient.getDatabase(database);
@@ -69,11 +80,13 @@ public class MongoDbAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try {
-            String data = marshaller.marshal(event);
-            mongoCollection.insertOne(Document.parse(data));
-        } catch (Exception e) {
-            LOGGER.warn("Error storing event in MongoDB", e);
+        if (EventFilter.match(event, config)) {
+            try {
+                String data = marshaller.marshal(event);
+                mongoCollection.insertOne(Document.parse(data));
+            } catch (Exception e) {
+                LOGGER.warn("Error storing event in MongoDB", e);
+            }
         }
     }
 
index 9d435ba..52920eb 100644 (file)
             <groupId>org.apache.karaf.decanter.marshaller</groupId>
             <artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>javax.json</groupId>
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.mqtt,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index 426c209..024b53b 100644 (file)
  */
 package org.apache.karaf.decanter.appender.mqtt;
 
-import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Dictionary;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
-import org.eclipse.paho.client.mqttv3.MqttSecurityException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
@@ -45,47 +43,59 @@ import org.slf4j.LoggerFactory;
 )
 public class MqttAppender implements EventHandler {
 
+    public static String SERVER_PROPERTY = "server";
+    public static String CLIENT_ID_PROPERTY = "clientId";
+    public static String TOPIC_PROPERTY = "topic";
+
+    public static String SERVER_DEFAULT = "tcp://localhost:9300";
+    public static String CLIENT_ID_DEFAULT = "decanter";
+    public static String TOPIC_DEFAULT = "decanter";
+
     @Reference
     public Marshaller marshaller;
 
     private final static Logger LOGGER = LoggerFactory.getLogger(MqttAppender.class);
 
     private MqttClient client;
-    private String server;
-    private String clientId;
-    private String topic;
+
+    private Dictionary<String, Object> config;
 
     @Activate
     public void activate(ComponentContext componentContext) throws Exception {
         activate(componentContext.getProperties());
     }
 
-    public void activate(Dictionary<String, Object> dictionary) throws Exception {
-        this.server = getProperty(dictionary, "server", "tcp://localhost:9300");
-        this.clientId = getProperty(dictionary, "clientId", "decanter");
-        this.topic = getProperty(dictionary, "topic", "decanter");
-        client = new MqttClient(server, clientId, new MemoryPersistence());
+    public void activate(Dictionary<String, Object> config) throws Exception {
+        this.config = config;
+        client = new MqttClient(
+                getValue(config, SERVER_PROPERTY, SERVER_DEFAULT),
+                getValue(config, CLIENT_ID_PROPERTY, CLIENT_ID_DEFAULT),
+                new MemoryPersistence());
         client.connect();
     }
 
-    private String getProperty(Dictionary<String, Object> properties, String key, String defaultValue) {
-        return (properties.get(key) != null) ? (String) properties.get(key) : defaultValue;
+    private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
+        return (config.get(key) != null) ? (String) config.get(key) : defaultValue;
     }
 
     @Override
     public void handleEvent(Event event) {
-        try {
-            MqttMessage message = new MqttMessage();
-            String jsonSt = marshaller.marshal(event);
-            message.setPayload(jsonSt.getBytes(StandardCharsets.UTF_8));
-            client.publish(topic, message);
-        } catch (Exception e) {
-            LOGGER.warn("Error sending to MQTT server " + client.getServerURI(), e);
+        if (EventFilter.match(event, config)) {
             try {
-                client.disconnect();
-                client.connect();
-            } catch (MqttException e1) {
-                e1.printStackTrace();
+                MqttMessage message = new MqttMessage();
+                String jsonSt = marshaller.marshal(event);
+                message.setPayload(jsonSt.getBytes(StandardCharsets.UTF_8));
+                client.publish(
+                        getValue(config, TOPIC_PROPERTY, TOPIC_DEFAULT),
+                        message);
+            } catch (Exception e) {
+                LOGGER.warn("Error sending to MQTT server " + client.getServerURI(), e);
+                try {
+                    client.disconnect();
+                    client.connect();
+                } catch (MqttException e1) {
+                    e1.printStackTrace();
+                }
             }
         }
     }
index b9c2660..f74420d 100644 (file)
@@ -28,6 +28,7 @@ import javax.json.JsonReader;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -41,12 +42,13 @@ import org.osgi.service.event.Event;
 import org.osgi.service.event.EventConstants;
 
 public class TestMqttAppender  {
+
     private static final String SERVER = "tcp://localhost:11883";
     private static final String TOPIC = "decanter";
     private static final long TIMESTAMP = 1454428780634L;
 
     @Test
-    public void testSend() throws URISyntaxException, Exception {
+    public void test() throws URISyntaxException, Exception {
         BrokerService brokerService = new BrokerService();
         brokerService.setUseJmx(false);
         brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
@@ -61,15 +63,30 @@ public class TestMqttAppender  {
         MqttAppender appender = new MqttAppender();
         appender.marshaller = marshaller;
         Dictionary<String, Object> config = new Hashtable<>();
-        config.put("server", SERVER);
-        config.put("clientId", "decanter");
-        config.put("topic", TOPIC);
+        config.put(MqttAppender.SERVER_PROPERTY, SERVER);
+        config.put(MqttAppender.CLIENT_ID_PROPERTY, "decanter");
+        config.put(MqttAppender.TOPIC_PROPERTY, TOPIC);
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
         appender.activate(config);
 
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(EventConstants.TIMESTAMP, TIMESTAMP);
-        Event event = new Event(TOPIC, properties);
+        Map<String, Object> data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("property_refused", "data");
+        Event event = new Event(TOPIC, data);
+        appender.handleEvent(event);
+
+        data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        data.put("property", "refused_value");
+        event = new Event(TOPIC, data);
         appender.handleEvent(event);
+
+        data = new HashMap<>();
+        data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+        event = new Event(TOPIC, data);
+        appender.handleEvent(event);
+
         Thread.sleep(100);
         Assert.assertEquals(1, received.size());
         
index f06253f..102ddb8 100644 (file)
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.orientechnologies</groupId>
             <artifactId>orientdb-client</artifactId>
             <version>${orientdb.version}</version>
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.orientdb,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index 58b6c19..fd50d0f 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.karaf.decanter.appender.orientdb;
 import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
 import com.orientechnologies.orient.core.record.impl.ODocument;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -38,17 +39,27 @@ import java.util.Dictionary;
 )
 public class OrientDBAppender implements EventHandler {
 
+    public static String URL_PROPERTY = "url";
+    public static String USERNAME_PROPERTY = "username";
+    public static String PASSWORD_PROPERTY = "password";
+
+    public static String URL_DEFAULT = "remote:localhost/decanter";
+    public static String USERNAME_DEFAULT = "root";
+    public static String PASSWORD_DEFAULT = "decanter";
+
     @Reference
     public Marshaller marshaller;
 
     private ODatabaseDocumentTx database;
 
+    private Dictionary<String, Object> config;
+
     @Activate
     public void activate(ComponentContext componentContext) {
-        Dictionary<String, Object> config = componentContext.getProperties();
-        String url = getValue(config, "url", "remote:localhost/decanter");
-        String username = getValue(config, "username", "root");
-        String password = getValue(config, "password", "decanter");
+        config = componentContext.getProperties();
+        String url = getValue(config, URL_PROPERTY, URL_DEFAULT);
+        String username = getValue(config, USERNAME_PROPERTY, USERNAME_DEFAULT);
+        String password = getValue(config, PASSWORD_PROPERTY, PASSWORD_DEFAULT);
         database = new ODatabaseDocumentTx(url).open(username, password);
     }
 
@@ -64,9 +75,11 @@ public class OrientDBAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        String json = marshaller.marshal(event);
-        ODocument document = new ODocument("decanter").fromJSON(json);
-        document.save();
+        if (EventFilter.match(event, config)) {
+            String json = marshaller.marshal(event);
+            ODocument document = new ODocument("decanter").fromJSON(json);
+            document.save();
+        }
     }
 
 }
index 3c9605b..d021ddd 100644 (file)
@@ -34,6 +34,7 @@
     <name>Apache Karaf :: Decanter :: Appender</name>
 
     <modules>
+        <module>utils</module>
         <module>camel</module>
         <module>cassandra</module>
         <module>dropwizard</module>
index 3f3d5c6..feb982e 100644 (file)
             <artifactId>redisson</artifactId>
             <version>${redisson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
             <plugin>
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
                 <configuration>
+                    <obrRepository>NONE</obrRepository>
                     <instructions>
+                        <Export-Package>!*</Export-Package>
                         <Import-Package>
                             com.esotericsoftware.kryo;resolution:=optional
                         </Import-Package>
                         <Private-Package>
                             org.redisson*,
+                            org.apache.karaf.decanter.appender.utils
                         </Private-Package>
+                        <_dsannotations>*</_dsannotations>
                     </instructions>
                 </configuration>
             </plugin>
index d38ebdb..749a320 100644 (file)
@@ -16,6 +16,7 @@
  */
 package org.apache.karaf.decanter.appender.redis;
 
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -26,8 +27,6 @@ import org.osgi.service.event.EventHandler;
 import org.redisson.Config;
 import org.redisson.Redisson;
 import org.redisson.RedissonClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Dictionary;
 import java.util.Map;
@@ -42,38 +41,46 @@ import java.util.Map;
 )
 public class RedisAppender implements EventHandler {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(RedisAppender.class);
+    public static String ADDRESS_PROPERTY = "address";
+    public static String MODE_PROPERTY = "mode";
+    public static String MAP_PROPERTY = "map";
+    public static String MASTER_ADDRESS_PROPERTY = "masterAddress";
+    public static String MASTER_NAME_PROPERTY = "masterName";
+    public static String SCAN_INTERVAL_PROPERTY = "scanInterval";
 
-    private String address;
-    private String mode;
-    private String map;
-    private String masterAddress;
-    private String masterName;
-    private int scanInterval;
+    public static String ADDRESS_DEFAULT = "localhost:6379";
+    public static String MODE_DEFAULT = "Single";
+    public static String MAP_DEFAULT = "Decanter";
+    public static String MASTER_ADDRESS_DEFAULT = null;
+    public static String MASTER_NAME_DEFAULT = null;
+    public static String SCAN_INTERVAL_DEFAULT = "2000";
 
     private RedissonClient redissonClient;
 
+    private Dictionary<String, Object> config;
+
     @Activate
     public void activate(ComponentContext componentContext) {
-        Dictionary<String, Object> properties = componentContext.getProperties();
-        address = getProperty(properties, "address", "localhost:6379");
-        mode = getProperty(properties, "mode", "Single");
-        map = getProperty(properties, "map", "Decanter");
-        masterAddress = getProperty(properties, "masterAddress", null);
-        masterName = getProperty(properties, "masterName", null);
-        scanInterval = Integer.parseInt(getProperty(properties, "scanInterval", "2000"));
+        config = componentContext.getProperties();
+
+        String address = getValue(config, ADDRESS_PROPERTY, ADDRESS_DEFAULT);
+        String mode = getValue(config, MODE_PROPERTY, MODE_DEFAULT);
+        String map = getValue(config, MAP_PROPERTY, MAP_DEFAULT);
+        String masterAddress = getValue(config, MASTER_ADDRESS_PROPERTY, MASTER_ADDRESS_DEFAULT);
+        String masterName = getValue(config, MASTER_NAME_PROPERTY, MASTER_NAME_DEFAULT);
+        int scanInterval = Integer.parseInt(getValue(config, SCAN_INTERVAL_PROPERTY, SCAN_INTERVAL_DEFAULT));
 
-        Config config = new Config();
+        Config redissonConfig = new Config();
         if (mode.equalsIgnoreCase("Single")) {
-            config.useSingleServer().setAddress(address);
+            redissonConfig.useSingleServer().setAddress(address);
         } else if (mode.equalsIgnoreCase("Master_Slave")) {
-            config.useMasterSlaveServers().setMasterAddress(masterAddress).addSlaveAddress(address);
+            redissonConfig.useMasterSlaveServers().setMasterAddress(masterAddress).addSlaveAddress(address);
         } else if (mode.equalsIgnoreCase("Sentinel")) {
-            config.useSentinelServers().addSentinelAddress(masterName).addSentinelAddress(address);
+            redissonConfig.useSentinelServers().addSentinelAddress(masterName).addSentinelAddress(address);
         } else if (mode.equalsIgnoreCase("Cluster")) {
-            config.useClusterServers().setScanInterval(scanInterval).addNodeAddress(address);
+            redissonConfig.useClusterServers().setScanInterval(scanInterval).addNodeAddress(address);
         }
-        redissonClient = Redisson.create(config);
+        redissonClient = Redisson.create(redissonConfig);
     }
 
     @Deactivate
@@ -85,13 +92,15 @@ public class RedisAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        Map<String, Object> redisMap = redissonClient.getMap(this.map);
-        for (String name : event.getPropertyNames()) {
-            redisMap.put(name, event.getProperty(name));
+        if (EventFilter.match(event, config)) {
+            Map<String, Object> redisMap = redissonClient.getMap(getValue(config, MAP_PROPERTY, MAP_DEFAULT));
+            for (String name : event.getPropertyNames()) {
+                redisMap.put(name, event.getProperty(name));
+            }
         }
     }
 
-    private String getProperty(Dictionary<String, Object> properties, String key, String defaultValue) {
+    private String getValue(Dictionary<String, Object> properties, String key, String defaultValue) {
         return (properties.get(key) != null) ? (String) properties.get(key) : defaultValue;
     }
 
index 85edd19..851d616 100644 (file)
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.karaf.decanter.marshaller</groupId>
             <artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
             <scope>test</scope> 
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.rest,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsanntations>*</_dsanntations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index b14425f..5762465 100644 (file)
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
 import java.util.Dictionary;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -44,6 +45,8 @@ import org.slf4j.LoggerFactory;
 )
 public class RestAppender implements EventHandler {
 
+    public static String URI_PROPERTY = "uri";
+
     @Reference
     public Marshaller marshaller;
 
@@ -51,6 +54,8 @@ public class RestAppender implements EventHandler {
 
     private URI uri;
 
+    private Dictionary<String, Object> config;
+
     @Activate
     @SuppressWarnings("unchecked")
     public void activate(ComponentContext context) throws URISyntaxException {
@@ -59,7 +64,8 @@ public class RestAppender implements EventHandler {
     }
 
     void activate(Dictionary<String, Object> config) throws URISyntaxException {
-        uri = new URI(getMandatoryValue(config, "uri"));
+        this.config = config;
+        uri = new URI(getMandatoryValue(config, URI_PROPERTY));
     }
 
     private String getMandatoryValue(Dictionary<String, Object> config, String key) {
@@ -73,21 +79,23 @@ public class RestAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try {
-            HttpURLConnection connection = (HttpURLConnection)uri.toURL().openConnection();
-            connection.setDoOutput(true); 
-            connection.setInstanceFollowRedirects(false); 
-            connection.setRequestMethod("POST"); 
-            connection.setRequestProperty("Content-Type", "application/json"); 
-            connection.setRequestProperty("charset", "utf-8");
-            OutputStream out = connection.getOutputStream();
-            marshaller.marshal(event, out);
-            out.close();
-            InputStream is = connection.getInputStream();
-            is.read();
-            is.close();
-        } catch (Exception e) {
-            LOGGER.warn("Error sending event to rest service", e);
+        if (EventFilter.match(event, config)) {
+            try {
+                HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
+                connection.setDoOutput(true);
+                connection.setInstanceFollowRedirects(false);
+                connection.setRequestMethod("POST");
+                connection.setRequestProperty("Content-Type", "application/json");
+                connection.setRequestProperty("charset", "utf-8");
+                OutputStream out = connection.getOutputStream();
+                marshaller.marshal(event, out);
+                out.close();
+                InputStream is = connection.getInputStream();
+                is.read();
+                is.close();
+            } catch (Exception e) {
+                LOGGER.warn("Error sending event to rest service", e);
+            }
         }
     }
     
index 7f75b2c..41a8565 100644 (file)
             <groupId>org.apache.karaf.decanter</groupId>
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.socket,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index 1016438..fb5d0f7 100644 (file)
@@ -17,6 +17,7 @@
 package org.apache.karaf.decanter.appender.socket;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.*;
 import org.osgi.service.event.Event;
@@ -37,44 +38,49 @@ import java.util.Dictionary;
 )
 public class SocketAppender implements EventHandler {
 
+    public static String HOST_PROPERTY = "host";
+    public static String PORT_PROPERTY = "port";
+
+    public static String HOST_DEFAULT = "localhost";
+    public static String PORT_DEFAULT = "34343";
+
     @Reference
     public Marshaller marshaller;
 
     private final static Logger LOGGER = LoggerFactory.getLogger(SocketAppender.class);
 
-    private String host;
-    private int port;
-
+    private Dictionary<String, Object> config;
 
     @Activate
     public void activate(ComponentContext componentContext) throws Exception {
-        Dictionary<String, Object> config = componentContext.getProperties();
-        host = getValue(config, "host", "localhost");
-        String portSt = getValue(config, "port", "34343");
-        port = Integer.parseInt(portSt);
+        this.config = componentContext.getProperties();
     }
 
     @Override
     public void handleEvent(Event event) {
-        Socket socket = null;
-        PrintWriter writer = null;
-        try {
-            socket = new Socket(host, port);
-            String data = marshaller.marshal(event);
-            writer = new PrintWriter(socket.getOutputStream(), true);
-            writer.println(data);
-        } catch (Exception e) {
-            LOGGER.warn("Error sending data on the socket", e);
-        } finally {
-            if (writer != null) {
-                writer.flush();
-                writer.close();
-            }
-            if (socket != null) {
-                try {
-                    socket.close();
-                } catch (Exception e) {
-                    // nothing to do
+        if (EventFilter.match(event, config)) {
+            Socket socket = null;
+            PrintWriter writer = null;
+            try {
+                socket = new Socket(
+                        getValue(config, HOST_PROPERTY, HOST_DEFAULT),
+                        Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT)));
+                String data = marshaller.marshal(event);
+                writer = new PrintWriter(socket.getOutputStream(), true);
+                writer.println(data);
+            } catch (Exception e) {
+                LOGGER.warn("Error sending data on the socket", e);
+            } finally {
+                if (writer != null) {
+                    writer.flush();
+                    writer.close();
+                }
+                if (socket != null) {
+                    try {
+                        socket.close();
+                    } catch (Exception e) {
+                        // nothing to do
+                    }
                 }
             }
         }
index 7738326..1fd90a9 100644 (file)
             <groupId>org.apache.karaf.decanter</groupId>
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.appender.timescaledb,
+                            org.apache.karaf.decanter.appender.utils
+                        </Private-Package>
+                        <_dsannotations>*</_dsannotations>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
index ef95c25..c9ef9f3 100644 (file)
@@ -23,6 +23,7 @@ import java.sql.Statement;
 import java.util.Dictionary;
 import javax.sql.DataSource;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -40,6 +41,10 @@ import org.slf4j.LoggerFactory;
 )
 public class TimescaleDbAppender implements EventHandler {
 
+    public static String TABLE_NAME_PROPERTY = "table.name";
+
+    public static String TABLE_NAME_DEFAULT = "decanter";
+
     @Reference
     public Marshaller marshaller;
 
@@ -58,7 +63,7 @@ public class TimescaleDbAppender implements EventHandler {
     private final static String insertQueryTemplate =
             "INSERT INTO TABLENAME(timestamp, content) VALUES(?,?)";
 
-    String tableName;
+    private Dictionary<String, Object> config;
 
     @SuppressWarnings("unchecked")
     @Activate
@@ -67,7 +72,8 @@ public class TimescaleDbAppender implements EventHandler {
     }
     
     public void open(Dictionary<String, Object> config) {
-        this.tableName = getValue(config, "table.name", "decanter");
+        this.config = config;
+        String tableName = getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT);
         try (Connection connection = dataSource.getConnection()) {
             createStructure(connection);
         } catch (Exception e) {
@@ -82,25 +88,29 @@ public class TimescaleDbAppender implements EventHandler {
 
     @Override
     public void handleEvent(Event event) {
-        try (Connection connection = dataSource.getConnection()) {
-            String jsonSt = marshaller.marshal(event);
-            String insertQuery = insertQueryTemplate.replaceAll("TABLENAME", tableName);
-            Long timestamp = (Long)event.getProperty(EventConstants.TIMESTAMP);
-            if (timestamp == null) {
-                timestamp = System.currentTimeMillis();
+        if (EventFilter.match(event, config)) {
+            try (Connection connection = dataSource.getConnection()) {
+                String tableName = getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT);
+                String jsonSt = marshaller.marshal(event);
+                String insertQuery = insertQueryTemplate.replaceAll("TABLENAME", tableName);
+                Long timestamp = (Long) event.getProperty(EventConstants.TIMESTAMP);
+                if (timestamp == null) {
+                    timestamp = System.currentTimeMillis();
+                }
+                try (PreparedStatement insertStatement = connection.prepareStatement(insertQuery)) {
+                    insertStatement.setLong(1, timestamp);
+                    insertStatement.setString(2, jsonSt);
+                    insertStatement.executeUpdate();
+                    LOGGER.trace("Data inserted into {} table", tableName);
+                }
+            } catch (Exception e) {
+                LOGGER.error("Can't store in the database", e);
             }
-            try (PreparedStatement insertStatement = connection.prepareStatement(insertQuery)) {
-                insertStatement.setLong(1, timestamp);
-                insertStatement.setString(2, jsonSt);
-                insertStatement.executeUpdate();
-                LOGGER.trace("Data inserted into {} table", tableName);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Can't store in the database", e);
         }
     }
 
     private void createStructure(Connection connection) {
+        String tableName = getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT);
         String createTemplate = createTableQueryTemplate;
         String createTableQuery = createTemplate.replaceAll("TABLENAME", tableName);
 
diff --git a/appender/utils/pom.xml b/appender/utils/pom.xml
new file mode 100644 (file)
index 0000000..854fbdd
--- /dev/null
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <!--
+
+        Licensed to the Apache Software Foundation (ASF) under one or more
+        contributor license agreements.  See the NOTICE file distributed with
+        this work for additional information regarding copyright ownership.
+        The ASF licenses this file to You under the Apache License, Version 2.0
+        (the "License"); you may not use this file except in compliance with
+        the License.  You may obtain a copy of the License at
+
+           http://www.apache.org/licenses/LICENSE-2.0
+
+        Unless required by applicable law or agreed to in writing, software
+        distributed under the License is distributed on an "AS IS" BASIS,
+        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        See the License for the specific language governing permissions and
+        limitations under the License.
+    -->
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.karaf.decanter</groupId>
+        <artifactId>appender</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.karaf.decanter.appender</groupId>
+    <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Karaf :: Decanter :: Appender :: Utils</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.cmpn</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java b/appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java
new file mode 100644 (file)
index 0000000..8f5efa0
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.utils;
+
+import org.osgi.service.event.Event;
+
+import java.util.Dictionary;
+
+public class EventFilter {
+
+    public static String PROPERTY_NAME_EXCLUDE_CONFIG = "event.property.name.exclude";
+    public static String PROPERTY_NAME_INCLUDE_CONFIG = "event.property.name.include";
+    public static String PROPERTY_VALUE_EXCLUDE_CONFIG = "event.property.value.exclude";
+    public static String PROPERTY_VALUE_INCLUDE_CONFIG = "event.property.value.include";
+
+    public static boolean match(Event event, Dictionary<String, Object> config) {
+        if (config == null) {
+            return true;
+        }
+
+        String nameExcludeRegex = (config.get(PROPERTY_NAME_EXCLUDE_CONFIG) != null) ? (String) config.get(PROPERTY_NAME_EXCLUDE_CONFIG) : null;
+        String nameIncludeRegex = (config.get(PROPERTY_NAME_INCLUDE_CONFIG) != null) ? (String) config.get(PROPERTY_NAME_INCLUDE_CONFIG) : null;
+        String valueExcludeRegex = (config.get(PROPERTY_VALUE_EXCLUDE_CONFIG) != null) ? (String) config.get(PROPERTY_VALUE_EXCLUDE_CONFIG) : null;
+        String valueIncludeRegex = (config.get(PROPERTY_VALUE_INCLUDE_CONFIG) != null) ? (String) config.get(PROPERTY_VALUE_INCLUDE_CONFIG) : null;
+
+        for (String name : event.getPropertyNames()) {
+            if (nameExcludeRegex != null && name.matches(nameExcludeRegex)) {
+                return false;
+            }
+
+            if (nameIncludeRegex != null && name.matches(nameIncludeRegex)) {
+                return true;
+            }
+
+            if (event.getProperty(name) != null && event.getProperty(name) instanceof String) {
+                if (valueExcludeRegex != null && ((String) event.getProperty(name)).matches(valueExcludeRegex)) {
+                    return false;
+                }
+                if (valueIncludeRegex != null && ((String) event.getProperty(name)).matches(valueIncludeRegex)) {
+                    return true;
+                }
+            }
+
+        }
+        return true;
+    }
+
+}
diff --git a/appender/utils/src/test/java/org/apache/karaf/decanter/appender/utils/EventFilterTest.java b/appender/utils/src/test/java/org/apache/karaf/decanter/appender/utils/EventFilterTest.java
new file mode 100644 (file)
index 0000000..f564cee
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+public class EventFilterTest {
+
+    @Test
+    public void noFilter() {
+        // no config
+        Assert.assertTrue(EventFilter.match(prepareTestEvent(), null));
+        // no filter in the config
+        Dictionary<String, Object> config = new Hashtable<>();
+        Assert.assertTrue(EventFilter.match(prepareTestEvent(), config));
+    }
+
+    @Test
+    public void propertyNameFilter() {
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, "key.*");
+        // exclude
+        Assert.assertFalse(EventFilter.match(prepareTestEvent(), config));
+        // exclude first
+        config.put(EventFilter.PROPERTY_NAME_INCLUDE_CONFIG, "other");
+        Assert.assertFalse(EventFilter.match(prepareTestEvent(), config));
+        // include
+        config.remove(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG);
+        Assert.assertTrue(EventFilter.match(prepareTestEvent(), config));
+    }
+
+    @Test
+    public void propertyValueFilter() {
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, "value.*");
+        // exclude
+        Assert.assertFalse(EventFilter.match(prepareTestEvent(), config));
+        // exclude first
+        config.put(EventFilter.PROPERTY_VALUE_INCLUDE_CONFIG, "other");
+        Assert.assertFalse(EventFilter.match(prepareTestEvent(), config));
+        // include
+        config.remove(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG);
+        Assert.assertTrue(EventFilter.match(prepareTestEvent(), config));
+    }
+
+    private Event prepareTestEvent() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("key1", "value1");
+        map.put("key2", "value2");
+        map.put("other", "other");
+        return new Event("test", map);
+    }
+
+}
index cf05680..10af852 100644 (file)
@@ -1,5 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <!--
 
             <groupId>org.apache.karaf.decanter</groupId>
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.appender</groupId>
+            <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
             <plugin>
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
                 <configuration>
+                    <obrRepository>NONE</obrRepository>
                     <instructions>
+                        <Export-Package>!*</Export-Package>
                         <Import-Package>
                             org.osgi.service.component,
                             org.osgi.service.event,
                             *
                         </Import-Package>
                         <Private-Package>
-                            org.apache.karaf.decanter.appender.websocket
+                            org.apache.karaf.decanter.appender.websocket,
+                            org.apache.karaf.decanter.appender.utils
                         </Private-Package>
+                        <_dsannotations>*</_dsannotations>
                     </instructions>
                 </configuration>
-            </plugin>            <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <executions>
-                <execution>
-                    <phase>package</phase>
-                    <goals>
-                        <goal>attach-artifact</goal>
-                    </goals>
-                    <configuration>
-                        <artifacts>
-                            <artifact>
-                                <file>src/main/cfg/org.apache.karaf.decanter.appender.websocket.servlet.cfg</file>
-                                <type>cfg</type>
-                            </artifact>
-                        </artifacts>
-                    </configuration>
-                </execution>
-            </executions>
-        </plugin>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>src/main/cfg/org.apache.karaf.decanter.appender.websocket.servlet.cfg</file>
+                                    <type>cfg</type>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 
index 195ed83..79cdd4a 100644 (file)
@@ -17,6 +17,7 @@
 package org.apache.karaf.decanter.appender.websocket;
 
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@@ -34,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -46,10 +48,13 @@ import java.util.Set;
 @WebSocket
 public class DecanterWebSocketAppender implements EventHandler {
 
+    public static String ALIAS_PROPERTY = "servlet.alias";
+
+    public static String ALIAS_DEFAULT = "/decanter-websocket";
+
     private static final Logger LOG = LoggerFactory.getLogger(DecanterWebSocketAppender.class);
 
     private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<Session>());
-    private String alias;
 
     @Reference
     private Marshaller marshaller;
@@ -57,6 +62,8 @@ public class DecanterWebSocketAppender implements EventHandler {
     @Reference
     private HttpService httpService;
 
+    private Dictionary<String, Object> config;
+
     @OnWebSocketConnect
     public void onOpen(Session session) {
         session.setIdleTimeout(-1);
@@ -70,27 +77,34 @@ public class DecanterWebSocketAppender implements EventHandler {
 
     @Activate
     public void activate(ComponentContext componentContext) throws Exception {
-        alias = (String) componentContext.getProperties().get("servlet.alias");
+        this.config = componentContext.getProperties();
+        String alias = (String) config.get(ALIAS_PROPERTY);
         if (alias == null) {
-            alias = "/decanter-websocket";
+            alias = ALIAS_DEFAULT;
         }
         httpService.registerServlet(alias, new DecanterWebSocketServlet(), null, null);
     }
 
     @Deactivate
     public void deactivate() throws Exception {
+        String alias = (String) config.get(ALIAS_PROPERTY);
+        if (alias == null) {
+            alias = ALIAS_DEFAULT;
+        }
         httpService.unregister(alias);
     }
 
     @Override
     public void handleEvent(Event event) {
-        String message = marshaller.marshal(event);
-        synchronized (sessions) {
-            for (Session session : sessions) {
-                try {
-                    session.getRemote().sendString(message);
-                } catch (Exception e) {
-                    LOG.warn("Can't publish to remote websocket endpoint",  e);
+        if (EventFilter.match(event, config)) {
+            String message = marshaller.marshal(event);
+            synchronized (sessions) {
+                for (Session session : sessions) {
+                    try {
+                        session.getRemote().sendString(message);
+                    } catch (Exception e) {
+                        LOG.warn("Can't publish to remote websocket endpoint", e);
+                    }
                 }
             }
         }
diff --git a/pom.xml b/pom.xml
index 63e4edb..ac76a82 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -52,7 +52,7 @@
         <elasticsearch6.bundle.version>6.2.4_1</elasticsearch6.bundle.version>
         <glassfish-json.version>1.0.4</glassfish-json.version>
         <json-api.version>1.0</json-api.version>
-        <kafka.version>1.1.1</kafka.version>
+        <kafka.version>2.1.0</kafka.version>
         <karaf.version>4.1.4</karaf.version>
         <kibana.version>3.1.1</kibana.version>
         <kibana4.version>4.1.2</kibana4.version>
                 <artifactId>org.apache.karaf.decanter.collector.utils</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.karaf.decanter.appender</groupId>
+                <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <!-- OSGi -->
             <dependency>