METAMODEL-1099: Implemented DataContextFactory SPI for ElasticSearch
authorkaspersorensen <i.am.kasper.sorensen@gmail.com>
Fri, 29 Jul 2016 15:38:49 +0000 (08:38 -0700)
committerkaspersorensen <i.am.kasper.sorensen@gmail.com>
Fri, 29 Jul 2016 15:38:49 +0000 (08:38 -0700)
Closes #118

CHANGES.md
core/src/main/java/org/apache/metamodel/factory/DataContextFactory.java
elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java [new file with mode: 0644]
elasticsearch/native/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory [new file with mode: 0644]
elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java [new file with mode: 0644]
elasticsearch/rest/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory [new file with mode: 0644]

index 4f61177..bd2cec8 100644 (file)
@@ -1,6 +1,7 @@
 ### Apache MetaModel 4.5.4 (work in progress)
 
  * [METAMODEL-1099] - Created a new DataContextFactory SPI and an extensible registry of implementations based on ServiceLoader.
+ * [METAMODEL-1099] - Implemented DataContextFactory SPI for connectors: JDBC, CSV, ElasticSearch
  * [METAMODEL-1103] - Fixed a bug pertaining to anchoring of wildcards in LIKE operands.
  * [METAMODEL-1088] - Add support for aliases in MongoDB.
  * [METAMODEL-1086] - Fixed encoding issue when CsvDataContext is instantiated with InputStream.
index b9f8e3e..1a00fa8 100644 (file)
  */
 package org.apache.metamodel.factory;
 
+import java.util.ServiceLoader;
+
 import org.apache.metamodel.ConnectionException;
 import org.apache.metamodel.DataContext;
 
+/**
+ * Represents a factory of {@link DataContext} objects. Factories take
+ * {@link DataContextProperties} and turn them into active {@link DataContext}
+ * instances.
+ * 
+ * Multiple factories can exist in order to serve different kinds of properties,
+ * thereby offering a dynamic factory mechanism. The collection of factories is
+ * accessible via {@link DataContextFactoryRegistry}.
+ * 
+ * These factories are registered via the Java {@link ServiceLoader} SPI API. So
+ * add a file with path
+ * "/META-INF/services/org.apache.metamodel.factory.DataContextFactory" in any
+ * JAR file in order to register another factory.
+ */
 public interface DataContextFactory {
 
     public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry);
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
new file mode 100644 (file)
index 0000000..94359c4
--- /dev/null
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import org.apache.metamodel.ConnectionException;
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.factory.DataContextFactory;
+import org.apache.metamodel.factory.DataContextProperties;
+import org.apache.metamodel.factory.ResourceFactoryRegistry;
+import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException;
+import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.ImmutableSettings.Builder;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+/**
+ * Factory for ElasticSearch data context of native type.
+ * 
+ * The factory will activate when DataContext type is specified as
+ * "elasticsearch", "es-node", "elasticsearch-node", "es-transport",
+ * "elasticsearch-transport".
+ * 
+ * This factory is configured with the following properties:
+ * 
+ * <ul>
+ * <li>clientType (needed if datacontext type is just "elasticsearch" - must be
+ * either "transport" or "node")</li>
+ * <li>hostname (if clientType is "transport")</li>
+ * <li>port (if clientType is "transport")</li>
+ * <li>database (index name)</li>
+ * <li>cluster</li>
+ * <li>username (optional, only available if clientType is "transport")</li>
+ * <li>password (optional, only available if clientType is "transport")</li>
+ * <li>ssl (optional, only available if clientType is "transport")</li>
+ * <li>keystorePath (optional, only available if clientType is "transport")</li>
+ * <li>keystorePassword (optional, only available if clientType is "transport")
+ * </li>
+ * </ul>
+ */
+public class ElasticSearchDataContextFactory implements DataContextFactory {
+
+    /**
+     * Checks whether the given properties describe a native ElasticSearch
+     * connection that this factory is able to create.
+     *
+     * @param properties
+     *            the properties describing the requested data context
+     * @param resourceFactoryRegistry
+     *            not used by this factory
+     * @return true if the data context type is one of the type names handled
+     *         by this factory and the properties required for the resolved
+     *         client type are present, false otherwise
+     */
+    @Override
+    public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) {
+        final String dataContextType = properties.getDataContextType();
+        if (dataContextType == null) {
+            // a switch on a null String throws NullPointerException, so a
+            // missing type is treated as "not accepted" instead
+            return false;
+        }
+        switch (dataContextType) {
+        case "elasticsearch":
+        case "es-node":
+        case "elasticsearch-node":
+        case "es-transport":
+        case "elasticsearch-transport":
+            return acceptsInternal(properties);
+        default:
+            return false;
+        }
+    }
+
+    /**
+     * Verifies that the properties needed to build a client are present: a
+     * client type, an index name and a cluster name. Hostname and port are
+     * additionally required for every client type other than "node", because
+     * {@link #create(DataContextProperties, ResourceFactoryRegistry)} builds a
+     * transport client for all of those.
+     */
+    private boolean acceptsInternal(DataContextProperties properties) {
+        final String clientType = getClientType(properties);
+        if (clientType == null) {
+            return false;
+        }
+        if (!"node".equals(clientType)) {
+            if (properties.getHostname() == null || properties.getPort() == null) {
+                return false;
+            }
+        }
+        if (getIndex(properties) == null) {
+            return false;
+        }
+        if (getCluster(properties) == null) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Resolves the client type. The "-node" and "-transport" data context type
+     * names imply the client type; for any other type name the "clientType"
+     * property is used.
+     *
+     * @return "node", "transport", the value of the "clientType" property, or
+     *         null if none of these is available
+     */
+    private String getClientType(DataContextProperties properties) {
+        final String dataContextType = properties.getDataContextType();
+        if (dataContextType != null) {
+            // guarded because a switch on a null String throws
+            // NullPointerException
+            switch (dataContextType) {
+            case "elasticsearch-node":
+            case "es-node":
+                return "node";
+            case "elasticsearch-transport":
+            case "es-transport":
+                return "transport";
+            default:
+                break;
+            }
+        }
+        return (String) properties.toMap().get("clientType");
+    }
+
+    /**
+     * Gets the index name: the database name if set, otherwise the value of
+     * the "index" property (which may be null).
+     */
+    private String getIndex(DataContextProperties properties) {
+        final String databaseName = properties.getDatabaseName();
+        if (databaseName == null) {
+            return (String) properties.toMap().get("index");
+        }
+        return databaseName;
+    }
+
+    /**
+     * Gets the value of the "cluster" property, or null if it is not set.
+     */
+    private String getCluster(DataContextProperties properties) {
+        return (String) properties.toMap().get("cluster");
+    }
+
+    /**
+     * Creates an {@link ElasticSearchDataContext} backed by a node client when
+     * the resolved client type is "node", and by a transport client in every
+     * other case.
+     *
+     * @param properties
+     *            the properties describing the connection, index and optional
+     *            table definitions
+     * @param resourceFactoryRegistry
+     *            not used by this factory
+     * @return the new data context
+     */
+    @Override
+    public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
+            throws UnsupportedDataContextPropertiesException, ConnectionException {
+        final String clientType = getClientType(properties);
+        final Client client;
+        if ("node".equals(clientType)) {
+            client = createNodeClient(properties);
+        } else {
+            client = createTransportClient(properties);
+        }
+        final String indexName = getIndex(properties);
+        final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
+        return new ElasticSearchDataContext(client, indexName, tableDefinitions);
+    }
+
+    /**
+     * Builds a transport client for the configured cluster, hostname and
+     * port. The "shield.*" settings are only applied when both username and
+     * password are set; the SSL related settings are only applied within that
+     * case and when the "ssl" property equals "true".
+     */
+    private Client createTransportClient(DataContextProperties properties) {
+        final Builder settingsBuilder = ImmutableSettings.builder();
+        settingsBuilder.put("name", "MetaModel");
+        settingsBuilder.put("cluster.name", getCluster(properties));
+        if (properties.getUsername() != null && properties.getPassword() != null) {
+            settingsBuilder.put("shield.user", properties.getUsername() + ":" + properties.getPassword());
+            if ("true".equals(properties.toMap().get("ssl"))) {
+                // keystore settings are optional and only set when a path is given
+                if (properties.toMap().get("keystorePath") != null) {
+                    settingsBuilder.put("shield.ssl.keystore.path", properties.toMap().get("keystorePath"));
+                    settingsBuilder.put("shield.ssl.keystore.password", properties.toMap().get("keystorePassword"));
+                }
+                settingsBuilder.put("shield.transport.ssl", "true");
+            }
+        }
+        final Settings settings = settingsBuilder.build();
+
+        final TransportClient client = new TransportClient(settings);
+        client.addTransportAddress(new InetSocketTransportAddress(properties.getHostname(), properties.getPort()));
+        return client;
+    }
+
+    /**
+     * Builds a client-only node for the configured cluster, with
+     * "shield.enabled" set to false, and returns that node's client.
+     */
+    private Client createNodeClient(DataContextProperties properties) {
+        final Builder settingsBuilder = ImmutableSettings.builder();
+        settingsBuilder.put("name", "MetaModel");
+        settingsBuilder.put("shield.enabled", false);
+        final Settings settings = settingsBuilder.build();
+        final Node node = NodeBuilder.nodeBuilder().clusterName(getCluster(properties)).client(true).settings(settings)
+                .node();
+        return node.client();
+    }
+}
diff --git a/elasticsearch/native/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory b/elasticsearch/native/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory
new file mode 100644 (file)
index 0000000..b33339b
--- /dev/null
@@ -0,0 +1 @@
+org.apache.metamodel.elasticsearch.nativeclient.ElasticSearchDataContextFactory
\ No newline at end of file
index c452d7b..b55db13 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.metamodel.elasticsearch.rest;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -83,7 +84,9 @@ import io.searchbox.params.Parameters;
  * This implementation supports either automatic discovery of a schema or manual
  * specification of a schema, through the {@link SimpleTableDef} class.
  */
-public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext {
+public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements
+        DataContext,
+        UpdateableDataContext {
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataContext.class);
 
@@ -95,16 +98,17 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
     private final JestClient elasticSearchClient;
 
     private final String indexName;
-    // Table definitions that are set from the beginning, not supposed to be changed.
+    // Table definitions that are set from the beginning, not supposed to be
+    // changed.
     private final List<SimpleTableDef> staticTableDefinitions;
 
     // Table definitions that are discovered, these can change
     private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
 
     /**
-     * Constructs a {@link ElasticSearchRestDataContext}. This constructor accepts a
-     * custom array of {@link SimpleTableDef}s which allows the user to define
-     * his own view on the indexes in the engine.
+     * Constructs a {@link ElasticSearchRestDataContext}. This constructor
+     * accepts a custom array of {@link SimpleTableDef}s which allows the user
+     * to define his own view on the indexes in the engine.
      *
      * @param client
      *            the ElasticSearch client
@@ -123,13 +127,14 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
         }
         this.elasticSearchClient = client;
         this.indexName = indexName;
-        this.staticTableDefinitions = Arrays.asList(tableDefinitions);
+        this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections
+                .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions));
         this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
     }
 
     /**
-     * Constructs a {@link ElasticSearchRestDataContext} and automatically detects
-     * the schema structure/view on all indexes (see
+     * Constructs a {@link ElasticSearchRestDataContext} and automatically
+     * detects the schema structure/view on all indexes (see
      * {@link #detectTable(JsonObject, String)}).
      *
      * @param client
@@ -158,20 +163,20 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
             final GetMapping getMapping = new GetMapping.Builder().addIndex(indexName).build();
             jestResult = elasticSearchClient.execute(getMapping);
         } catch (Exception e) {
-            logger.error("Failed to retrieve mappings" , e);
+            logger.error("Failed to retrieve mappings", e);
             throw new MetaModelException("Failed to execute request for index information needed to detect schema", e);
         }
 
-        if(!jestResult.isSucceeded()){
+        if (!jestResult.isSucceeded()) {
             logger.error("Failed to retrieve mappings; {}", jestResult.getErrorMessage());
             throw new MetaModelException("Failed to retrieve mappings; " + jestResult.getErrorMessage());
         }
 
         final List<SimpleTableDef> result = new ArrayList<>();
 
-        final Set<Map.Entry<String, JsonElement>> mappings =
-                jestResult.getJsonObject().getAsJsonObject(indexName).getAsJsonObject("mappings").entrySet();
-        if(mappings.size() == 0){
+        final Set<Map.Entry<String, JsonElement>> mappings = jestResult.getJsonObject().getAsJsonObject(indexName)
+                .getAsJsonObject("mappings").entrySet();
+        if (mappings.size() == 0) {
             logger.warn("No metadata returned for index name '{}' - no tables will be detected.");
         } else {
 
@@ -179,7 +184,8 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
                 final String documentType = entry.getKey();
 
                 try {
-                    final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties").getAsJsonObject(), documentType);
+                    final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties")
+                            .getAsJsonObject(), documentType);
                     result.add(table);
                 } catch (Exception e) {
                     logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
@@ -199,8 +205,8 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
 
     /**
      * Performs an analysis of an available index type in an ElasticSearch
-     * {@link JestClient} client and tries to detect the index structure based on
-     * the metadata provided by the java client.
+     * {@link JestClient} client and tries to detect the index structure based
+     * on the metadata provided by the java client.
      *
      * @param metadataProperties
      *            the ElasticSearch mapping
@@ -210,8 +216,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
      */
     private static SimpleTableDef detectTable(JsonObject metadataProperties, String documentType) {
         final ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser.parse(metadataProperties);
-        return new SimpleTableDef(documentType, metaData.getColumnNames(),
-                metaData.getColumnTypes());
+        return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes());
     }
 
     @Override
@@ -253,10 +258,10 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
     }
 
     @Override
-    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
-            List<FilterItem> whereItems, int firstRow, int maxRows) {
-        final QueryBuilder queryBuilder = ElasticSearchUtils
-                .createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND);
+    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, List<FilterItem> whereItems,
+            int firstRow, int maxRows) {
+        final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
+                LogicalOperator.AND);
         if (queryBuilder != null) {
             // where clause can be pushed down to an ElasticSearch query
             SearchSourceBuilder searchSourceBuilder = createSearchRequest(firstRow, maxRows, queryBuilder);
@@ -268,8 +273,9 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
     }
 
     private SearchResult executeSearch(Table table, SearchSourceBuilder searchSourceBuilder, boolean scroll) {
-        Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(table.getName());
-        if(scroll){
+        Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(
+                table.getName());
+        if (scroll) {
             builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL);
         }
 
@@ -277,7 +283,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
         SearchResult result;
         try {
             result = elasticSearchClient.execute(search);
-        } catch (Exception e){
+        } catch (Exception e) {
             logger.warn("Could not execute ElasticSearch query", e);
             throw new MetaModelException("Could not execute ElasticSearch query", e);
         }
@@ -286,7 +292,8 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
 
     @Override
     protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) {
-        SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), limitMaxRowsIsSet(maxRows));
+        SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), limitMaxRowsIsSet(
+                maxRows));
 
         return new JestElasticSearchDataSet(elasticSearchClient, searchResult, columns);
     }
@@ -341,7 +348,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
         CountResult countResult;
         try {
             countResult = elasticSearchClient.execute(count);
-        } catch (Exception e){
+        } catch (Exception e) {
             logger.warn("Could not execute ElasticSearch get query", e);
             throw new MetaModelException("Could not execute ElasticSearch get query", e);
         }
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
new file mode 100644 (file)
index 0000000..b2dc4c3
--- /dev/null
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import org.apache.metamodel.ConnectionException;
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.factory.DataContextFactory;
+import org.apache.metamodel.factory.DataContextProperties;
+import org.apache.metamodel.factory.ResourceFactoryRegistry;
+import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException;
+import org.apache.metamodel.util.SimpleTableDef;
+
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestClientFactory;
+import io.searchbox.client.config.HttpClientConfig;
+
+/**
+ * Factory for ElasticSearch data context of REST type.
+ * 
+ * The factory will activate when DataContext type is specified as
+ * "elasticsearch", "es-rest" or "elasticsearch-rest".
+ * 
+ * This factory is configured with the following properties:
+ * 
+ * <ul>
+ * <li>url (http or https based base URL of elasticsearch)</li>
+ * <li>database (index name)</li>
+ * <li>username (optional)</li>
+ * <li>password (optional)</li>
+ * </ul>
+ */
+public class ElasticSearchRestDataContextFactory implements DataContextFactory {
+
+    /**
+     * Checks whether the given properties describe a REST based ElasticSearch
+     * connection that this factory is able to create.
+     *
+     * @param properties
+     *            the properties describing the requested data context
+     * @param resourceFactoryRegistry
+     *            not used by this factory
+     * @return true if the data context type is handled by this factory and
+     *         both a URL and an index name are available, false otherwise
+     */
+    @Override
+    public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) {
+        final String dataContextType = properties.getDataContextType();
+        if (dataContextType == null) {
+            // a switch on a null String throws NullPointerException, so a
+            // missing type is treated as "not accepted" instead
+            return false;
+        }
+        switch (dataContextType) {
+        case "elasticsearch":
+            // ensure that the url is http or https based to infer that this is
+            // a REST based connection
+            final String url = properties.getUrl();
+            return url != null && url.startsWith("http") && acceptsInternal(properties);
+        case "es-rest":
+        case "elasticsearch-rest":
+            return acceptsInternal(properties);
+        default:
+            return false;
+        }
+    }
+
+    /**
+     * Verifies that the properties needed to build a client and a data
+     * context are present: a URL and an index name.
+     */
+    private boolean acceptsInternal(DataContextProperties properties) {
+        if (properties.getUrl() == null) {
+            return false;
+        }
+        if (getIndex(properties) == null) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Builds a Jest client for the configured URL. Default credentials are
+     * only set when a username is available.
+     */
+    private JestClient createClient(DataContextProperties properties) {
+        final String serverUri = properties.getUrl();
+        final HttpClientConfig.Builder builder = new HttpClientConfig.Builder(serverUri);
+        if (properties.getUsername() != null) {
+            builder.defaultCredentials(properties.getUsername(), properties.getPassword());
+        }
+
+        final JestClientFactory clientFactory = new JestClientFactory();
+        final HttpClientConfig httpClientConfig = new HttpClientConfig(builder);
+        clientFactory.setHttpClientConfig(httpClientConfig);
+        final JestClient client = clientFactory.getObject();
+        return client;
+    }
+
+    /**
+     * Gets the index name: the database name if set, otherwise the value of
+     * the "index" property (which may be null).
+     */
+    private String getIndex(DataContextProperties properties) {
+        final String databaseName = properties.getDatabaseName();
+        if (databaseName == null) {
+            // fall back to the "index" property; the looked-up value has to be
+            // returned, otherwise the fallback has no effect
+            return (String) properties.toMap().get("index");
+        }
+        return databaseName;
+    }
+
+    /**
+     * Creates an {@link ElasticSearchRestDataContext} for the configured URL,
+     * index and optional table definitions.
+     *
+     * @param properties
+     *            the properties describing the connection, index and optional
+     *            table definitions
+     * @param resourceFactoryRegistry
+     *            not used by this factory
+     * @return the new data context
+     */
+    @Override
+    public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
+            throws UnsupportedDataContextPropertiesException, ConnectionException {
+        final JestClient client = createClient(properties);
+        final String indexName = getIndex(properties);
+        final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
+        return new ElasticSearchRestDataContext(client, indexName, tableDefinitions);
+    }
+
+}
diff --git a/elasticsearch/rest/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory b/elasticsearch/rest/src/main/resources/META-INF/services/org.apache.metamodel.factory.DataContextFactory
new file mode 100644 (file)
index 0000000..a8924c4
--- /dev/null
@@ -0,0 +1 @@
+org.apache.metamodel.elasticsearch.rest.ElasticSearchRestDataContextFactory
\ No newline at end of file