Fork the sink from main flume build master
author Ralph Goers <rgoers@apache.org>
Mon, 4 Apr 2022 08:44:16 +0000 (01:44 -0700)
committer Ralph Goers <rgoers@apache.org>
Mon, 4 Apr 2022 08:44:16 +0000 (01:44 -0700)
37 files changed:
.github/workflows/build.yml [new file with mode: 0644]
.github/workflows/codeql-analysis.yml [new file with mode: 0644]
.gitignore [new file with mode: 0644]
flume-elasticsearch-sink/pom.xml [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java [new file with mode: 0644]
flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java [new file with mode: 0644]
flume-elasticsearch-sink/src/test/resources/log4j2.xml [new file with mode: 0644]
pom.xml [new file with mode: 0644]

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
new file mode 100644 (file)
index 0000000..96130aa
--- /dev/null
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache license, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the license for the specific language governing permissions and
+# limitations under the license.
+
+# Builds and verifies the project with Maven on every OS in the matrix.
+name: build
+
+# Triggered by pushes to the trunk branch and by every pull request.
+on:
+  push:
+    branches:
+      - trunk
+  pull_request:
+
+jobs:
+  build:
+
+    runs-on: ${{ matrix.os }}
+
+    strategy:
+      matrix:
+        os: [ ubuntu-latest, macos-latest ]
+
+    steps:
+
+      - name: Checkout repository
+        uses: actions/checkout@v2
+
+      # JDK 8 is needed for the build, and it is the primary bytecode target.
+      - name: Setup JDK 8
+        uses: actions/setup-java@v2.3.0
+        with:
+          distribution: temurin
+          java-version: 8
+          java-package: jdk
+          architecture: x64
+          cache: maven
+
+      # The same command is used on every runner in the matrix (Linux and
+      # macOS), so a single unconditional step replaces the per-OS copies.
+      - name: Inspect environment
+        run: env | grep '^JAVA'
+
+      # NOTE(review): the commit that adds this workflow does not add ./mvnw;
+      # confirm the Maven wrapper script is present in the repository.
+      - name: Build with Maven
+        timeout-minutes: 120
+        shell: bash
+        run: |
+          ./mvnw clean verify -DredirectTestOutput=true \
+          --show-version --batch-mode --errors --no-transfer-progress \
+          -DtrimStackTrace=false \
+          -Dsurefire.rerunFailingTestsCount=2
diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
new file mode 100644 (file)
index 0000000..20d8be9
--- /dev/null
@@ -0,0 +1,70 @@
+# For most projects, this workflow file will not need changing; you simply need
+# to commit it to your repository.
+#
+# You may wish to alter this file to override the set of languages analyzed,
+# or to provide custom queries or build logic.
+#
+# ******** NOTE ********
+# We have attempted to detect the languages in your repository. Please check
+# the `language` matrix defined below to confirm you have the correct set of
+# supported CodeQL languages.
+#
+name: "CodeQL"
+
+# Runs on pushes and pull requests against trunk, plus a weekly scheduled scan.
+on:
+  push:
+    branches: [ trunk ]
+  pull_request:
+    # The branches below must be a subset of the branches above
+    branches: [ trunk ]
+  schedule:
+    - cron: '15 0 * * 5'
+
+jobs:
+  analyze:
+    name: Analyze
+    runs-on: ubuntu-latest
+    permissions:
+      actions: read
+      contents: read
+      security-events: write
+
+    strategy:
+      fail-fast: false
+      matrix:
+        # Only Java is analyzed: the files added with this workflow are Java,
+        # XML and YAML, and CodeQL fails a language it finds no sources for.
+        language: [ 'java' ]
+        # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
+        # Learn more about CodeQL language support at https://git.io/codeql-language-support
+
+    steps:
+    - name: Checkout repository
+      uses: actions/checkout@v2
+
+    # Initializes the CodeQL tools for scanning.
+    - name: Initialize CodeQL
+      uses: github/codeql-action/init@v1
+      with:
+        languages: ${{ matrix.language }}
+        # If you wish to specify custom queries, you can do so here or in a config file.
+        # By default, queries listed here will override any specified in a config file.
+        # Prefix the list here with "+" to use these queries and those in the config file.
+        # queries: ./path/to/local/query, your-org/your-repo/queries@main
+
+    # Autobuild attempts to build any compiled languages  (C/C++, C#, or Java).
+    # If this step fails, then you should remove it and run the build manually (see below)
+    - name: Autobuild
+      uses: github/codeql-action/autobuild@v1
+
+    # ℹ️ Command-line programs to run using the OS shell.
+    # 📚 https://git.io/JvXDl
+
+    # ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
+    #    and modify them (or add more) to build your code if your project
+    #    uses a compiled language
+
+    #- run: |
+    #   make bootstrap
+    #   make release
+
+    - name: Perform CodeQL Analysis
+      uses: github/codeql-action/analyze@v1
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..9a0a932
--- /dev/null
@@ -0,0 +1,22 @@
+# Lines that start with '#' are comments.
+# Backup, patch and autosave files. Note that '*#' is a pattern, not a
+# comment: '#' only introduces a comment at the start of a line.
+*~
+*.diff
+*#
+# IDE project metadata
+.classpath
+.project
+.settings
+# Local configuration files
+bin/flume-env.sh
+conf/flume-site.xml
+bin/.settings
+.eclipse
+# Reports and build output
+pmd_report.html
+*/bin
+target
+patchprocess
+derby.log
+# More IDE / OS metadata
+.idea
+*.iml
+nb-configuration.xml
+.DS_Store
+# Maven wrapper jar is ignored rather than committed
+/.mvn/wrapper/maven-wrapper.jar
+**/metastore_db
diff --git a/flume-elasticsearch-sink/pom.xml b/flume-elasticsearch-sink/pom.xml
new file mode 100644 (file)
index 0000000..a5beb8e
--- /dev/null
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <!-- Module of the flume-search build. No dependency below declares a
+       version; presumably they are managed by the parent POM (confirm). -->
+  <parent>
+    <artifactId>flume-search</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+
+  <!-- groupId repeats the parent's value; kept explicit for readability. -->
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-elasticsearch-sink</artifactId>
+  <name>Apache Flume ElasticSearch Sink</name>
+
+  <properties>
+    <!-- TODO fix spotbugs/pmd violations -->
+    <spotbugs.maxAllowedViolations>8</spotbugs.maxAllowedViolations>
+    <pmd.maxAllowedViolations>10</pmd.maxAllowedViolations>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <!-- Optional: not propagated transitively to consumers of this module. -->
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
new file mode 100644 (file)
index 0000000..754155c
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.apache.flume.formatter.output.BucketPath;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Abstract base class for custom implementations of
+ * {@link ElasticSearchIndexRequestBuilderFactory}.
+ */
+public abstract class AbstractElasticSearchIndexRequestBuilderFactory
+    implements ElasticSearchIndexRequestBuilderFactory {
+
+  /**
+   * {@link FastDateFormat} to use for index names
+   *   in {@link #getIndexName(String, long)}
+   */
+  protected final FastDateFormat fastDateFormat;
+
+  /**
+   * Constructor for subclasses
+   * @param fastDateFormat {@link FastDateFormat} to use for index names
+   */
+  protected AbstractElasticSearchIndexRequestBuilderFactory(FastDateFormat fastDateFormat) {
+    this.fastDateFormat = fastDateFormat;
+  }
+
+  /**
+   * @see Configurable
+   */
+  @Override
+  public abstract void configure(Context arg0);
+
+  /**
+   * @see ConfigurableComponent
+   */
+  @Override
+  public abstract void configure(ComponentConfiguration arg0);
+
+  /**
+   * Creates and prepares an {@link IndexRequestBuilder} from the supplied
+   * {@link Client} via delegation to the subclass-hook template methods
+   * {@link #getIndexName(String, long)} and
+   * {@link #prepareIndexRequest(IndexRequestBuilder, String, String, Event)}
+   */
+  @Override
+  public IndexRequestBuilder createIndexRequest(Client client,
+        String indexPrefix, String indexType, Event event) throws IOException {
+    IndexRequestBuilder request = prepareIndex(client);
+    String realIndexPrefix = BucketPath.escapeString(indexPrefix, event.getHeaders());
+    String realIndexType = BucketPath.escapeString(indexType, event.getHeaders());
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(event);
+    long timestamp = timestampedEvent.getTimestamp();
+
+    String indexName = getIndexName(realIndexPrefix, timestamp);
+    prepareIndexRequest(request, indexName, realIndexType, timestampedEvent);
+    return request;
+  }
+
+  @VisibleForTesting
+  IndexRequestBuilder prepareIndex(Client client) {
+    return client.prepareIndex();
+  }
+
+  /**
+   * Gets the name of the index to use for an index request
+   * @param indexPrefix
+   *          Prefix of index name to use -- as configured on the sink
+   * @param timestamp
+   *          timestamp (millis) to format / use
+   * @return index name of the form 'indexPrefix-formattedTimestamp'
+   */
+  protected String getIndexName(String indexPrefix, long timestamp) {
+    return new StringBuilder(indexPrefix).append('-')
+      .append(fastDateFormat.format(timestamp)).toString();
+  }
+
+  /**
+   * Prepares an ElasticSearch {@link IndexRequestBuilder} instance
+   * @param indexRequest
+   *          The (empty) ElasticSearch {@link IndexRequestBuilder} to prepare
+   * @param indexName
+   *          Index name to use -- as per {@link #getIndexName(String, long)}
+   * @param indexType
+   *          Index type to use -- as configured on the sink
+   * @param event
+   *          Flume event to serialize and add to index request
+   * @throws IOException
+   *           If an error occurs e.g. during serialization
+  */
+  protected abstract void prepareIndexRequest(
+      IndexRequestBuilder indexRequest, String indexName,
+      String indexType, Event event) throws IOException;
+
+}
\ No newline at end of file
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
new file mode 100644 (file)
index 0000000..4fda1b8
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.elasticsearch.common.jackson.core.JsonParseException;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+
+/**
+ * Utility methods for using ElasticSearch {@link XContentBuilder}
+ */
+public class ContentBuilderUtil {
+
+  private static final Charset charset = Charset.defaultCharset();
+
+  private ContentBuilderUtil() {
+  }
+
+  public static void appendField(XContentBuilder builder, String field,
+      byte[] data) throws IOException {
+    XContentType contentType = XContentFactory.xContentType(data);
+    if (contentType == null) {
+      addSimpleField(builder, field, data);
+    } else {
+      addComplexField(builder, field, contentType, data);
+    }
+  }
+
+  public static void addSimpleField(XContentBuilder builder, String fieldName,
+      byte[] data) throws IOException {
+    builder.field(fieldName, new String(data, charset));
+  }
+
+  public static void addComplexField(XContentBuilder builder, String fieldName,
+      XContentType contentType, byte[] data) throws IOException {
+    XContentParser parser = null;
+    try {
+      // Elasticsearch will accept JSON directly but we need to validate that
+      // the incoming event is JSON first. Sadly, the elasticsearch JSON parser
+      // is a stream parser so we need to instantiate it, parse the event to
+      // validate it, then instantiate it again to provide the JSON to
+      // elasticsearch.
+      // If validation fails then the incoming event is submitted to
+      // elasticsearch as plain text.
+      parser = XContentFactory.xContent(contentType).createParser(data);
+      while (parser.nextToken() != null) {};
+
+      // If the JSON is valid then include it
+      parser = XContentFactory.xContent(contentType).createParser(data);
+      // Add the field name, but not the value.
+      builder.field(fieldName);
+      // This will add the whole parsed content as the value of the field.
+      builder.copyCurrentStructure(parser);
+    } catch (JsonParseException ex) {
+      // If we get an exception here the most likely cause is nested JSON that
+      // can't be figured out in the body. At this point just push it through
+      // as is
+      addSimpleField(builder, fieldName, data);
+    } finally {
+      if (parser != null) {
+        parser.close();
+      }
+    }
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java
new file mode 100644 (file)
index 0000000..aa7ad39
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+/**
+ * Basic serializer that serializes the event body and header fields into
+ * individual fields</p>
+ *
+ * A best effort will be used to determine the content-type, if it cannot be
+ * determined fields will be indexed as Strings
+ */
+public class ElasticSearchDynamicSerializer implements
+    ElasticSearchEventSerializer {
+
+  @Override
+  public void configure(Context context) {
+    // NO-OP...
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+
+  @Override
+  public XContentBuilder getContentBuilder(Event event) throws IOException {
+    XContentBuilder builder = jsonBuilder().startObject();
+    appendBody(builder, event);
+    appendHeaders(builder, event);
+    return builder;
+  }
+
+  private void appendBody(XContentBuilder builder, Event event)
+      throws IOException {
+    ContentBuilderUtil.appendField(builder, "body", event.getBody());
+  }
+
+  private void appendHeaders(XContentBuilder builder, Event event)
+      throws IOException {
+    Map<String, String> headers = event.getHeaders();
+    for (String key : headers.keySet()) {
+      ContentBuilderUtil.appendField(builder, key,
+          headers.get(key).getBytes(charset));
+    }
+  }
+
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
new file mode 100644 (file)
index 0000000..c89d627
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.elasticsearch.common.io.BytesStream;
+
+/**
+ * Interface for an event serializer which serializes the headers and body of an
+ * event to write them to ElasticSearch. This is configurable, so any config
+ * params required should be taken through this.
+ */
+public interface ElasticSearchEventSerializer extends Configurable,
+    ConfigurableComponent {
+
+  public static final Charset charset = Charset.defaultCharset();
+
+  /**
+   * Return an {@link BytesStream} made up of the serialized flume event
+   * @param event
+   *          The flume event to serialize
+   * @return A {@link BytesStream} used to write to ElasticSearch
+   * @throws IOException
+   *           If an error occurs during serialization
+   */
+  abstract BytesStream getContentBuilder(Event event) throws IOException;
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
new file mode 100644 (file)
index 0000000..f76308c
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+
+import java.io.IOException;
+import java.util.TimeZone;
+
+/**
+ * Interface for creating ElasticSearch {@link IndexRequestBuilder} instances
+ * from serialized flume events. This is configurable, so any config params
+ * required should be taken through this.
+ */
+public interface ElasticSearchIndexRequestBuilderFactory extends Configurable,
+    ConfigurableComponent {
+
+  static final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd",
+      TimeZone.getTimeZone("Etc/UTC"));
+
+  /**
+   * @param client
+   *          ElasticSearch {@link Client} to prepare index from
+   * @param indexPrefix
+   *          Prefix of index name to use -- as configured on the sink
+   * @param indexType
+   *          Index type to use -- as configured on the sink
+   * @param event
+   *          Flume event to serialize and add to index request
+   * @return prepared ElasticSearch {@link IndexRequestBuilder} instance
+   * @throws IOException
+   *           If an error occurs e.g. during serialization
+   */
+  IndexRequestBuilder createIndexRequest(Client client, String indexPrefix,
+      String indexType, Event event) throws IOException;
+
+
+
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java
new file mode 100644 (file)
index 0000000..3638368
--- /dev/null
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+/**
+ * Serialize flume events into the same format LogStash uses</p>
+ *
+ * This can be used to send events to ElasticSearch and use clients such as
+ * Kabana which expect Logstash formated indexes
+ *
+ * <pre>
+ * {
+ *    "@timestamp": "2010-12-21T21:48:33.309258Z",
+ *    "@tags": [ "array", "of", "tags" ],
+ *    "@type": "string",
+ *    "@source": "source of the event, usually a URL."
+ *    "@source_host": ""
+ *    "@source_path": ""
+ *    "@fields":{
+ *       # a set of fields for this event
+ *       "user": "jordan",
+ *       "command": "shutdown -r":
+ *     }
+ *     "@message": "the original plain-text message"
+ *   }
+ * </pre>
+ *
+ * If the following headers are present, they will map to the above logstash
+ * output as long as the logstash fields are not already present.</p>
+ *
+ * <pre>
+ *  timestamp: long -> @timestamp:Date
+ *  host: String -> @source_host: String
+ *  src_path: String -> @source_path: String
+ *  type: String -> @type: String
+ *  source: String -> @source: String
+ * </pre>
+ *
+ * @see https
+ *      ://github.com/logstash/logstash/wiki/logstash%27s-internal-message-
+ *      format
+ */
+public class ElasticSearchLogStashEventSerializer implements
+    ElasticSearchEventSerializer {
+
+  @Override
+  public XContentBuilder getContentBuilder(Event event) throws IOException {
+    XContentBuilder builder = jsonBuilder().startObject();
+    appendBody(builder, event);
+    appendHeaders(builder, event);
+    return builder;
+  }
+
+  private void appendBody(XContentBuilder builder, Event event)
+      throws IOException, UnsupportedEncodingException {
+    byte[] body = event.getBody();
+    ContentBuilderUtil.appendField(builder, "@message", body);
+  }
+
+  private void appendHeaders(XContentBuilder builder, Event event)
+      throws IOException {
+    Map<String, String> headers = Maps.newHashMap(event.getHeaders());
+
+    String timestamp = headers.get("timestamp");
+    if (!StringUtils.isBlank(timestamp)
+        && StringUtils.isBlank(headers.get("@timestamp"))) {
+      long timestampMs = Long.parseLong(timestamp);
+      builder.field("@timestamp", new Date(timestampMs));
+    }
+
+    String source = headers.get("source");
+    if (!StringUtils.isBlank(source)
+        && StringUtils.isBlank(headers.get("@source"))) {
+      ContentBuilderUtil.appendField(builder, "@source",
+          source.getBytes(charset));
+    }
+
+    String type = headers.get("type");
+    if (!StringUtils.isBlank(type)
+        && StringUtils.isBlank(headers.get("@type"))) {
+      ContentBuilderUtil.appendField(builder, "@type", type.getBytes(charset));
+    }
+
+    String host = headers.get("host");
+    if (!StringUtils.isBlank(host)
+        && StringUtils.isBlank(headers.get("@source_host"))) {
+      ContentBuilderUtil.appendField(builder, "@source_host",
+          host.getBytes(charset));
+    }
+
+    String srcPath = headers.get("src_path");
+    if (!StringUtils.isBlank(srcPath)
+        && StringUtils.isBlank(headers.get("@source_path"))) {
+      ContentBuilderUtil.appendField(builder, "@source_path",
+          srcPath.getBytes(charset));
+    }
+
+    builder.startObject("@fields");
+    for (String key : headers.keySet()) {
+      byte[] val = headers.get(key).getBytes(charset);
+      ContentBuilderUtil.appendField(builder, key, val);
+    }
+    builder.endObject();
+  }
+
+  @Override
+  public void configure(Context context) {
+    // NO-OP...
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
new file mode 100644 (file)
index 0000000..05eb5ff
--- /dev/null
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_TTL;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL_REGEX;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.BatchSizeSupported;
+import org.apache.flume.formatter.output.BucketPath;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.flume.sink.elasticsearch.client.ElasticSearchClient;
+import org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_PREFIX;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLIENT_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_NAME_BUILDER_CLASS;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_SERIALIZER_CLASS;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER_PREFIX;
+
+/**
+ * A sink which reads events from a channel and writes them to ElasticSearch
+ * based on the work done by https://github.com/Aconex/elasticflume.git.</p>
+ * 
+ * This sink supports batch reading of events from the channel and writing them
+ * to ElasticSearch.</p>
+ * 
+ * Indexes will be rolled daily using the format 'indexname-YYYY-MM-dd' to allow
+ * easier management of the index</p>
+ * 
+ * This sink must be configured with with mandatory parameters detailed in
+ * {@link ElasticSearchSinkConstants}</p> It is recommended as a secondary step
+ * the ElasticSearch indexes are optimized for the specified serializer. This is
+ * not handled by the sink but is typically done by deploying a config template
+ * alongside the ElasticSearch deploy</p>
+ * 
+ * @see http
+ *      ://www.elasticsearch.org/guide/reference/api/admin-indices-templates.
+ *      html
+ */
+public class ElasticSearchSink extends AbstractSink implements Configurable, BatchSizeSupported {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(ElasticSearchSink.class);
+
+  // Used for testing
+  private boolean isLocal = false;
+  private final CounterGroup counterGroup = new CounterGroup();
+
+  private static final int defaultBatchSize = 100;
+
+  private int batchSize = defaultBatchSize;
+  private long ttlMs = DEFAULT_TTL;
+  private String clusterName = DEFAULT_CLUSTER_NAME;
+  private String indexName = DEFAULT_INDEX_NAME;
+  private String indexType = DEFAULT_INDEX_TYPE;
+  private String clientType = DEFAULT_CLIENT_TYPE;
+  private final Pattern pattern = Pattern.compile(TTL_REGEX,
+      Pattern.CASE_INSENSITIVE);
+  private Matcher matcher = pattern.matcher("");
+
+  private String[] serverAddresses = null;
+
+  private ElasticSearchClient client = null;
+  private Context elasticSearchClientContext = null;
+
+  private ElasticSearchIndexRequestBuilderFactory indexRequestFactory;
+  private ElasticSearchEventSerializer eventSerializer;
+  private IndexNameBuilder indexNameBuilder;
+  private SinkCounter sinkCounter;
+
+  /**
+   * Create an {@link ElasticSearchSink} configured using the supplied
+   * configuration
+   */
+  public ElasticSearchSink() {
+    this(false);
+  }
+
+  /**
+   * Create an {@link ElasticSearchSink}</p>
+   * 
+   * @param isLocal
+   *          If <tt>true</tt> sink will be configured to only talk to an
+   *          ElasticSearch instance hosted in the same JVM, should always be
+   *          false is production
+   * 
+   */
+  @VisibleForTesting
+  ElasticSearchSink(boolean isLocal) {
+    this.isLocal = isLocal;
+  }
+
+  @VisibleForTesting
+  String[] getServerAddresses() {
+    return serverAddresses;
+  }
+
+  @VisibleForTesting
+  String getClusterName() {
+    return clusterName;
+  }
+
+  @VisibleForTesting
+  String getIndexName() {
+    return indexName;
+  }
+
+  @VisibleForTesting
+  String getIndexType() {
+    return indexType;
+  }
+
+  @VisibleForTesting
+  long getTTLMs() {
+    return ttlMs;
+  }
+
+  @VisibleForTesting
+  ElasticSearchEventSerializer getEventSerializer() {
+    return eventSerializer;
+  }
+
+  @VisibleForTesting
+  IndexNameBuilder getIndexNameBuilder() {
+    return indexNameBuilder;
+  }
+
+  @Override
+  public long getBatchSize() {
+    return batchSize;
+  }
+
+  @Override
+  public Status process() throws EventDeliveryException {
+    logger.debug("processing...");
+    Status status = Status.READY;
+    Channel channel = getChannel();
+    Transaction txn = channel.getTransaction();
+    try {
+      txn.begin();
+      int count;
+      for (count = 0; count < batchSize; ++count) {
+        Event event = channel.take();
+
+        if (event == null) {
+          break;
+        }
+        String realIndexType = BucketPath.escapeString(indexType, event.getHeaders());
+        client.addEvent(event, indexNameBuilder, realIndexType, ttlMs);
+      }
+
+      if (count <= 0) {
+        sinkCounter.incrementBatchEmptyCount();
+        counterGroup.incrementAndGet("channel.underflow");
+        status = Status.BACKOFF;
+      } else {
+        if (count < batchSize) {
+          sinkCounter.incrementBatchUnderflowCount();
+          status = Status.BACKOFF;
+        } else {
+          sinkCounter.incrementBatchCompleteCount();
+        }
+
+        sinkCounter.addToEventDrainAttemptCount(count);
+        client.execute();
+      }
+      txn.commit();
+      sinkCounter.addToEventDrainSuccessCount(count);
+      counterGroup.incrementAndGet("transaction.success");
+    } catch (Throwable ex) {
+      try {
+        txn.rollback();
+        counterGroup.incrementAndGet("transaction.rollback");
+      } catch (Exception ex2) {
+        logger.error(
+            "Exception in rollback. Rollback might not have been successful.",
+            ex2);
+      }
+
+      if (ex instanceof Error || ex instanceof RuntimeException) {
+        logger.error("Failed to commit transaction. Transaction rolled back.",
+            ex);
+        Throwables.propagate(ex);
+      } else {
+        logger.error("Failed to commit transaction. Transaction rolled back.",
+            ex);
+        throw new EventDeliveryException(
+            "Failed to commit transaction. Transaction rolled back.", ex);
+      }
+    } finally {
+      txn.close();
+    }
+    return status;
+  }
+
+  @Override
+  public void configure(Context context) {
+    if (!isLocal) {
+      if (StringUtils.isNotBlank(context.getString(HOSTNAMES))) {
+        serverAddresses = StringUtils.deleteWhitespace(
+            context.getString(HOSTNAMES)).split(",");
+      }
+      Preconditions.checkState(serverAddresses != null
+          && serverAddresses.length > 0, "Missing Param:" + HOSTNAMES);
+    }
+
+    if (StringUtils.isNotBlank(context.getString(INDEX_NAME))) {
+      this.indexName = context.getString(INDEX_NAME);
+    }
+
+    if (StringUtils.isNotBlank(context.getString(INDEX_TYPE))) {
+      this.indexType = context.getString(INDEX_TYPE);
+    }
+
+    if (StringUtils.isNotBlank(context.getString(CLUSTER_NAME))) {
+      this.clusterName = context.getString(CLUSTER_NAME);
+    }
+
+    if (StringUtils.isNotBlank(context.getString(BATCH_SIZE))) {
+      this.batchSize = Integer.parseInt(context.getString(BATCH_SIZE));
+    }
+
+    if (StringUtils.isNotBlank(context.getString(TTL))) {
+      this.ttlMs = parseTTL(context.getString(TTL));
+      Preconditions.checkState(ttlMs > 0, TTL
+          + " must be greater than 0 or not set.");
+    }
+
+    if (StringUtils.isNotBlank(context.getString(CLIENT_TYPE))) {
+      clientType = context.getString(CLIENT_TYPE);
+    }
+
+    elasticSearchClientContext = new Context();
+    elasticSearchClientContext.putAll(context.getSubProperties(CLIENT_PREFIX));
+
+    String serializerClazz = DEFAULT_SERIALIZER_CLASS;
+    if (StringUtils.isNotBlank(context.getString(SERIALIZER))) {
+      serializerClazz = context.getString(SERIALIZER);
+    }
+
+    Context serializerContext = new Context();
+    serializerContext.putAll(context.getSubProperties(SERIALIZER_PREFIX));
+
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends Configurable> clazz = (Class<? extends Configurable>) Class
+          .forName(serializerClazz);
+      Configurable serializer = clazz.newInstance();
+
+      if (serializer instanceof ElasticSearchIndexRequestBuilderFactory) {
+        indexRequestFactory
+            = (ElasticSearchIndexRequestBuilderFactory) serializer;
+        indexRequestFactory.configure(serializerContext);
+      } else if (serializer instanceof ElasticSearchEventSerializer) {
+        eventSerializer = (ElasticSearchEventSerializer) serializer;
+        eventSerializer.configure(serializerContext);
+      } else {
+        throw new IllegalArgumentException(serializerClazz
+            + " is not an ElasticSearchEventSerializer");
+      }
+    } catch (Exception e) {
+      logger.error("Could not instantiate event serializer.", e);
+      Throwables.propagate(e);
+    }
+
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
+
+    String indexNameBuilderClass = DEFAULT_INDEX_NAME_BUILDER_CLASS;
+    if (StringUtils.isNotBlank(context.getString(INDEX_NAME_BUILDER))) {
+      indexNameBuilderClass = context.getString(INDEX_NAME_BUILDER);
+    }
+
+    Context indexnameBuilderContext = new Context();
+    serializerContext.putAll(
+            context.getSubProperties(INDEX_NAME_BUILDER_PREFIX));
+
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends IndexNameBuilder> clazz
+              = (Class<? extends IndexNameBuilder>) Class
+              .forName(indexNameBuilderClass);
+      indexNameBuilder = clazz.newInstance();
+      indexnameBuilderContext.put(INDEX_NAME, indexName);
+      indexNameBuilder.configure(indexnameBuilderContext);
+    } catch (Exception e) {
+      logger.error("Could not instantiate index name builder.", e);
+      Throwables.propagate(e);
+    }
+
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
+
+    Preconditions.checkState(StringUtils.isNotBlank(indexName),
+        "Missing Param:" + INDEX_NAME);
+    Preconditions.checkState(StringUtils.isNotBlank(indexType),
+        "Missing Param:" + INDEX_TYPE);
+    Preconditions.checkState(StringUtils.isNotBlank(clusterName),
+        "Missing Param:" + CLUSTER_NAME);
+    Preconditions.checkState(batchSize >= 1, BATCH_SIZE
+        + " must be greater than 0");
+  }
+
+  @Override
+  public void start() {
+    ElasticSearchClientFactory clientFactory = new ElasticSearchClientFactory();
+
+    logger.info("ElasticSearch sink {} started");
+    sinkCounter.start();
+    try {
+      if (isLocal) {
+        client = clientFactory.getLocalClient(
+            clientType, eventSerializer, indexRequestFactory);
+      } else {
+        client = clientFactory.getClient(clientType, serverAddresses,
+            clusterName, eventSerializer, indexRequestFactory);
+        client.configure(elasticSearchClientContext);
+      }
+      sinkCounter.incrementConnectionCreatedCount();
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      sinkCounter.incrementConnectionFailedCount();
+      if (client != null) {
+        client.close();
+        sinkCounter.incrementConnectionClosedCount();
+      }
+    }
+
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    logger.info("ElasticSearch sink {} stopping");
+    if (client != null) {
+      client.close();
+    }
+    sinkCounter.incrementConnectionClosedCount();
+    sinkCounter.stop();
+    super.stop();
+  }
+
+  /*
+   * Returns TTL value of ElasticSearch index in milliseconds when TTL specifier
+   * is "ms" / "s" / "m" / "h" / "d" / "w". In case of unknown specifier TTL is
+   * not set. When specifier is not provided it defaults to days in milliseconds
+   * where the number of days is parsed integer from TTL string provided by
+   * user. <p> Elasticsearch supports ttl values being provided in the format:
+   * 1d / 1w / 1ms / 1s / 1h / 1m specify a time unit like d (days), m
+   * (minutes), h (hours), ms (milliseconds) or w (weeks), milliseconds is used
+   * as default unit.
+   * http://www.elasticsearch.org/guide/reference/mapping/ttl-field/.
+   * 
+   * @param ttl TTL value provided by user in flume configuration file for the
+   * sink
+   * 
+   * @return the ttl value in milliseconds
+   */
+  private long parseTTL(String ttl) {
+    matcher = matcher.reset(ttl);
+    while (matcher.find()) {
+      if (matcher.group(2).equals("ms")) {
+        return Long.parseLong(matcher.group(1));
+      } else if (matcher.group(2).equals("s")) {
+        return TimeUnit.SECONDS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("m")) {
+        return TimeUnit.MINUTES.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("h")) {
+        return TimeUnit.HOURS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("d")) {
+        return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("w")) {
+        return TimeUnit.DAYS.toMillis(7 * Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("")) {
+        logger.info("TTL qualifier is empty. Defaulting to day qualifier.");
+        return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else {
+        logger.debug("Unknown TTL qualifier provided. Setting TTL to 0.");
+        return 0;
+      }
+    }
+    logger.info("TTL not provided. Skipping the TTL config by returning 0.");
+    return 0;
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
new file mode 100644 (file)
index 0000000..da88def
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+public class ElasticSearchSinkConstants {
+
+  /**
+   * Comma separated list of hostname:port, if the port is not present the
+   * default port '9300' will be used</p>
+   * Example:
+   * <pre>
+   *  127.0.0.1:92001,127.0.0.2:9300
+   * </pre>
+   */
+  public static final String HOSTNAMES = "hostNames";
+
+  /**
+   * The name to index the document to, defaults to 'flume'</p>
+   * The current date in the format 'yyyy-MM-dd' will be appended to this name,
+   * for example 'foo' will result in a daily index of 'foo-yyyy-MM-dd'
+   */
+  public static final String INDEX_NAME = "indexName";
+
+  /**
+   * The type to index the document to, defaults to 'log'
+   */
+  public static final String INDEX_TYPE = "indexType";
+
+  /**
+   * Name of the ElasticSearch cluster to connect to
+   */
+  public static final String CLUSTER_NAME = "clusterName";
+
+  /**
+   * Maximum number of events the sink should take from the channel per
+   * transaction, if available. Defaults to 100
+   */
+  public static final String BATCH_SIZE = "batchSize";
+
+  /**
+   * TTL in days, when set will cause the expired documents to be deleted
+   * automatically, if not set documents will never be automatically deleted
+   */
+  public static final String TTL = "ttl";
+
+  /**
+   * The fully qualified class name of the serializer the sink should use.
+   */
+  public static final String SERIALIZER = "serializer";
+
+  /**
+   * Configuration to pass to the serializer.
+   */
+  public static final String SERIALIZER_PREFIX = SERIALIZER + ".";
+
+  /**
+   * The fully qualified class name of the index name builder the sink
+   * should use to determine name of index where the event should be sent.
+   */
+  public static final String INDEX_NAME_BUILDER = "indexNameBuilder";
+
+  /**
+   * The fully qualified class name of the index name builder the sink
+   * should use to determine name of index where the event should be sent.
+   */
+  public static final String INDEX_NAME_BUILDER_PREFIX
+          = INDEX_NAME_BUILDER + ".";
+
+  /**
+   * The client type used for sending bulks to ElasticSearch
+   */
+  public static final String CLIENT_TYPE = "client";
+
+  /**
+   * The client prefix to extract the configuration that will be passed to
+   * elasticsearch client.
+   */
+  public static final String CLIENT_PREFIX = CLIENT_TYPE + ".";
+
+  /**
+   * DEFAULTS USED BY THE SINK
+   */
+
+  public static final int DEFAULT_PORT = 9300;
+  public static final int DEFAULT_TTL = -1;
+  public static final String DEFAULT_INDEX_NAME = "flume";
+  public static final String DEFAULT_INDEX_TYPE = "log";
+  public static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+  public static final String DEFAULT_CLIENT_TYPE = "transport";
+  public static final String TTL_REGEX = "^(\\d+)(\\D*)";
+  public static final String DEFAULT_SERIALIZER_CLASS = "org.apache.flume." +
+          "sink.elasticsearch.ElasticSearchLogStashEventSerializer";
+  public static final String DEFAULT_INDEX_NAME_BUILDER_CLASS =
+          "org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder";
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
new file mode 100644 (file)
index 0000000..d6cca50
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.common.io.BytesStream;
+
+/**
+ * Default implementation of {@link ElasticSearchIndexRequestBuilderFactory}.
+ * It serializes flume events using the
+ * {@link ElasticSearchEventSerializer} instance configured on the sink.
+ */
+public class EventSerializerIndexRequestBuilderFactory
+    extends AbstractElasticSearchIndexRequestBuilderFactory {
+
+  protected final ElasticSearchEventSerializer serializer;
+
+  public EventSerializerIndexRequestBuilderFactory(
+      ElasticSearchEventSerializer serializer) {
+    this(serializer, ElasticSearchIndexRequestBuilderFactory.df);
+  }
+
+  protected EventSerializerIndexRequestBuilderFactory(
+      ElasticSearchEventSerializer serializer, FastDateFormat fdf) {
+    super(fdf);
+    this.serializer = serializer;
+  }
+
+  @Override
+  public void configure(Context context) {
+    serializer.configure(context);
+  }
+
+  @Override
+  public void configure(ComponentConfiguration config) {
+    serializer.configure(config);
+  }
+
+  @Override
+  protected void prepareIndexRequest(IndexRequestBuilder indexRequest,
+      String indexName, String indexType, Event event) throws IOException {
+    BytesStream contentBuilder = serializer.getContentBuilder(event);
+    indexRequest.setIndex(indexName)
+        .setType(indexType)
+        .setSource(contentBuilder.bytes());
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java
new file mode 100644 (file)
index 0000000..1dd4415
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+
+public interface IndexNameBuilder extends Configurable,
+        ConfigurableComponent {
+  /**
+   * Gets the name of the index to use for an index request
+   * @param event
+   *          Event which determines index name
+   * @return index name of the form 'indexPrefix-indexDynamicName'
+   */
+  public String getIndexName(Event event);
+  
+  /**
+   * Gets the prefix of index to use for an index request.
+   * @param event
+   *          Event which determines index name
+   * @return Index prefix name
+   */
+  public String getIndexPrefix(Event event);
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
new file mode 100644 (file)
index 0000000..801cac9
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.formatter.output.BucketPath;
+
+public class SimpleIndexNameBuilder implements IndexNameBuilder {
+
+  private String indexName;
+
+  @Override
+  public String getIndexName(Event event) {
+    return BucketPath.escapeString(indexName, event.getHeaders());
+  }
+
+  @Override
+  public String getIndexPrefix(Event event) {
+    return BucketPath.escapeString(indexName, event.getHeaders());
+  }
+
+  @Override
+  public void configure(Context context) {
+    indexName = context.getString(ElasticSearchSinkConstants.INDEX_NAME);
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
new file mode 100644 (file)
index 0000000..c651732
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.formatter.output.BucketPath;
+
+import java.util.TimeZone;
+
+/**
+ * Default index name builder. It prepares name of index using configured
+ * prefix and current timestamp. Default format of name is prefix-yyyy-MM-dd".
+ */
+public class TimeBasedIndexNameBuilder implements
+        IndexNameBuilder {
+
+  public static final String DATE_FORMAT = "dateFormat";
+  public static final String TIME_ZONE = "timeZone";
+
+  public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
+  public static final String DEFAULT_TIME_ZONE = "Etc/UTC";
+
+  private FastDateFormat fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd",
+      TimeZone.getTimeZone("Etc/UTC"));
+
+  private String indexPrefix;
+
+  @VisibleForTesting
+  FastDateFormat getFastDateFormat() {
+    return fastDateFormat;
+  }
+
+  /**
+   * Gets the name of the index to use for an index request
+   * @param event
+   *          Event for which the name of index has to be prepared
+   * @return index name of the form 'indexPrefix-formattedTimestamp'
+   */
+  @Override
+  public String getIndexName(Event event) {
+    TimestampedEvent timestampedEvent = new TimestampedEvent(event);
+    long timestamp = timestampedEvent.getTimestamp();
+    String realIndexPrefix = BucketPath.escapeString(indexPrefix, event.getHeaders());
+    return new StringBuilder(realIndexPrefix).append('-')
+      .append(fastDateFormat.format(timestamp)).toString();
+  }
+  
+  @Override
+  public String getIndexPrefix(Event event) {
+    return BucketPath.escapeString(indexPrefix, event.getHeaders());
+  }
+
+  @Override
+  public void configure(Context context) {
+    String dateFormatString = context.getString(DATE_FORMAT);
+    String timeZoneString = context.getString(TIME_ZONE);
+    if (StringUtils.isBlank(dateFormatString)) {
+      dateFormatString = DEFAULT_DATE_FORMAT;
+    }
+    if (StringUtils.isBlank(timeZoneString)) {
+      timeZoneString = DEFAULT_TIME_ZONE;
+    }
+    fastDateFormat = FastDateFormat.getInstance(dateFormatString,
+        TimeZone.getTimeZone(timeZoneString));
+    indexPrefix = context.getString(ElasticSearchSinkConstants.INDEX_NAME);
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java
new file mode 100644 (file)
index 0000000..c056839
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.joda.time.DateTimeUtils;
+
+import java.util.Map;
+
+/**
+ * {@link org.apache.flume.Event} implementation that has a timestamp.
+ * The timestamp is taken from (in order of precedence):<ol>
+ * <li>The "timestamp" header of the base event, if present</li>
+ * <li>The "@timestamp" header of the base event, if present</li>
+ * <li>The current time in millis, otherwise</li>
+ * </ol>
+ */
+final class TimestampedEvent extends SimpleEvent {
+
+  private final long timestamp;
+
+  TimestampedEvent(Event base) {
+    setBody(base.getBody());
+    Map<String, String> headers = Maps.newHashMap(base.getHeaders());
+    String timestampString = headers.get("timestamp");
+    if (StringUtils.isBlank(timestampString)) {
+      timestampString = headers.get("@timestamp");
+    }
+    if (StringUtils.isBlank(timestampString)) {
+      this.timestamp = DateTimeUtils.currentTimeMillis();
+      headers.put("timestamp", String.valueOf(timestamp ));
+    } else {
+      this.timestamp = Long.valueOf(timestampString);
+    }
+    setHeaders(headers);
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java
new file mode 100644 (file)
index 0000000..655e00a
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+
+/**
+ * Interface for an ElasticSearch client which is responsible for sending bulks
+ * of events to ElasticSearch.
+ */
+public interface ElasticSearchClient extends Configurable {
+
+  /**
+   * Close connection to elastic search in client
+   */
+  void close();
+
+  /**
+   * Add new event to the bulk
+   *
+   * @param event
+   *    Flume Event
+   * @param indexNameBuilder
+   *    Index name builder which generates name of index to feed
+   * @param indexType
+   *    Name of type of document which will be sent to the elasticsearch cluster
+   * @param ttlMs
+   *    Time to live expressed in milliseconds. Value <= 0 is ignored
+   * @throws Exception
+   */
+  public void addEvent(Event event, IndexNameBuilder indexNameBuilder,
+      String indexType, long ttlMs) throws Exception;
+
+  /**
+   * Sends bulk to the elasticsearch cluster
+   *
+   * @throws Exception
+   */
+  void execute() throws Exception;
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
new file mode 100644 (file)
index 0000000..986fb2b
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
+
+/**
+ * Internal ElasticSearch client factory. Responsible for creating instance
+ * of ElasticSearch clients.
+ */
+public class ElasticSearchClientFactory {
+  public static final String TransportClient = "transport";
+  public static final String RestClient = "rest";
+
+  /**
+   *
+   * @param clientType
+   *    String representation of client type
+   * @param hostNames
+   *    Array of strings that represents hostnames with ports (hostname:port)
+   * @param clusterName
+   *    Elasticsearch cluster name used only by Transport Client
+   * @param serializer
+   *    Serializer of flume events to elasticsearch documents
+   * @return
+   */
+  public ElasticSearchClient getClient(String clientType, String[] hostNames,
+      String clusterName, ElasticSearchEventSerializer serializer,
+      ElasticSearchIndexRequestBuilderFactory indexBuilder) throws NoSuchClientTypeException {
+    if (clientType.equalsIgnoreCase(TransportClient) && serializer != null) {
+      return new ElasticSearchTransportClient(hostNames, clusterName, serializer);
+    } else if (clientType.equalsIgnoreCase(TransportClient) && indexBuilder != null) { 
+      return new ElasticSearchTransportClient(hostNames, clusterName, indexBuilder);
+    } else if (clientType.equalsIgnoreCase(RestClient) && serializer != null) {
+      return new ElasticSearchRestClient(hostNames, serializer);
+    }
+    throw new NoSuchClientTypeException();
+  }
+
+  /**
+   * Used for tests only. Creates local elasticsearch instance client.
+   *
+   * @param clientType Name of client to use
+   * @param serializer Serializer for the event
+   * @param indexBuilder Index builder factory
+   *
+   * @return Local elastic search instance client
+   */
+  public ElasticSearchClient getLocalClient(String clientType,
+                                            ElasticSearchEventSerializer serializer,
+                                            ElasticSearchIndexRequestBuilderFactory indexBuilder)
+      throws NoSuchClientTypeException {
+    if (clientType.equalsIgnoreCase(TransportClient) && serializer != null) {
+      return new ElasticSearchTransportClient(serializer);
+    } else if (clientType.equalsIgnoreCase(TransportClient) && indexBuilder != null)  {
+      return new ElasticSearchTransportClient(indexBuilder);
+    } else if (clientType.equalsIgnoreCase(RestClient)) {
+    }
+    throw new NoSuchClientTypeException();
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
new file mode 100644 (file)
index 0000000..e51efe2
--- /dev/null
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Rest ElasticSearch client which is responsible for sending bulks of events to
+ * ElasticSearch using ElasticSearch HTTP API. This is configurable, so any
+ * config params required should be taken through this.
+ */
+public class  ElasticSearchRestClient implements ElasticSearchClient {
+
+  private static final String INDEX_OPERATION_NAME = "index";
+  private static final String INDEX_PARAM = "_index";
+  private static final String TYPE_PARAM = "_type";
+  private static final String TTL_PARAM = "_ttl";
+  private static final String BULK_ENDPOINT = "_bulk";
+
+  private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class);
+
+  private final ElasticSearchEventSerializer serializer;
+  private final RoundRobinList<String> serversList;
+  
+  private StringBuilder bulkBuilder;
+  private HttpClient httpClient;
+  
+  public ElasticSearchRestClient(String[] hostNames,
+      ElasticSearchEventSerializer serializer) {
+
+    for (int i = 0; i < hostNames.length; ++i) {
+      if (!hostNames[i].contains("http://") && !hostNames[i].contains("https://")) {
+        hostNames[i] = "http://" + hostNames[i];
+      }
+    }
+    this.serializer = serializer;
+
+    serversList = new RoundRobinList<String>(Arrays.asList(hostNames));
+    httpClient = new DefaultHttpClient();
+    bulkBuilder = new StringBuilder();
+  }
+
+  @VisibleForTesting
+  public ElasticSearchRestClient(String[] hostNames,
+          ElasticSearchEventSerializer serializer, HttpClient client) {
+    this(hostNames, serializer);
+    httpClient = client;
+  }
+
+  @Override
+  public void configure(Context context) {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String indexType,
+                       long ttlMs) throws Exception {
+    BytesReference content = serializer.getContentBuilder(event).bytes();
+    Map<String, Map<String, String>> parameters = new HashMap<String, Map<String, String>>();
+    Map<String, String> indexParameters = new HashMap<String, String>();
+    indexParameters.put(INDEX_PARAM, indexNameBuilder.getIndexName(event));
+    indexParameters.put(TYPE_PARAM, indexType);
+    if (ttlMs > 0) {
+      indexParameters.put(TTL_PARAM, Long.toString(ttlMs));
+    }
+    parameters.put(INDEX_OPERATION_NAME, indexParameters);
+
+    Gson gson = new Gson();
+    synchronized (bulkBuilder) {
+      bulkBuilder.append(gson.toJson(parameters));
+      bulkBuilder.append("\n");
+      bulkBuilder.append(content.toBytesArray().toUtf8());
+      bulkBuilder.append("\n");
+    }
+  }
+
+  @Override
+  public void execute() throws Exception {
+    int statusCode = 0, triesCount = 0;
+    HttpResponse response = null;
+    String entity;
+    synchronized (bulkBuilder) {
+      entity = bulkBuilder.toString();
+      bulkBuilder = new StringBuilder();
+    }
+
+    while (statusCode != HttpStatus.SC_OK && triesCount < serversList.size()) {
+      triesCount++;
+      String host = serversList.get();
+      String url = host + "/" + BULK_ENDPOINT;
+      HttpPost httpRequest = new HttpPost(url);
+      httpRequest.setEntity(new StringEntity(entity));
+      response = httpClient.execute(httpRequest);
+      statusCode = response.getStatusLine().getStatusCode();
+      logger.info("Status code from elasticsearch: " + statusCode);
+      if (response.getEntity() != null) {
+        logger.debug("Status message from elasticsearch: " +
+                     EntityUtils.toString(response.getEntity(), "UTF-8"));
+      }
+    }
+
+    if (statusCode != HttpStatus.SC_OK) {
+      if (response.getEntity() != null) {
+        throw new EventDeliveryException(EntityUtils.toString(response.getEntity(), "UTF-8"));
+      } else {
+        throw new EventDeliveryException("Elasticsearch status code was: " + statusCode);
+      }
+    }
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
new file mode 100644 (file)
index 0000000..2cf365e
--- /dev/null
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
+
+public class ElasticSearchTransportClient implements ElasticSearchClient {
+
+  public static final Logger logger = LoggerFactory
+      .getLogger(ElasticSearchTransportClient.class);
+
+  private InetSocketTransportAddress[] serverAddresses;
+  private ElasticSearchEventSerializer serializer;
+  private ElasticSearchIndexRequestBuilderFactory indexRequestBuilderFactory;
+  private BulkRequestBuilder bulkRequestBuilder;
+
+  private Client client;
+
+  @VisibleForTesting
+  InetSocketTransportAddress[] getServerAddresses() {
+    return serverAddresses;
+  }
+
+  @VisibleForTesting
+  void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
+    this.bulkRequestBuilder = bulkRequestBuilder;
+  }
+
+  /**
+   * Transport client for external cluster
+   * 
+   * @param hostNames
+   * @param clusterName
+   * @param serializer
+   */
+  public ElasticSearchTransportClient(String[] hostNames, String clusterName,
+      ElasticSearchEventSerializer serializer) {
+    configureHostnames(hostNames);
+    this.serializer = serializer;
+    openClient(clusterName);
+  }
+
+  public ElasticSearchTransportClient(String[] hostNames, String clusterName,
+      ElasticSearchIndexRequestBuilderFactory indexBuilder) {
+    configureHostnames(hostNames);
+    this.indexRequestBuilderFactory = indexBuilder;
+    openClient(clusterName);
+  }
+  
+  /**
+   * Local transport client only for testing
+   * 
+   * @param indexBuilderFactory
+   */
+  public ElasticSearchTransportClient(ElasticSearchIndexRequestBuilderFactory indexBuilderFactory) {
+    this.indexRequestBuilderFactory = indexBuilderFactory;
+    openLocalDiscoveryClient();
+  }
+  
+  /**
+   * Local transport client only for testing
+   *
+   * @param serializer
+   */
+  public ElasticSearchTransportClient(ElasticSearchEventSerializer serializer) {
+    this.serializer = serializer;
+    openLocalDiscoveryClient();
+  }
+
+  /**
+   * Used for testing
+   *
+   * @param client
+   *    ElasticSearch Client
+   * @param serializer
+   *    Event Serializer
+   */
+  public ElasticSearchTransportClient(Client client,
+      ElasticSearchEventSerializer serializer) {
+    this.client = client;
+    this.serializer = serializer;
+  }
+
+  /**
+   * Used for testing
+   */
+  public ElasticSearchTransportClient(Client client,
+                                      ElasticSearchIndexRequestBuilderFactory requestBuilderFactory)
+      throws IOException {
+    this.client = client;
+    requestBuilderFactory.createIndexRequest(client, null, null, null);
+  }
+
+  private void configureHostnames(String[] hostNames) {
+    logger.warn(Arrays.toString(hostNames));
+    serverAddresses = new InetSocketTransportAddress[hostNames.length];
+    for (int i = 0; i < hostNames.length; i++) {
+      String[] hostPort = hostNames[i].trim().split(":");
+      String host = hostPort[0].trim();
+      int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim())
+              : DEFAULT_PORT;
+      serverAddresses[i] = new InetSocketTransportAddress(host, port);
+    }
+  }
+  
+  @Override
+  public void close() {
+    if (client != null) {
+      client.close();
+    }
+    client = null;
+  }
+
+  @Override
+  public void addEvent(Event event, IndexNameBuilder indexNameBuilder,
+      String indexType, long ttlMs) throws Exception {
+    if (bulkRequestBuilder == null) {
+      bulkRequestBuilder = client.prepareBulk();
+    }
+
+    IndexRequestBuilder indexRequestBuilder = null;
+    if (indexRequestBuilderFactory == null) {
+      indexRequestBuilder = client
+          .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
+          .setSource(serializer.getContentBuilder(event).bytes());
+    } else {
+      indexRequestBuilder = indexRequestBuilderFactory.createIndexRequest(
+          client, indexNameBuilder.getIndexPrefix(event), indexType, event);
+    }
+
+    if (ttlMs > 0) {
+      indexRequestBuilder.setTTL(ttlMs);
+    }
+    bulkRequestBuilder.add(indexRequestBuilder);
+  }
+
+  @Override
+  public void execute() throws Exception {
+    try {
+      BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
+      if (bulkResponse.hasFailures()) {
+        throw new EventDeliveryException(bulkResponse.buildFailureMessage());
+      }
+    } finally {
+      bulkRequestBuilder = client.prepareBulk();
+    }
+  }
+
+  /**
+   * Open client to elaticsearch cluster
+   * 
+   * @param clusterName
+   */
+  private void openClient(String clusterName) {
+    logger.info("Using ElasticSearch hostnames: {} ",
+        Arrays.toString(serverAddresses));
+    Settings settings = ImmutableSettings.settingsBuilder()
+        .put("cluster.name", clusterName).build();
+
+    TransportClient transportClient = new TransportClient(settings);
+    for (InetSocketTransportAddress host : serverAddresses) {
+      transportClient.addTransportAddress(host);
+    }
+    if (client != null) {
+      client.close();
+    }
+    client = transportClient;
+  }
+
+  /*
+   * FOR TESTING ONLY...
+   * 
+   * Opens a local discovery node for talking to an elasticsearch server running
+   * in the same JVM
+   */
+  private void openLocalDiscoveryClient() {
+    logger.info("Using ElasticSearch AutoDiscovery mode");
+    Node node = NodeBuilder.nodeBuilder().client(true).local(true).node();
+    if (client != null) {
+      client.close();
+    }
+    client = node.client();
+  }
+
+  @Override
+  public void configure(Context context) {
+    //To change body of implemented methods use File | Settings | File Templates.
+  }
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java
new file mode 100644 (file)
index 0000000..41fbe0d
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch.client;
+
+/**
+ * Exception class
+ */
+class NoSuchClientTypeException extends Exception {
+}
diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
new file mode 100644 (file)
index 0000000..4cbbe91
--- /dev/null
@@ -0,0 +1,44 @@
+package org.apache.flume.sink.elasticsearch.client;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class RoundRobinList<T> {
+
+  private Iterator<T> iterator;
+  private final Collection<T> elements;
+
+  public RoundRobinList(Collection<T> elements) {
+    this.elements = elements;
+    iterator = this.elements.iterator();
+  }
+
+  public synchronized T get() {
+    if (iterator.hasNext()) {
+      return iterator.next();
+    } else {
+      iterator = elements.iterator();
+      return iterator.next();
+    }
+  }
+
+  public int size() {
+    return elements.size();
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
new file mode 100644 (file)
index 0000000..9fbd747
--- /dev/null
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.gateway.Gateway;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.node.internal.InternalNode;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.joda.time.DateTimeUtils;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Shared scaffolding for the ElasticSearch sink tests: default sink
+ * parameters, a local ElasticSearch node, a pinned Joda-Time clock and
+ * search assertions against the timestamped index.
+ */
+public abstract class AbstractElasticSearchSinkTest {
+
+  static final String DEFAULT_INDEX_NAME = "flume";
+  static final String DEFAULT_INDEX_TYPE = "log";
+  static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+  // Instant the Joda-Time clock is pinned to while a test runs.
+  static final long FIXED_TIME_MILLIS = 123456789L;
+
+  Node node;
+  Client client;
+  String timestampedIndexName;
+  Map<String, String> parameters;
+
+  /**
+   * Populates {@link #parameters} with the default sink settings and derives
+   * the index name expected for {@link #FIXED_TIME_MILLIS}.
+   */
+  void initDefaults() {
+    parameters = Maps.newHashMap();
+    parameters.put(INDEX_NAME, DEFAULT_INDEX_NAME);
+    parameters.put(INDEX_TYPE, DEFAULT_INDEX_TYPE);
+    parameters.put(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+    parameters.put(BATCH_SIZE, "1");
+    parameters.put(TTL, "5");
+
+    timestampedIndexName = DEFAULT_INDEX_NAME + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS);
+  }
+
+  /**
+   * Starts a local node (one shard, no replicas, data under target/es-test),
+   * obtains a client from it and waits for green cluster health.
+   */
+  void createNodes() throws Exception {
+    Settings settings = ImmutableSettings
+        .settingsBuilder()
+        .put("number_of_shards", 1)
+        .put("number_of_replicas", 0)
+        .put("routing.hash.type", "simple")
+        .put("gateway.type", "none")
+        .put("path.data", "target/es-test")
+        .build();
+
+    node = NodeBuilder.nodeBuilder().settings(settings).local(true).node();
+    client = node.client();
+
+    // Block until the cluster reports green health.
+    client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute()
+        .actionGet();
+  }
+
+  /**
+   * Resets the node's gateway, then closes the client and the node. The
+   * close calls run even when an earlier step throws, so a failing reset
+   * does not leave the node running for the following tests.
+   */
+  void shutdownNodes() throws Exception {
+    try {
+      ((InternalNode) node).injector().getInstance(Gateway.class).reset();
+    } finally {
+      try {
+        client.close();
+      } finally {
+        node.close();
+      }
+    }
+  }
+
+  /** Pins the Joda-Time clock to {@link #FIXED_TIME_MILLIS}. */
+  @Before
+  public void setFixedJodaTime() {
+    DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS);
+  }
+
+  /** Switches the Joda-Time clock back to the system clock. */
+  @After
+  public void resetJodaTime() {
+    DateTimeUtils.setCurrentMillisSystem();
+  }
+
+  /**
+   * Creates a memory channel with an empty context, attaches it to the sink
+   * and starts the sink.
+   *
+   * @param fixture sink under test
+   * @return the channel the sink now reads from
+   */
+  Channel bindAndStartChannel(ElasticSearchSink fixture) {
+    // Configure the channel
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+
+    // Wire them together
+    fixture.setChannel(channel);
+    fixture.start();
+    return channel;
+  }
+
+  /**
+   * Runs a match-all query and checks the hit count and the given events.
+   */
+  void assertMatchAllQuery(int expectedHits, Event... events) {
+    assertSearch(expectedHits, performSearch(QueryBuilders.matchAllQuery()),
+        null, events);
+  }
+
+  /**
+   * Queries the "@message" field for the term "event" and checks the hit
+   * count and the given events.
+   */
+  void assertBodyQuery(int expectedHits, Event... events) {
+    // Field query restricted to the "@message" field
+    assertSearch(expectedHits,
+        performSearch(QueryBuilders.fieldQuery("@message", "event")),
+        null, events);
+  }
+
+  /**
+   * Executes the query against {@link #timestampedIndexName}, restricted to
+   * {@link #DEFAULT_INDEX_TYPE}.
+   */
+  SearchResponse performSearch(QueryBuilder query) {
+    return client.prepareSearch(timestampedIndexName)
+        .setTypes(DEFAULT_INDEX_TYPE).setQuery(query).execute().actionGet();
+  }
+
+  /**
+   * Checks the total hit count and then compares events with the returned
+   * hits position by position, after ordering the hits by their source
+   * string.
+   *
+   * @param expectedHits total number of hits the response must report
+   * @param response     search response to inspect
+   * @param expectedBody expected value of "@message"; when null the event
+   *                     body decoded as a string is expected instead
+   * @param events       events to compare; the i-th event is matched with the
+   *                     i-th hit of the sorted result
+   */
+  void assertSearch(int expectedHits, SearchResponse response, Map<String, Object> expectedBody,
+                    Event... events) {
+    SearchHits hitResponse = response.getHits();
+    assertEquals(expectedHits, hitResponse.getTotalHits());
+
+    SearchHit[] hits = hitResponse.getHits();
+    // Fail with a readable message rather than an ArrayIndexOutOfBounds when
+    // the response carries fewer hits than there are events to compare.
+    if (hits.length < events.length) {
+      throw new AssertionError("Cannot compare " + events.length
+          + " events against only " + hits.length + " returned hits");
+    }
+
+    Arrays.sort(hits, new Comparator<SearchHit>() {
+      @Override
+      public int compare(SearchHit o1, SearchHit o2) {
+        return o1.getSourceAsString().compareTo(o2.getSourceAsString());
+      }
+    });
+
+    for (int i = 0; i < events.length; i++) {
+      Event event = events[i];
+      SearchHit hit = hits[i];
+      Map<String, Object> source = hit.getSource();
+      if (expectedBody == null) {
+        assertEquals(new String(event.getBody()), source.get("@message"));
+      } else {
+        assertEquals(expectedBody, source.get("@message"));
+      }
+    }
+  }
+
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
new file mode 100644 (file)
index 0000000..d4e4654
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
+public class TestElasticSearchDynamicSerializer {
+
+  /**
+   * Serializes an event carrying a body and three headers and compares the
+   * result with a hand-built document that has a "body" field plus one field
+   * per header.
+   */
+  @Test
+  public void testRoundTrip() throws Exception {
+    ElasticSearchDynamicSerializer fixture = new ElasticSearchDynamicSerializer();
+    Context context = new Context();
+    fixture.configure(context);
+
+    String message = "test body";
+    Map<String, String> headers = Maps.newHashMap();
+    headers.put("headerNameOne", "headerValueOne");
+    headers.put("headerNameTwo", "headerValueTwo");
+    headers.put("headerNameThree", "headerValueThree");
+    Event event = EventBuilder.withBody(message.getBytes(charset));
+    event.setHeaders(headers);
+
+    // Use the strings directly: encoding them with the platform charset and
+    // decoding with 'charset' only round-trips when the two happen to agree.
+    XContentBuilder expected = jsonBuilder().startObject();
+    expected.field("body", message);
+    for (Map.Entry<String, String> header : headers.entrySet()) {
+      expected.field(header.getKey(), header.getValue());
+    }
+    expected.endObject();
+
+    XContentBuilder actual = fixture.getContentBuilder(event);
+
+    // Compare the rendered JSON text instead of raw backing arrays decoded
+    // with the platform default charset.
+    assertEquals(expected.string(), actual.string());
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
new file mode 100644 (file)
index 0000000..b62254e
--- /dev/null
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.sink.SinkConfiguration;
+import org.apache.flume.event.SimpleEvent;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.io.BytesStream;
+import org.elasticsearch.common.io.FastByteArrayOutputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the index request builder factory, driven through a factory
+ * subclass that builds requests without a real client and a serializer stub
+ * that emits fixed bytes.
+ */
+public class TestElasticSearchIndexRequestBuilderFactory
+    extends AbstractElasticSearchSinkTest {
+
+  // No client is needed by these tests; named so that call sites read clearly.
+  private static final Client FAKE_CLIENT = null;
+
+  private EventSerializerIndexRequestBuilderFactory factory;
+
+  private FakeEventSerializer serializer;
+
+  /**
+   * Builds a factory around the stub serializer whose prepareIndex ignores
+   * the client it is given.
+   */
+  @Before
+  public void setupFactory() throws Exception {
+    serializer = new FakeEventSerializer();
+    factory = new EventSerializerIndexRequestBuilderFactory(serializer) {
+      @Override
+      IndexRequestBuilder prepareIndex(Client client) {
+        return new IndexRequestBuilder(FAKE_CLIENT);
+      }
+    };
+  }
+
+  @Test
+  public void shouldUseUtcAsBasisForDateFormat() {
+    // Ask for the English display name explicitly; the no-argument overload
+    // uses the JVM default locale and would fail on non-English machines.
+    assertEquals("Coordinated Universal Time",
+        factory.fastDateFormat.getTimeZone().getDisplayName(java.util.Locale.ENGLISH));
+  }
+
+  @Test
+  public void indexNameShouldBePrefixDashFormattedTimestamp() {
+    long millis = 987654321L;
+    assertEquals("prefix-" + factory.fastDateFormat.format(millis),
+        factory.getIndexName("prefix", millis));
+  }
+
+  /** Without a timestamp header the fixed clock value is expected. */
+  @Test
+  public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp());
+    assertEquals(String.valueOf(FIXED_TIME_MILLIS),
+        timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldUseExistingTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("timestamp", "-321");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-321L, timestampedEvent.getTimestamp());
+    assertEquals("-321", timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  /** An "@timestamp" header is used and no "timestamp" header is added. */
+  @Test
+  public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("@timestamp", "-999");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-999L, timestampedEvent.getTimestamp());
+    assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp"));
+    assertNull(timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    base.setBody(new byte[] {1,2,3,4});
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("foo", "bar");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals("bar", timestampedEvent.getHeaders().get("foo"));
+    assertArrayEquals(base.getBody(), timestampedEvent.getBody());
+  }
+
+  @Test
+  public void shouldSetIndexNameTypeAndSerializedEventIntoIndexRequest()
+      throws Exception {
+
+    String indexPrefix = "qwerty";
+    String indexType = "uiop";
+    Event event = new SimpleEvent();
+
+    IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest(
+        FAKE_CLIENT, indexPrefix, indexType, event);
+
+    assertEquals(indexPrefix + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS),
+        indexRequestBuilder.request().index());
+    assertEquals(indexType, indexRequestBuilder.request().type());
+    assertArrayEquals(FakeEventSerializer.FAKE_BYTES,
+        indexRequestBuilder.request().source().array());
+  }
+
+  @Test
+  public void shouldSetIndexNameFromTimestampHeaderWhenPresent()
+      throws Exception {
+    String indexPrefix = "qwerty";
+    String indexType = "uiop";
+    Event event = new SimpleEvent();
+    event.getHeaders().put("timestamp", "1213141516");
+
+    IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest(
+        FAKE_CLIENT, indexPrefix, indexType, event);
+
+    assertEquals(indexPrefix + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(1213141516L),
+        indexRequestBuilder.request().index());
+  }
+
+  /** "%{header}" placeholders in prefix and type are filled from headers. */
+  @Test
+  public void shouldSetIndexNameTypeFromHeaderWhenPresent()
+      throws Exception {
+    String indexPrefix = "%{index-name}";
+    String indexType = "%{index-type}";
+    String indexValue = "testing-index-name-from-headers";
+    String typeValue = "testing-index-type-from-headers";
+
+    Event event = new SimpleEvent();
+    event.getHeaders().put("index-name", indexValue);
+    event.getHeaders().put("index-type", typeValue);
+
+    IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest(
+        FAKE_CLIENT, indexPrefix, indexType, event);
+
+    assertEquals(indexValue + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS),
+        indexRequestBuilder.request().index());
+    assertEquals(typeValue, indexRequestBuilder.request().type());
+  }
+
+  /** Both configure overloads must be forwarded to the serializer. */
+  @Test
+  public void shouldConfigureEventSerializer() throws Exception {
+    assertFalse(serializer.configuredWithContext);
+    factory.configure(new Context());
+    assertTrue(serializer.configuredWithContext);
+
+    assertFalse(serializer.configuredWithComponentConfiguration);
+    factory.configure(new SinkConfiguration("name"));
+    assertTrue(serializer.configuredWithComponentConfiguration);
+  }
+
+  /**
+   * Serializer stub: always emits {@link #FAKE_BYTES} and records which
+   * configure overloads have been invoked.
+   */
+  static class FakeEventSerializer implements ElasticSearchEventSerializer {
+
+    static final byte[] FAKE_BYTES = new byte[]{9, 8, 7, 6};
+    boolean configuredWithContext;
+    boolean configuredWithComponentConfiguration;
+
+    @Override
+    public BytesStream getContentBuilder(Event event) throws IOException {
+      FastByteArrayOutputStream fbaos = new FastByteArrayOutputStream(4);
+      fbaos.write(FAKE_BYTES);
+      return fbaos;
+    }
+
+    @Override
+    public void configure(Context arg0) {
+      configuredWithContext = true;
+    }
+
+    @Override
+    public void configure(ComponentConfiguration arg0) {
+      configuredWithComponentConfiguration = true;
+    }
+  }
+
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
new file mode 100644 (file)
index 0000000..65b4dab
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.gson.JsonParser;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.Test;
+
+import java.util.Date;
+import java.util.Map;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
+public class TestElasticSearchLogStashEventSerializer {
+
+  /** A plain text body ends up unchanged in "@message". */
+  @Test
+  public void testRoundTrip() throws Exception {
+    assertSerializedAsLogStashDocument("test body");
+  }
+
+  /** A body that looks like JSON but is not valid is kept as plain text. */
+  @Test
+  public void shouldHandleInvalidJSONDuringComplexParsing() throws Exception {
+    assertSerializedAsLogStashDocument("{flume: somethingnotvalid}");
+  }
+
+  /**
+   * Serializes an event with the given body and a fixed set of headers, then
+   * compares the result with the expected document. Both sides are compared
+   * as parsed JSON trees rather than as raw strings.
+   *
+   * @param message event body, also the expected value of "@message"
+   */
+  private static void assertSerializedAsLogStashDocument(String message)
+      throws Exception {
+    ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer();
+    Context context = new Context();
+    fixture.configure(context);
+
+    Map<String, String> headers = Maps.newHashMap();
+    long timestamp = System.currentTimeMillis();
+    headers.put("timestamp", String.valueOf(timestamp));
+    headers.put("source", "flume_tail_src");
+    headers.put("host", "test@localhost");
+    headers.put("src_path", "/tmp/test");
+    headers.put("headerNameOne", "headerValueOne");
+    headers.put("headerNameTwo", "headerValueTwo");
+    headers.put("type", "sometype");
+    Event event = EventBuilder.withBody(message.getBytes(charset));
+    event.setHeaders(headers);
+
+    // The message is used as is: encoding it with the platform charset and
+    // decoding with 'charset' only round-trips when the two happen to agree.
+    XContentBuilder expected = jsonBuilder().startObject();
+    expected.field("@message", message);
+    expected.field("@timestamp", new Date(timestamp));
+    expected.field("@source", "flume_tail_src");
+    expected.field("@type", "sometype");
+    expected.field("@source_host", "test@localhost");
+    expected.field("@source_path", "/tmp/test");
+
+    // Every header is expected again, untouched, below "@fields".
+    expected.startObject("@fields");
+    expected.field("timestamp", String.valueOf(timestamp));
+    expected.field("src_path", "/tmp/test");
+    expected.field("host", "test@localhost");
+    expected.field("headerNameTwo", "headerValueTwo");
+    expected.field("source", "flume_tail_src");
+    expected.field("headerNameOne", "headerValueOne");
+    expected.field("type", "sometype");
+    expected.endObject();
+
+    expected.endObject();
+
+    XContentBuilder actual = fixture.getContentBuilder(event);
+
+    JsonParser parser = new JsonParser();
+    assertEquals(parser.parse(expected.string()), parser.parse(actual.string()));
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
new file mode 100644 (file)
index 0000000..69acc06
--- /dev/null
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.UUID;
+import org.elasticsearch.common.io.BytesStream;
+import org.elasticsearch.common.io.FastByteArrayOutputStream;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
+
+  private ElasticSearchSink fixture;
+
+  /**
+   * Loads the default parameters, starts the test node and creates a sink
+   * with a randomly suffixed name.
+   */
+  @Before
+  public void init() throws Exception {
+    initDefaults();
+    createNodes();
+
+    String sinkName = "ElasticSearchSink-" + UUID.randomUUID().toString();
+    fixture = new ElasticSearchSink(true);
+    fixture.setName(sinkName);
+  }
+
+  /** Shuts the test node down again after every test. */
+  @After
+  public void tearDown() throws Exception {
+    shutdownNodes();
+  }
+
+  /**
+   * Puts one event on the channel, lets the sink process it and expects to
+   * find it through both the match-all and the body query.
+   */
+  @Test
+  public void shouldIndexOneEvent() throws Exception {
+    Configurables.configure(fixture, new Context(parameters));
+    Channel ch = bindAndStartChannel(fixture);
+
+    // Publish the event inside its own channel transaction.
+    Event single = EventBuilder.withBody("event #1 or 1".getBytes());
+    Transaction txn = ch.getTransaction();
+    txn.begin();
+    ch.put(single);
+    txn.commit();
+    txn.close();
+
+    fixture.process();
+    fixture.stop();
+
+    // Refresh the index before searching it.
+    client.admin()
+        .indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName))
+        .actionGet();
+
+    assertMatchAllQuery(1, single);
+    assertBodyQuery(1, single);
+  }
+
+  /**
+   * Sends three bodies that contain braces but are not valid JSON in one
+   * batch and expects each of them to be found by a query on "@message".
+   */
+  @Test
+  public void shouldIndexInvalidComplexJsonBody() throws Exception {
+    parameters.put(BATCH_SIZE, "3");
+    Configurables.configure(fixture, new Context(parameters));
+    Channel ch = bindAndStartChannel(fixture);
+
+    Event first = EventBuilder.withBody("TEST1 {test}".getBytes());
+    Event second = EventBuilder.withBody("{test: TEST2 }".getBytes());
+    Event third = EventBuilder.withBody("{\"test\":{ TEST3 {test} }}".getBytes());
+
+    Transaction txn = ch.getTransaction();
+    txn.begin();
+    ch.put(first);
+    ch.put(second);
+    ch.put(third);
+    txn.commit();
+    txn.close();
+
+    fixture.process();
+    fixture.stop();
+    client.admin()
+        .indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName))
+        .actionGet();
+
+    assertMatchAllQuery(3);
+
+    // Each body carries its own marker term, so each query matches once.
+    assertSearch(1, performSearch(QueryBuilders.fieldQuery("@message", "TEST1")),
+        null, first);
+    assertSearch(1, performSearch(QueryBuilders.fieldQuery("@message", "TEST2")),
+        null, second);
+    assertSearch(1, performSearch(QueryBuilders.fieldQuery("@message", "TEST3")),
+        null, third);
+  }
+
+  /**
+   * Sends a valid JSON object as the event body and expects "@message" to
+   * come back as a map with the same entries, searchable by nested field.
+   */
+  @Test
+  public void shouldIndexComplexJsonEvent() throws Exception {
+    Configurables.configure(fixture, new Context(parameters));
+    Channel ch = bindAndStartChannel(fixture);
+
+    byte[] jsonBody = "{\"event\":\"json content\",\"num\":1}".getBytes();
+    Event jsonEvent = EventBuilder.withBody(jsonBody);
+
+    Transaction txn = ch.getTransaction();
+    txn.begin();
+    ch.put(jsonEvent);
+    txn.commit();
+    txn.close();
+
+    fixture.process();
+    fixture.stop();
+    client.admin()
+        .indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName))
+        .actionGet();
+
+    Map<String, Object> expectedMessage = new HashMap<String, Object>();
+    expectedMessage.put("event", "json content");
+    expectedMessage.put("num", 1);
+
+    assertSearch(1, performSearch(QueryBuilders.matchAllQuery()),
+        expectedMessage, jsonEvent);
+    assertSearch(1, performSearch(QueryBuilders.fieldQuery("@message.event", "json")),
+        expectedMessage, jsonEvent);
+  }
+
+  /**
+   * Sends five events with a batch size of five, so one call to process()
+   * takes them all, and expects every event to be indexed.
+   */
+  @Test
+  public void shouldIndexFiveEvents() throws Exception {
+    int numberOfEvents = 5;
+
+    // Make it so we only need to call process once
+    parameters.put(BATCH_SIZE, String.valueOf(numberOfEvents));
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    Event[] events = new Event[numberOfEvents];
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < numberOfEvents; i++) {
+      String body = "event #" + i + " of " + numberOfEvents;
+      Event event = EventBuilder.withBody(body.getBytes());
+      events[i] = event;
+      channel.put(event);
+    }
+    tx.commit();
+    tx.close();
+
+    fixture.process();
+    fixture.stop();
+    client.admin().indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
+
+    // Both expectations follow numberOfEvents instead of a second literal.
+    assertMatchAllQuery(numberOfEvents, events);
+    assertBodyQuery(numberOfEvents, events);
+  }
+
+  /**
+   * Sends five events with a batch size of two and expects the sink to
+   * report BACKOFF on the third call to process(), with all events indexed.
+   */
+  @Test
+  public void shouldIndexFiveEventsOverThreeBatches() throws Exception {
+    parameters.put(BATCH_SIZE, "2");
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    int numberOfEvents = 5;
+    Event[] events = new Event[numberOfEvents];
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < numberOfEvents; i++) {
+      String body = "event #" + i + " of " + numberOfEvents;
+      Event event = EventBuilder.withBody(body.getBytes());
+      events[i] = event;
+      channel.put(event);
+    }
+    tx.commit();
+    tx.close();
+
+    // Cap the number of calls so that a sink which never reports BACKOFF
+    // fails the count assertion below instead of hanging the build.
+    int maxCalls = numberOfEvents + 1;
+    int count = 0;
+    Status status = Status.READY;
+    while (status != Status.BACKOFF && count < maxCalls) {
+      count++;
+      status = fixture.process();
+    }
+    fixture.stop();
+
+    assertEquals(3, count);
+
+    client.admin().indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
+    assertMatchAllQuery(numberOfEvents, events);
+    assertBodyQuery(numberOfEvents, events);
+  }
+
+  /**
+   * Configures a sink with explicit values and reads them back through its
+   * getters; a TTL of "10" is expected to come back as ten days in millis.
+   */
+  @Test
+  public void shouldParseConfiguration() {
+    String[] expectedAddresses = new String[] {"10.5.5.27"};
+
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.put(CLUSTER_NAME, "testing-cluster-name");
+    parameters.put(INDEX_NAME, "testing-index-name");
+    parameters.put(INDEX_TYPE, "testing-index-type");
+    parameters.put(TTL, "10");
+
+    fixture = new ElasticSearchSink();
+    Context sinkContext = new Context(parameters);
+    fixture.configure(sinkContext);
+
+    assertEquals("testing-cluster-name", fixture.getClusterName());
+    assertEquals("testing-index-name", fixture.getIndexName());
+    assertEquals("testing-index-type", fixture.getIndexType());
+    assertEquals(TimeUnit.DAYS.toMillis(10), fixture.getTTLMs());
+    assertArrayEquals(expectedAddresses, fixture.getServerAddresses());
+  }
+
+  /**
+   * Removes index name, index type and cluster name from the parameters and
+   * expects the sink to report the default values for them.
+   */
+  @Test
+  public void shouldParseConfigurationUsingDefaults() {
+    String[] expectedAddresses = new String[] {"10.5.5.27"};
+
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.remove(INDEX_NAME);
+    parameters.remove(INDEX_TYPE);
+    parameters.remove(CLUSTER_NAME);
+
+    fixture = new ElasticSearchSink();
+    Context sinkContext = new Context(parameters);
+    fixture.configure(sinkContext);
+
+    assertEquals(DEFAULT_INDEX_NAME, fixture.getIndexName());
+    assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType());
+    assertEquals(DEFAULT_CLUSTER_NAME, fixture.getClusterName());
+    assertArrayEquals(expectedAddresses, fixture.getServerAddresses());
+  }
+
+  /**
+   * Checks that a comma-separated host list without ports is split into one
+   * server address per host.
+   */
+  @Test
+  public void shouldParseMultipleHostUsingDefaultPorts() {
+    parameters.put(HOSTNAMES, "10.5.5.27,10.5.5.28,10.5.5.29");
+
+    Context sinkContext = new Context(parameters);
+    fixture = new ElasticSearchSink();
+    fixture.configure(sinkContext);
+
+    String[] parsedAddresses = fixture.getServerAddresses();
+    assertArrayEquals(
+        new String[] { "10.5.5.27", "10.5.5.28", "10.5.5.29" },
+        parsedAddresses);
+  }
+
+  /**
+   * Checks that whitespace around the hosts in a comma-separated list is
+   * not part of the parsed server addresses.
+   */
+  @Test
+  public void shouldParseMultipleHostWithWhitespacesUsingDefaultPorts() {
+    parameters.put(HOSTNAMES, " 10.5.5.27 , 10.5.5.28 , 10.5.5.29 ");
+
+    Context sinkContext = new Context(parameters);
+    fixture = new ElasticSearchSink();
+    fixture.configure(sinkContext);
+
+    String[] parsedAddresses = fixture.getServerAddresses();
+    assertArrayEquals(
+        new String[] { "10.5.5.27", "10.5.5.28", "10.5.5.29" },
+        parsedAddresses);
+  }
+
+  /**
+   * Checks that host:port entries in a comma-separated list are kept as
+   * host:port server addresses.
+   */
+  @Test
+  public void shouldParseMultipleHostAndPorts() {
+    parameters.put(HOSTNAMES, "10.5.5.27:9300,10.5.5.28:9301,10.5.5.29:9302");
+
+    Context sinkContext = new Context(parameters);
+    fixture = new ElasticSearchSink();
+    fixture.configure(sinkContext);
+
+    String[] parsedAddresses = fixture.getServerAddresses();
+    assertArrayEquals(
+        new String[] { "10.5.5.27:9300", "10.5.5.28:9301", "10.5.5.29:9302" },
+        parsedAddresses);
+  }
+
+  /**
+   * Checks that whitespace around both the ',' and ':' separators is absent
+   * from the parsed host:port server addresses.
+   */
+  @Test
+  public void shouldParseMultipleHostAndPortsWithWhitespaces() {
+    String spacedHosts =
+        " 10.5.5.27 : 9300 , 10.5.5.28 : 9301 , 10.5.5.29 : 9302 ";
+    parameters.put(HOSTNAMES, spacedHosts);
+
+    Context sinkContext = new Context(parameters);
+    fixture = new ElasticSearchSink();
+    fixture.configure(sinkContext);
+
+    String[] parsedAddresses = fixture.getServerAddresses();
+    assertArrayEquals(
+        new String[] { "10.5.5.27:9300", "10.5.5.28:9301", "10.5.5.29:9302" },
+        parsedAddresses);
+  }
+
+  /**
+   * Configures the sink with CustomElasticSearchIndexRequestBuilderFactory as
+   * its SERIALIZER, pushes one event through process(), and checks that the
+   * factory saw the expected index name, index type, event body and a
+   * configure(Context) call.
+   */
+  @Test
+  public void shouldAllowCustomElasticSearchIndexRequestBuilderFactory()
+      throws Exception {
+    String factoryClassName =
+        CustomElasticSearchIndexRequestBuilderFactory.class.getName();
+    parameters.put(SERIALIZER, factoryClassName);
+
+    fixture.configure(new Context(parameters));
+
+    Channel channel = bindAndStartChannel(fixture);
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    Event event = EventBuilder.withBody("{ foo: \"bar\" }".getBytes());
+    channel.put(event);
+    transaction.commit();
+    transaction.close();
+
+    fixture.process();
+    fixture.stop();
+
+    // The suffix follows the factory's "HH_mm_ss_SSS" date pattern.
+    // NOTE(review): presumably the clock is fixed by test setup outside this
+    // method - confirm.
+    String expectedIndexName = fixture.getIndexName() + "-05_17_36_789";
+
+    assertEquals(expectedIndexName,
+        CustomElasticSearchIndexRequestBuilderFactory.actualIndexName);
+    assertEquals(fixture.getIndexType(),
+        CustomElasticSearchIndexRequestBuilderFactory.actualIndexType);
+    assertArrayEquals(event.getBody(),
+        CustomElasticSearchIndexRequestBuilderFactory.actualEventBody);
+    assertTrue(CustomElasticSearchIndexRequestBuilderFactory.hasContext);
+  }
+
+  /**
+   * Checks that TTL strings with an explicit unit suffix (ms, s, m, h, d, w),
+   * and a bare number, are each reported by getTTLMs() as the expected number
+   * of milliseconds.
+   */
+  @Test
+  public void shouldParseFullyQualifiedTTLs() {
+    // TTL setting -> expected value in milliseconds.
+    Map<String, Long> expectedMillisByTtl = new HashMap<String, Long>();
+    expectedMillisByTtl.put("1ms", 1L);
+    expectedMillisByTtl.put("1s", 1000L);
+    expectedMillisByTtl.put("1m", 60000L);
+    expectedMillisByTtl.put("1h", 3600000L);
+    expectedMillisByTtl.put("1d", 86400000L);
+    expectedMillisByTtl.put("1w", 604800000L);
+    // A bare "1" is expected to yield the same value as "1d".
+    expectedMillisByTtl.put("1", 86400000L);
+
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.put(CLUSTER_NAME, "testing-cluster-name");
+    parameters.put(INDEX_NAME, "testing-index-name");
+    parameters.put(INDEX_TYPE, "testing-index-type");
+
+    for (Map.Entry<String, Long> ttlCase : expectedMillisByTtl.entrySet()) {
+      parameters.put(TTL, ttlCase.getKey());
+      // A fresh sink per case so every TTL is parsed from scratch.
+      fixture = new ElasticSearchSink();
+      fixture.configure(new Context(parameters));
+
+      assertEquals("testing-cluster-name", fixture.getClusterName());
+      assertEquals("testing-index-name", fixture.getIndexName());
+      assertEquals("testing-index-type", fixture.getIndexType());
+      assertEquals(ttlCase.getValue().longValue(), fixture.getTTLMs());
+      assertArrayEquals(new String[] { "10.5.5.27" },
+          fixture.getServerAddresses());
+    }
+  }
+
+  /**
+   * Index request builder factory used as a test double. It stores the index
+   * name, index type and event body passed to prepareIndexRequest(), and
+   * whether configure(Context) was called, in static fields that the tests
+   * read back.
+   */
+  public static final class CustomElasticSearchIndexRequestBuilderFactory
+      extends AbstractElasticSearchIndexRequestBuilderFactory {
+
+    // Written by prepareIndexRequest().
+    static String actualIndexName;
+    static String actualIndexType;
+    static byte[] actualEventBody;
+    // Set to true by configure(Context).
+    static boolean hasContext;
+
+    public CustomElasticSearchIndexRequestBuilderFactory() {
+      super(FastDateFormat.getInstance("HH_mm_ss_SSS", TimeZone.getTimeZone("EST5EDT")));
+    }
+
+    /** Records that a Context was supplied; the context itself is not read. */
+    @Override
+    public void configure(Context context) {
+      hasContext = true;
+    }
+
+    @Override
+    public void configure(ComponentConfiguration componentConfiguration) {
+      //no-op
+    }
+
+    /**
+     * Records the arguments in the static fields, then fills the request with
+     * the index name, index type and raw event body.
+     */
+    @Override
+    protected void prepareIndexRequest(IndexRequestBuilder requestBuilder,
+                                       String indexName, String indexType,
+                                       Event event) throws IOException {
+      actualIndexName = indexName;
+      actualIndexType = indexType;
+      actualEventBody = event.getBody();
+      requestBuilder
+          .setIndex(indexName)
+          .setType(indexType)
+          .setSource(event.getBody());
+    }
+  }
+
+  /**
+   * Checks that configuring the sink with an unusable SERIALIZER class fails:
+   * a ClassCastException is expected for java.lang.String and an
+   * IllegalArgumentException for FakeConfigurable.
+   *
+   * Each try block calls fail() after configure() so that the test no longer
+   * passes silently when the expected exception is not thrown.
+   */
+  @Test
+  public void shouldFailToConfigureWithInvalidSerializerClass()
+      throws Exception {
+
+    parameters.put(SERIALIZER, "java.lang.String");
+    try {
+      Configurables.configure(fixture, new Context(parameters));
+      // Fully qualified so this block does not depend on a static import.
+      org.junit.Assert.fail(
+          "Expected ClassCastException for serializer java.lang.String");
+    } catch (ClassCastException e) {
+      // expected
+    }
+
+    parameters.put(SERIALIZER, FakeConfigurable.class.getName());
+    try {
+      Configurables.configure(fixture, new Context(parameters));
+      org.junit.Assert.fail("Expected IllegalArgumentException for serializer "
+          + FakeConfigurable.class.getName());
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Checks that the sink has no event serializer before configure() and a
+   * FakeEventSerializer once configured with that class name.
+   */
+  @Test
+  public void shouldUseSpecifiedSerializer() throws Exception {
+    assertNull(fixture.getEventSerializer());
+
+    Context serializerContext = new Context();
+    serializerContext.put(SERIALIZER,
+        "org.apache.flume.sink.elasticsearch.FakeEventSerializer");
+    fixture.configure(serializerContext);
+
+    Object configuredSerializer = fixture.getEventSerializer();
+    assertTrue(configuredSerializer instanceof FakeEventSerializer);
+  }
+
+  /**
+   * Checks that the sink has no index name builder before configure() and a
+   * FakeIndexNameBuilder once configured with that class name.
+   */
+  @Test
+  public void shouldUseSpecifiedIndexNameBuilder() throws Exception {
+    assertNull(fixture.getIndexNameBuilder());
+
+    Context builderContext = new Context();
+    builderContext.put(ElasticSearchSinkConstants.INDEX_NAME_BUILDER,
+        "org.apache.flume.sink.elasticsearch.FakeIndexNameBuilder");
+    fixture.configure(builderContext);
+
+    Object configuredBuilder = fixture.getIndexNameBuilder();
+    assertTrue(configuredBuilder instanceof FakeIndexNameBuilder);
+  }
+
+  /**
+   * A Configurable that does nothing; used above as a SERIALIZER class that
+   * the sink is expected to reject.
+   */
+  public static class FakeConfigurable implements Configurable {
+    @Override
+    public void configure(Context context) {
+      // no-op
+    }
+  }
+}
+
+/**
+ * Internal class. Test-only event serializer that always emits the same
+ * fixed bytes and remembers which configure() overloads were called.
+ */
+class FakeEventSerializer implements ElasticSearchEventSerializer {
+
+  static final byte[] FAKE_BYTES = new byte[] { 9, 8, 7, 6 };
+
+  // Flipped to true by the matching configure() overload.
+  boolean configuredWithContext;
+  boolean configuredWithComponentConfiguration;
+
+  @Override
+  public void configure(Context context) {
+    configuredWithContext = true;
+  }
+
+  @Override
+  public void configure(ComponentConfiguration componentConfiguration) {
+    configuredWithComponentConfiguration = true;
+  }
+
+  /** Ignores the event and returns a stream containing FAKE_BYTES. */
+  @Override
+  public BytesStream getContentBuilder(Event event) throws IOException {
+    FastByteArrayOutputStream out =
+        new FastByteArrayOutputStream(FAKE_BYTES.length);
+    out.write(FAKE_BYTES);
+    return out;
+  }
+}
+
+/**
+ * Internal class. Test-only index name builder that returns the constant
+ * INDEX_NAME for both the index name and the index prefix.
+ */
+class FakeIndexNameBuilder implements IndexNameBuilder {
+
+  static final String INDEX_NAME = "index_name";
+
+  @Override
+  public void configure(Context context) {
+    // nothing to configure
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // nothing to configure
+  }
+
+  /** Returns INDEX_NAME regardless of the event. */
+  @Override
+  public String getIndexPrefix(Event event) {
+    return INDEX_NAME;
+  }
+
+  /** Returns INDEX_NAME regardless of the event. */
+  @Override
+  public String getIndexName(Event event) {
+    return INDEX_NAME;
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java
new file mode 100644 (file)
index 0000000..2a36439
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that DefaultSinkFactory creates an ElasticSearchSink for the
+ * "elasticsearch" sink type.
+ */
+public class TestElasticSearchSinkCreation {
+
+  private SinkFactory sinkFactory;
+
+  @Before
+  public void setUp() {
+    sinkFactory = new DefaultSinkFactory();
+  }
+
+  @Test
+  public void testSinkCreation() {
+    verifySinkCreation("elasticsearch-sink", "elasticsearch", ElasticSearchSink.class);
+  }
+
+  /**
+   * Creates a sink through the factory and asserts that it is non-null and
+   * an instance of the expected class.
+   */
+  private void verifySinkCreation(String sinkName, String sinkType,
+      Class<?> expectedClass) throws FlumeException {
+    Sink created = sinkFactory.create(sinkName, sinkType);
+    Assert.assertNotNull(created);
+    boolean hasExpectedType = expectedClass.isInstance(created);
+    Assert.assertTrue(hasExpectedType);
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java
new file mode 100644 (file)
index 0000000..678342a
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for TimeBasedIndexNameBuilder configured with the index name
+ * "prefix".
+ */
+public class TimeBasedIndexNameBuilderTest {
+
+  private TimeBasedIndexNameBuilder indexNameBuilder;
+
+  @Before
+  public void setUp() throws Exception {
+    Context context = new Context();
+    context.put(ElasticSearchSinkConstants.INDEX_NAME, "prefix");
+    indexNameBuilder = new TimeBasedIndexNameBuilder();
+    indexNameBuilder.configure(context);
+  }
+
+  /**
+   * Checks that the builder's date format uses the UTC time zone.
+   *
+   * The display name is requested for Locale.ENGLISH explicitly: the
+   * no-argument getDisplayName() uses the JVM's default display locale, which
+   * would make the comparison with the English literal fail on machines with
+   * a non-English default locale.
+   */
+  @Test
+  public void shouldUseUtcAsBasisForDateFormat() {
+    assertEquals("Coordinated Universal Time",
+        indexNameBuilder.getFastDateFormat().getTimeZone()
+            .getDisplayName(java.util.Locale.ENGLISH));
+  }
+
+  /**
+   * Checks that the index name is "prefix-" followed by the event's
+   * "timestamp" header formatted with the builder's own date format.
+   */
+  @Test
+  public void indexNameShouldBePrefixDashFormattedTimestamp() {
+    long time = 987654321L;
+    Event event = new SimpleEvent();
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put("timestamp", Long.toString(time));
+    event.setHeaders(headers);
+    assertEquals("prefix-" + indexNameBuilder.getFastDateFormat().format(time),
+        indexNameBuilder.getIndexName(event));
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
new file mode 100644 (file)
index 0000000..bef2ac6
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.flume.event.SimpleEvent;
+import org.joda.time.DateTimeUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests how TimestampedEvent derives its timestamp from the wrapped event's
+ * headers, or from the (fixed) Joda clock when no timestamp header exists.
+ */
+public class TimestampedEventTest {
+  static final long FIXED_TIME_MILLIS = 123456789L;
+
+  /** Pins the Joda-Time clock so the "current time" is predictable. */
+  @Before
+  public void setFixedJodaTime() {
+    DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS);
+  }
+
+  /**
+   * Restores the system clock. DateTimeUtils holds the fixed clock in global
+   * state, so without this reset it would leak into tests that run later in
+   * the same JVM. The annotation is fully qualified so no import is needed.
+   */
+  @org.junit.After
+  public void restoreSystemJodaTime() {
+    DateTimeUtils.setCurrentMillisSystem();
+  }
+
+  /** With no timestamp header, the fixed clock value is used and exposed. */
+  @Test
+  public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp());
+    assertEquals(String.valueOf(FIXED_TIME_MILLIS),
+            timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  /** An existing "timestamp" header is used as the event timestamp. */
+  @Test
+  public void shouldUseExistingTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("timestamp", "-321");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-321L, timestampedEvent.getTimestamp());
+    assertEquals("-321", timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  /**
+   * An existing "@timestamp" header is used as the event timestamp, and no
+   * plain "timestamp" header is added alongside it.
+   */
+  @Test
+  public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("@timestamp", "-999");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-999L, timestampedEvent.getTimestamp());
+    assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp"));
+    assertNull(timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  /** The body and unrelated headers of the wrapped event are carried over. */
+  @Test
+  public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    base.setBody(new byte[] {1,2,3,4});
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("foo", "bar");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals("bar", timestampedEvent.getHeaders().get("foo"));
+    assertArrayEquals(base.getBody(), timestampedEvent.getBody());
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
new file mode 100644 (file)
index 0000000..0d1d092
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch.client;
+
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that RoundRobinList.get() cycles through its elements in order and
+ * wraps around to the first element after the last.
+ */
+public class RoundRobinListTest {
+
+  private RoundRobinList<String> fixture;
+
+  @Before
+  public void setUp() {
+    fixture = new RoundRobinList<String>(Arrays.asList("test1", "test2"));
+  }
+
+  @Test
+  public void shouldReturnNextElement() {
+    // Five consecutive get() calls over a two-element list.
+    String[] expectedSequence = { "test1", "test2", "test1", "test2", "test1" };
+    for (String expected : expectedSequence) {
+      assertEquals(expected, fixture.get());
+    }
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
new file mode 100644 (file)
index 0000000..c3f07b0
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Checks that ElasticSearchClientFactory.getClient() returns the client
+ * implementation matching the requested client type, and throws
+ * NoSuchClientTypeException for an unknown type.
+ */
+public class TestElasticSearchClientFactory {
+
+  ElasticSearchClientFactory factory;
+
+  @Mock
+  ElasticSearchEventSerializer serializer;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    factory = new ElasticSearchClientFactory();
+  }
+
+  @Test
+  public void shouldReturnTransportClient() throws Exception {
+    String[] localhost = { "127.0.0.1" };
+    Object client = factory.getClient(
+        ElasticSearchClientFactory.TransportClient,
+        localhost, "test", serializer, null);
+    assertThat(client, instanceOf(ElasticSearchTransportClient.class));
+  }
+
+  @Test
+  public void shouldReturnRestClient() throws NoSuchClientTypeException {
+    String[] localhost = { "127.0.0.1" };
+    Object client = factory.getClient(
+        ElasticSearchClientFactory.RestClient,
+        localhost, "test", serializer, null);
+    assertThat(client, instanceOf(ElasticSearchRestClient.class));
+  }
+
+  @Test(expected = NoSuchClientTypeException.class)
+  public void shouldThrowNoSuchClientTypeException() throws NoSuchClientTypeException {
+    String[] localhost = { "127.0.0.1" };
+    // No serializer is supplied here; the unknown type name should be
+    // rejected with NoSuchClientTypeException.
+    factory.getClient("not_existing_client", localhost, "test", null, null);
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
new file mode 100644 (file)
index 0000000..9551c81
--- /dev/null
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import com.google.common.base.Splitter;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.BytesStream;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Tests ElasticSearchRestClient against a mocked HttpClient: the bulk request
+ * URL and payload it sends, its failure behaviour, and its retry against the
+ * next configured host.
+ */
+public class TestElasticSearchRestClient {
+
+  private ElasticSearchRestClient fixture;
+
+  @Mock
+  private ElasticSearchEventSerializer serializer;
+
+  @Mock
+  private IndexNameBuilder nameBuilder;
+
+  @Mock
+  private Event event;
+
+  @Mock
+  private HttpClient httpClient;
+
+  @Mock
+  private HttpResponse httpResponse;
+
+  @Mock
+  private StatusLine httpStatus;
+
+  @Mock
+  private HttpEntity httpEntity;
+
+  private static final String INDEX_NAME = "foo_index";
+  private static final String MESSAGE_CONTENT = "{\"body\":\"test\"}";
+  private static final String[] HOSTS = {"host1", "host2"};
+
+  /**
+   * Stubs the name builder to return INDEX_NAME and the serializer to return
+   * MESSAGE_CONTENT for any event, then builds the client over HOSTS.
+   */
+  @Before
+  public void setUp() throws IOException {
+    initMocks(this);
+    BytesReference bytesReference = mock(BytesReference.class);
+    BytesStream bytesStream = mock(BytesStream.class);
+
+    when(nameBuilder.getIndexName(any(Event.class))).thenReturn(INDEX_NAME);
+    when(bytesReference.toBytesArray()).thenReturn(new BytesArray(MESSAGE_CONTENT));
+    when(bytesStream.bytes()).thenReturn(bytesReference);
+    when(serializer.getContentBuilder(any(Event.class))).thenReturn(bytesStream);
+    fixture = new ElasticSearchRestClient(HOSTS, serializer, httpClient);
+  }
+
+  /** A TTL of -1 yields a bulk index action without a "_ttl" field. */
+  @Test
+  public void shouldAddNewEventWithoutTTL() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", -1);
+    fixture.execute();
+
+    verify(httpClient).execute(isA(HttpUriRequest.class));
+    verify(httpClient).execute(argument.capture());
+
+    assertEquals("http://host1/_bulk", argument.getValue().getURI().toString());
+    assertTrue(verifyJsonEvents("{\"index\":{\"_type\":\"bar_type\", \"_index\":\"foo_index\"}}\n",
+            MESSAGE_CONTENT, EntityUtils.toString(argument.getValue().getEntity())));
+  }
+
+  /** A positive TTL is sent as the "_ttl" field of the bulk index action. */
+  @Test
+  public void shouldAddNewEventWithTTL() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+
+    verify(httpClient).execute(isA(HttpUriRequest.class));
+    verify(httpClient).execute(argument.capture());
+
+    assertEquals("http://host1/_bulk", argument.getValue().getURI().toString());
+    assertTrue(verifyJsonEvents(
+        "{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n",
+        MESSAGE_CONTENT, EntityUtils.toString(argument.getValue().getEntity())));
+  }
+
+  /**
+   * Parses the first two newline-separated lines of the actual payload as
+   * JSON and compares them with the expected action line and document body.
+   * Comparing parsed JSON makes the check independent of whitespace and
+   * member order.
+   */
+  private boolean verifyJsonEvents(String expectedIndex, String expectedBody, String actual) {
+    Iterator<String> it = Splitter.on("\n").split(actual).iterator();
+    JsonParser parser = new JsonParser();
+    JsonObject[] arr = new JsonObject[2];
+    for (int i = 0; i < 2; i++) {
+      arr[i] = (JsonObject) parser.parse(it.next());
+    }
+    return arr[0].equals(parser.parse(expectedIndex)) && arr[1].equals(parser.parse(expectedBody));
+  }
+
+  /** A server error on every attempt surfaces as EventDeliveryException. */
+  @Test(expected = EventDeliveryException.class)
+  public void shouldThrowEventDeliveryException() throws Exception {
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+  }
+
+  /**
+   * A server error followed by OK results in two requests: the first to
+   * host1 and the retry to host2.
+   */
+  @Test
+  public void shouldRetryBulkOperation() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR,
+                                                HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+
+    verify(httpClient, times(2)).execute(isA(HttpUriRequest.class));
+    verify(httpClient, times(2)).execute(argument.capture());
+
+    List<HttpPost> allValues = argument.getAllValues();
+    assertEquals("http://host1/_bulk", allValues.get(0).getURI().toString());
+    assertEquals("http://host2/_bulk", allValues.get(1).getURI().toString());
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
new file mode 100644 (file)
index 0000000..b7b8e74
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.BytesStream;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Tests ElasticSearchTransportClient against mocked Elasticsearch client and
+ * request builder objects.
+ */
+public class TestElasticSearchTransportClient {
+
+  private ElasticSearchTransportClient fixture;
+
+  @Mock
+  private ElasticSearchEventSerializer serializer;
+
+  @Mock
+  private IndexNameBuilder nameBuilder;
+
+  @Mock
+  private Client elasticSearchClient;
+
+  @Mock
+  private BulkRequestBuilder bulkRequestBuilder;
+
+  @Mock
+  private IndexRequestBuilder indexRequestBuilder;
+
+  @Mock
+  private Event event;
+
+  /**
+   * Wires the mocks so that any event serializes to a fixed JSON body and any
+   * prepareIndex() call yields the mocked index request builder, then creates
+   * the client with the mocked bulk request builder installed.
+   */
+  @Before
+  public void setUp() throws IOException {
+    initMocks(this);
+    BytesReference serializedBytes = mock(BytesReference.class);
+    BytesStream serializedStream = mock(BytesStream.class);
+
+    when(nameBuilder.getIndexName(any(Event.class))).thenReturn("foo_index");
+    when(serializedBytes.toBytes()).thenReturn("{\"body\":\"test\"}".getBytes());
+    when(serializedStream.bytes()).thenReturn(serializedBytes);
+    when(serializer.getContentBuilder(any(Event.class)))
+        .thenReturn(serializedStream);
+    when(elasticSearchClient.prepareIndex(anyString(), anyString()))
+        .thenReturn(indexRequestBuilder);
+    when(indexRequestBuilder.setSource(serializedBytes))
+        .thenReturn(indexRequestBuilder);
+
+    fixture = new ElasticSearchTransportClient(elasticSearchClient, serializer);
+    fixture.setBulkRequestBuilder(bulkRequestBuilder);
+  }
+
+  /**
+   * Stubs bulkRequestBuilder.execute() to return a future whose response
+   * reports the given hasFailures() value.
+   */
+  private void stubBulkResponse(boolean withFailures) {
+    ListenableActionFuture<BulkResponse> future =
+        (ListenableActionFuture<BulkResponse>) mock(ListenableActionFuture.class);
+    BulkResponse bulkResponse = mock(BulkResponse.class);
+    when(bulkRequestBuilder.execute()).thenReturn(future);
+    when(future.actionGet()).thenReturn(bulkResponse);
+    when(bulkResponse.hasFailures()).thenReturn(withFailures);
+  }
+
+  @Test
+  public void shouldAddNewEventWithoutTTL() throws Exception {
+    fixture.addEvent(event, nameBuilder, "bar_type", -1);
+
+    verify(indexRequestBuilder).setSource(
+        serializer.getContentBuilder(event).bytes());
+    verify(bulkRequestBuilder).add(indexRequestBuilder);
+  }
+
+  @Test
+  public void shouldAddNewEventWithTTL() throws Exception {
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+
+    verify(indexRequestBuilder).setTTL(10);
+    verify(indexRequestBuilder).setSource(
+        serializer.getContentBuilder(event).bytes());
+  }
+
+  @Test
+  public void shouldExecuteBulkRequestBuilder() throws Exception {
+    stubBulkResponse(false);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    fixture.execute();
+
+    verify(bulkRequestBuilder).execute();
+  }
+
+  /** A bulk response that reports failures must raise EventDeliveryException. */
+  @Test(expected = EventDeliveryException.class)
+  public void shouldThrowExceptionOnExecuteFailed() throws Exception {
+    stubBulkResponse(true);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    fixture.execute();
+  }
+}
diff --git a/flume-elasticsearch-sink/src/test/resources/log4j2.xml b/flume-elasticsearch-sink/src/test/resources/log4j2.xml
new file mode 100644 (file)
index 0000000..5676916
--- /dev/null
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<Configuration status="OFF"> <!-- Test logging setup; status="OFF" turns off Log4j's own internal status messages -->
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT"> <!-- the only appender: writes to standard output -->
+      <PatternLayout pattern="%d (%t) [%p - %l] %m%n" /> <!-- layout: date (thread) [level - caller location] message, newline -->
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Logger name="org.apache.flume" level="DEBUG"/> <!-- has no AppenderRef of its own. NOTE(review): same level as Root, presumably kept as a hook for tuning Flume verbosity; confirm -->
+    <Root level="DEBUG"> <!-- everything at DEBUG and above goes to the Console appender -->
+      <AppenderRef ref="Console" />
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644 (file)
index 0000000..553aaa6
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,277 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <parent> <!-- inherits from the Flume parent POM -->
+    <groupId>org.apache.flume</groupId>
+    <artifactId>flume-parent</artifactId>
+    <version>1.10.0-SNAPSHOT</version> <!-- same value as the flume.version property in this POM; keep the two in sync -->
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-search</artifactId>
+  <version>1.0.0-SNAPSHOT</version> <!-- this project's own version, separate from the parent's 1.10.0-SNAPSHOT -->
+  <packaging>pom</packaging> <!-- pom packaging: this project only aggregates the entries in the modules section -->
+
+  <name>Apache Flume Search Components</name>
+
+  <properties>
+    <!-- Set default encoding to UTF-8 to remove Maven complaints -->
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+    <!-- Java compiler configuration -->
+    <sourceJavaVersion>1.8</sourceJavaVersion> <!-- NOTE(review): neither Java version property is referenced in this POM; presumably read by the parent's compiler setup, confirm -->
+    <targetJavaVersion>1.8</targetJavaVersion>
+
+    <elasticsearch.version>0.90.1</elasticsearch.version> <!-- used by both elasticsearch entries in dependencyManagement -->
+    <flume.version>1.10.0-SNAPSHOT</flume.version> <!-- version of every org.apache.flume artifact referenced in this POM; equals the parent version -->
+  </properties>
+
+  <modules>
+    <module>flume-elasticsearch-sink</module> <!-- the single module built by this aggregator -->
+  </modules>
+
+  <inceptionYear>2009</inceptionYear>
+
+  <issueManagement> <!-- issue tracker: the FLUME project in Apache JIRA -->
+    <system>JIRA</system>
+    <url>https://issues.apache.org/jira/browse/FLUME</url>
+  </issueManagement>
+
+  <licenses> <!-- project license: Apache License 2.0 -->
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+    </license>
+  </licenses>
+
+  <mailingLists> <!-- three lists follow: user, developer and commits -->
+    <mailingList>
+      <archive>http://mail-archives.apache.org/mod_mbox/flume-user/</archive>
+      <name>Flume User List</name>
+      <post>user@flume.apache.org</post>
+      <subscribe>user-subscribe@flume.apache.org</subscribe>
+      <unsubscribe>user-unsubscribe@flume.apache.org</unsubscribe>
+    </mailingList>
+    <mailingList>
+      <archive>http://mail-archives.apache.org/mod_mbox/flume-dev/</archive>
+      <name>Flume Developer List</name>
+      <post>dev@flume.apache.org</post>
+      <subscribe>dev-subscribe@flume.apache.org</subscribe>
+      <unsubscribe>dev-unsubscribe@flume.apache.org</unsubscribe>
+    </mailingList>
+    <mailingList>
+      <archive>http://mail-archives.apache.org/mod_mbox/flume-commits/</archive>
+      <name>Flume Commits</name>
+      <post>commits@flume.apache.org</post>
+      <subscribe>commits-subscribe@flume.apache.org</subscribe>
+      <unsubscribe>commits-unsubscribe@flume.apache.org</unsubscribe>
+    </mailingList>
+  </mailingLists>
+
+  <scm> <!-- NOTE(review): these URLs still reference the main flume.git repository and its trunk branch; confirm the right values for this fork -->
+    <url>https://git-wip-us.apache.org/repos/asf?p=flume.git;a=tree;h=refs/heads/trunk;hb=trunk</url>
+    <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/flume.git</developerConnection>
+    <connection>scm:git:https://git-wip-us.apache.org/repos/asf/flume.git</connection> <!-- read-only access; https to match developerConnection instead of cleartext http -->
+  </scm>
+
+  <ciManagement> <!-- NOTE(review): this commit also adds .github/workflows/build.yml; confirm whether the Jenkins job below is still the CI of record for this fork -->
+    <system>jenkins</system>
+    <url>https://builds.apache.org/job/flume-trunk</url>
+  </ciManagement>
+
+  <developers>
+    <developer>
+      <name>Ralph Goers</name>
+      <id>rgoers</id>
+      <email>rgoers@apache.org</email>
+      <organization>Intuit</organization>
+    </developer>
+  </developers>
+
+  <organization>
+    <name>Apache Software Foundation</name>
+    <url>http://www.apache.org</url>
+  </organization>
+
+  <distributionManagement> <!-- deployment targets: one artifact repository and one site location -->
+    <repository>
+      <id>apache.staging.https</id>
+      <name>Apache Staging Repository</name>
+      <url>https://repository.apache.org/service/local/staging/deploy/maven2/</url>
+    </repository>
+    <site>
+      <id>apache.website</id>
+      <url>${siteUrlDeployment}</url> <!-- NOTE(review): siteUrlDeployment is not defined in this POM; presumably supplied by the parent POM or on the command line, confirm -->
+      <name>Flume Site</name>
+    </site>
+  </distributionManagement>
+
+  <build>
+    <plugins>
+      <plugin> <!-- Checkstyle: runs the check goal during the verify phase -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>${mvn-checkstyle-plugin.version}</version> <!-- NOTE(review): property is not defined in this POM; presumably inherited from flume-parent, confirm -->
+        <dependencies>
+          <!-- The build-support module adds the checkstyle config files to the classpath. -->
+          <dependency>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>build-support</artifactId>
+            <version>${flume.version}</version>
+          </dependency>
+          <!-- Newer versions of puppycrawl checkstyle have more features. Pull in a more recent
+               version than is specified in the maven-checkstyle-plugin pom file. -->
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>${checkstyle.tool.version}</version> <!-- NOTE(review): also not defined in this POM; presumably inherited, confirm -->
+          </dependency>
+        </dependencies>
+        <executions>
+          <execution>
+            <id>verify.checkstyle</id>
+            <phase>verify</phase> <!-- binds the check goal below to the verify phase -->
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <configLocation>config/checkstyle/checkstyle.xml</configLocation> <!-- per the build-support comment above, looked up on the plugin classpath -->
+          <suppressionsLocation>config/checkstyle/checkstyle-suppressions.xml</suppressionsLocation>
+          <suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression>
+          <encoding>UTF-8</encoding>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory> <!-- test sources are checked as well -->
+          <linkXRef>false</linkXRef>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencyManagement> <!-- central versions for modules; entries here only manage versions, modules still declare what they use -->
+    <dependencies>
+
+      <dependency>
+        <groupId>org.elasticsearch</groupId>
+        <artifactId>elasticsearch</artifactId>
+        <version>${elasticsearch.version}</version>
+        <optional>true</optional> <!-- marked optional for the main elasticsearch artifact -->
+      </dependency>
+
+      <dependency> <!-- second elasticsearch entry: differs from the one above by the tests classifier and test scope -->
+        <groupId>org.elasticsearch</groupId>
+        <artifactId>elasticsearch</artifactId>
+        <version>${elasticsearch.version}</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+      </dependency>
+
+      <!-- internal module dependencies -->
+
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-configuration</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-config-filter-api</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-auth</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-core</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+
+      <dependency> <!-- the same artifact is also a dependency of the checkstyle plugin in the build section -->
+        <groupId>org.apache.flume</groupId>
+        <artifactId>build-support</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-tools</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-node</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-sdk</artifactId>
+        <version>${flume.version}</version>
+      </dependency>
+
+      <dependency> <!-- second flume-ng-sdk entry: the tests classifier, test scope only -->
+        <groupId>org.apache.flume</groupId>
+        <artifactId>flume-ng-sdk</artifactId>
+        <version>${flume.version}</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <reporting> <!-- report plugins for the generated site; none of them declares a version here -->
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+        <reportSets>
+          <reportSet>
+            <reports> <!-- explicit list of project-info reports to generate -->
+              <report>ci-management</report>
+              <report>distribution-management</report>
+              <report>team</report>
+              <report>mailing-lists</report>
+              <report>issue-management</report>
+              <report>licenses</report>
+              <report>scm</report>
+            </reports>
+          </reportSet>
+        </reportSets>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId> <!-- version is declared in the build section of this POM -->
+      </plugin>
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId> <!-- NOTE(review): no version anywhere in this POM; presumably managed by flume-parent, confirm -->
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-pmd-plugin</artifactId> <!-- NOTE(review): no version anywhere in this POM; presumably managed by flume-parent, confirm -->
+      </plugin>
+    </plugins>
+  </reporting>
+
+</project>