METAMODEL-1136: New Amazon DynamoDB connector
authorKasper Sørensen <i.am.kasper.sorensen@gmail.com>
Sat, 28 Jan 2017 18:54:35 +0000 (10:54 -0800)
committerKasper Sørensen <i.am.kasper.sorensen@gmail.com>
Sat, 28 Jan 2017 18:54:35 +0000 (10:54 -0800)
Closes #140

13 files changed:
CHANGES.md
dynamodb/.gitignore [new file with mode: 0644]
dynamodb/pom.xml [new file with mode: 0644]
dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbDataContext.java [new file with mode: 0644]
dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbDataSet.java [new file with mode: 0644]
dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbRowInsertionBuilder.java [new file with mode: 0644]
dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbTableCreationBuilder.java [new file with mode: 0644]
dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbTableDropBuilder.java [new file with mode: 0644]
dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbUpdateCallback.java [new file with mode: 0644]
dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbUtils.java [new file with mode: 0644]
dynamodb/src/test/java/org/apache/metamodel/dynamodb/DynamoDbDataContextIntegrationTest.java [new file with mode: 0644]
example-metamodel-integrationtest-configuration.properties
pom.xml

index 475de9f..6ffda44 100644 (file)
@@ -1,5 +1,6 @@
 ### Apache MetaModel [wip]
 
+ * [METAMODEL-1136] - New connector for Amazon DynamoDB.
  * [METAMODEL-1134] - Added NOT IN and NOT LIKE operators to WHERE filters.
 
 ### Apache MetaModel 4.5.5
diff --git a/dynamodb/.gitignore b/dynamodb/.gitignore
new file mode 100644 (file)
index 0000000..4e247ee
--- /dev/null
@@ -0,0 +1,4 @@
+/.settings
+/target
+/.classpath
+/.project
diff --git a/dynamodb/pom.xml b/dynamodb/pom.xml
new file mode 100644 (file)
index 0000000..546a13b
--- /dev/null
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       <parent>
+               <artifactId>MetaModel</artifactId>
+               <groupId>org.apache.metamodel</groupId>
+               <version>4.5.6-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+       <artifactId>MetaModel-dynamodb</artifactId>
+       <name>MetaModel module for Amazon AWS DynamoDB.</name>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.metamodel</groupId>
+                       <artifactId>MetaModel-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.amazonaws</groupId>
+                       <artifactId>aws-java-sdk-dynamodb</artifactId>
+                       <version>1.11.81</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>commons-logging</groupId>
+                                       <artifactId>commons-logging</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>jcl-over-slf4j</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-nop</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>
diff --git a/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbDataContext.java b/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbDataContext.java
new file mode 100644 (file)
index 0000000..5f95219
--- /dev/null
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.dynamodb;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.util.SimpleTableDef;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
+import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
+import com.amazonaws.services.dynamodbv2.model.GetItemResult;
+import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndexDescription;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.ListTablesResult;
+import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndexDescription;
+import com.amazonaws.services.dynamodbv2.model.ScanRequest;
+import com.amazonaws.services.dynamodbv2.model.ScanResult;
+import com.amazonaws.services.dynamodbv2.model.TableDescription;
+
+/**
+ * DataContext implementation for Amazon DynamoDB.
+ */
+public class DynamoDbDataContext extends QueryPostprocessDataContext implements UpdateableDataContext, Closeable {
+
+    /**
+     * System property key used for getting the read throughput capacity when
+     * creating new tables. Defaults to 5.
+     */
+    public static final String SYSTEM_PROPERTY_THROUGHPUT_READ_CAPACITY = "metamodel.dynamodb.throughput.capacity.read";
+
+    /**
+     * System property key used for getting the write throughput capacity when
+     * creating new tables. Defaults to 5.
+     */
+    public static final String SYSTEM_PROPERTY_THROUGHPUT_WRITE_CAPACITY = "metamodel.dynamodb.throughput.capacity.write";
+
+    /**
+     * The artificial schema name used by this DataContext.
+     */
+    public static final String SCHEMA_NAME = "public";
+
+    private final AmazonDynamoDB _dynamoDb;
+    private final boolean _shutdownOnClose;
+    private final SimpleTableDef[] _tableDefs;
+
+    public DynamoDbDataContext() {
+        this(AmazonDynamoDBClientBuilder.defaultClient(), null, true);
+    }
+
+    public DynamoDbDataContext(SimpleTableDef[] tableDefs) {
+        this(AmazonDynamoDBClientBuilder.defaultClient(), tableDefs, true);
+    }
+
+    public DynamoDbDataContext(AmazonDynamoDB client) {
+        this(client, null, false);
+    }
+
+    public DynamoDbDataContext(AmazonDynamoDB client, SimpleTableDef[] tableDefs) {
+        this(client, tableDefs, false);
+    }
+
+    private DynamoDbDataContext(AmazonDynamoDB client, SimpleTableDef[] tableDefs, boolean shutdownOnClose) {
+        _dynamoDb = client;
+        _tableDefs = (tableDefs == null ? new SimpleTableDef[0] : tableDefs);
+        _shutdownOnClose = shutdownOnClose;
+    }
+
+    public AmazonDynamoDB getDynamoDb() {
+        return _dynamoDb;
+    }
+
+    @Override
+    public void close() {
+        if (_shutdownOnClose) {
+            _dynamoDb.shutdown();
+        }
+    }
+
+    @Override
+    protected Schema getMainSchema() throws MetaModelException {
+        final Map<String, SimpleTableDef> tableDefs = new HashMap<>();
+        for (final SimpleTableDef tableDef : _tableDefs) {
+            tableDefs.put(tableDef.getName(), tableDef);
+        }
+
+        final MutableSchema schema = new MutableSchema(getMainSchemaName());
+        final ListTablesResult tables = _dynamoDb.listTables();
+        final List<String> tableNames = tables.getTableNames();
+        for (final String tableName : tableNames) {
+            final MutableTable table = new MutableTable(tableName, schema);
+            schema.addTable(table);
+
+            final DescribeTableResult descripeTableResult = _dynamoDb.describeTable(tableName);
+            final TableDescription tableDescription = descripeTableResult.getTable();
+
+            // add primary keys
+            addColumnFromKeySchema("Primary index", tableDescription.getKeySchema(), table, true);
+
+            // add attributes from global and local indices
+            final List<GlobalSecondaryIndexDescription> globalSecondaryIndexes = tableDescription
+                    .getGlobalSecondaryIndexes();
+            if (globalSecondaryIndexes != null) {
+                for (final GlobalSecondaryIndexDescription globalSecondaryIndex : globalSecondaryIndexes) {
+                    addColumnFromKeySchema(globalSecondaryIndex.getIndexName(), globalSecondaryIndex.getKeySchema(),
+                            table, false);
+                }
+            }
+            final List<LocalSecondaryIndexDescription> localSecondaryIndexes = tableDescription
+                    .getLocalSecondaryIndexes();
+            if (localSecondaryIndexes != null) {
+                for (final LocalSecondaryIndexDescription localSecondaryIndex : localSecondaryIndexes) {
+                    addColumnFromKeySchema(localSecondaryIndex.getIndexName(), localSecondaryIndex.getKeySchema(),
+                            table, false);
+                }
+            }
+
+            // add top-level attribute definitions
+            final List<AttributeDefinition> attributeDefinitions = tableDescription.getAttributeDefinitions();
+            for (final AttributeDefinition attributeDefinition : attributeDefinitions) {
+                final String attributeName = attributeDefinition.getAttributeName();
+                MutableColumn column = (MutableColumn) table.getColumnByName(attributeName);
+                if (column == null) {
+                    column = new MutableColumn(attributeName, table);
+                    table.addColumn(column);
+                }
+                final String attributeType = attributeDefinition.getAttributeType();
+                column.setType(DynamoDbUtils.toColumnType(attributeName, attributeType));
+                column.setIndexed(true);
+                column.setNativeType(attributeType);
+            }
+
+            // add additional metadata from SimpleTableDefs if available
+            final SimpleTableDef tableDef = tableDefs.get(tableName);
+            if (tableDef != null) {
+                final String[] columnNames = tableDef.getColumnNames();
+                final ColumnType[] columnTypes = tableDef.getColumnTypes();
+                for (int i = 0; i < columnNames.length; i++) {
+                    final String columnName = columnNames[i];
+                    final ColumnType columnType = columnTypes[i];
+                    MutableColumn column = (MutableColumn) table.getColumnByName(columnName);
+                    if (column == null) {
+                        column = new MutableColumn(columnName, table);
+                        table.addColumn(column);
+                    }
+                    if (column.getType() == null && columnType != null) {
+                        column.setType(columnType);
+                    }
+                }
+            }
+
+            // add additional attributes based on global and local indices
+            if (globalSecondaryIndexes != null) {
+                for (final GlobalSecondaryIndexDescription globalSecondaryIndex : globalSecondaryIndexes) {
+                    final List<String> nonKeyAttributes = globalSecondaryIndex.getProjection().getNonKeyAttributes();
+                    for (final String attributeName : nonKeyAttributes) {
+                        addColumnFromNonKeyAttribute(globalSecondaryIndex.getIndexName(), table, attributeName);
+                    }
+                }
+            }
+            if (localSecondaryIndexes != null) {
+                for (final LocalSecondaryIndexDescription localSecondaryIndex : localSecondaryIndexes) {
+                    final List<String> nonKeyAttributes = localSecondaryIndex.getProjection().getNonKeyAttributes();
+                    for (final String attributeName : nonKeyAttributes) {
+                        addColumnFromNonKeyAttribute(localSecondaryIndex.getIndexName(), table, attributeName);
+                    }
+                }
+            }
+        }
+        return schema;
+    }
+
+    private void addColumnFromNonKeyAttribute(String indexName, MutableTable table, String attributeName) {
+        MutableColumn column = (MutableColumn) table.getColumnByName(attributeName);
+        if (column == null) {
+            column = new MutableColumn(attributeName, table);
+            table.addColumn(column);
+        }
+        appendRemarks(column, indexName + " non-key attribute");
+    }
+
+    private void addColumnFromKeySchema(String indexName, List<KeySchemaElement> keySchema, MutableTable table,
+            boolean primaryKey) {
+        for (final KeySchemaElement keySchemaElement : keySchema) {
+            final String attributeName = keySchemaElement.getAttributeName();
+            if (table.getColumnByName(attributeName) == null) {
+                final String keyType = keySchemaElement.getKeyType();
+                final MutableColumn column = new MutableColumn(attributeName, table).setPrimaryKey(primaryKey);
+                appendRemarks(column, indexName + " member ('" + keyType + "' type)");
+                table.addColumn(column);
+            }
+        }
+    }
+
+    private static void appendRemarks(MutableColumn column, String remarks) {
+        final String existingRemarks = column.getRemarks();
+        if (existingRemarks == null) {
+            column.setRemarks(remarks);
+        } else {
+            column.setRemarks(existingRemarks + ", " + remarks);
+        }
+    }
+
+    @Override
+    protected String getMainSchemaName() throws MetaModelException {
+        return SCHEMA_NAME;
+    }
+
+    @Override
+    protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
+        if (!whereItems.isEmpty()) {
+            return null;
+        }
+        return _dynamoDb.describeTable(table.getName()).getTable().getItemCount();
+    }
+
+    @Override
+    protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) {
+        final List<String> attributeNames = new ArrayList<>(columns.length);
+        for (final Column column : columns) {
+            attributeNames.add(column.getName());
+        }
+        final ScanRequest scanRequest = new ScanRequest(table.getName());
+        scanRequest.setAttributesToGet(attributeNames);
+        if (maxRows > 0) {
+            scanRequest.setLimit(maxRows);
+        }
+        final ScanResult result = _dynamoDb.scan(scanRequest);
+        return new DynamoDbDataSet(columns, result);
+    }
+
+    @Override
+    protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
+            Object keyValue) {
+        final List<String> attributeNames = new ArrayList<>();
+        for (SelectItem selectItem : selectItems) {
+            attributeNames.add(selectItem.getColumn().getName());
+        }
+
+        final GetItemRequest getItemRequest = new GetItemRequest(table.getName(), Collections.singletonMap(
+                primaryKeyColumn.getName(), DynamoDbUtils.toAttributeValue(keyValue))).withAttributesToGet(
+                        attributeNames);
+        final GetItemResult item = _dynamoDb.getItem(getItemRequest);
+
+        final Object[] values = new Object[selectItems.size()];
+        for (int i = 0; i < values.length; i++) {
+            final AttributeValue attributeValue = item.getItem().get(attributeNames.get(i));
+            values[i] = DynamoDbUtils.toValue(attributeValue);
+        }
+
+        return new DefaultRow(new SimpleDataSetHeader(selectItems), values);
+    }
+
+    @Override
+    public void executeUpdate(UpdateScript update) {
+        final DynamoDbUpdateCallback callback = new DynamoDbUpdateCallback(this);
+        try {
+            update.run(callback);
+        } finally {
+            if (callback.isInterrupted()) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
diff --git a/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbDataSet.java b/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbDataSet.java
new file mode 100644 (file)
index 0000000..17b781b
--- /dev/null
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.dynamodb;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.schema.Column;
+
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.ScanResult;
+
+final class DynamoDbDataSet extends AbstractDataSet {
+
+    private final Iterator<Map<String, AttributeValue>> _iterator;
+    private Map<String, AttributeValue> _currentItem;
+
+    public DynamoDbDataSet(Column[] columns, ScanResult result) {
+        super(columns);
+        _iterator = result.getItems().iterator();
+    }
+
+    @Override
+    public boolean next() {
+        final boolean hasNext = _iterator.hasNext();
+        if (hasNext) {
+            _currentItem = _iterator.next();
+            return true;
+        }
+        _currentItem = null;
+        return false;
+    }
+
+    @Override
+    public Row getRow() {
+        if (_currentItem == null) {
+            return null;
+        }
+        final DataSetHeader header = getHeader();
+        final Object[] values = new Object[header.size()];
+        for (int i = 0; i < values.length; i++) {
+            final AttributeValue attributeValue = _currentItem.get(header.getSelectItem(i).getColumn().getName());
+            values[i] = DynamoDbUtils.toValue(attributeValue);
+        }
+        final Row row = new DefaultRow(header, values);
+        return row;
+    }
+}
diff --git a/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbRowInsertionBuilder.java b/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbRowInsertionBuilder.java
new file mode 100644 (file)
index 0000000..c97f815
--- /dev/null
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.dynamodb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+
+final class DynamoDbRowInsertionBuilder extends AbstractRowInsertionBuilder<DynamoDbUpdateCallback> {
+
+    public DynamoDbRowInsertionBuilder(DynamoDbUpdateCallback updateCallback, Table table) {
+        super(updateCallback, table);
+    }
+
+    @Override
+    public void execute() throws MetaModelException {
+        final Map<String, AttributeValue> itemValues = new HashMap<>();
+        final Column[] columns = getColumns();
+        final Object[] values = getValues();
+        for (int i = 0; i < columns.length; i++) {
+            final Column column = columns[i];
+            final Object value = values[i];
+            if (column.isPrimaryKey() && value == null) {
+                throw new IllegalArgumentException("Value for '" + column.getName() + "' cannot be null");
+            }
+
+            final AttributeValue attributeValue = DynamoDbUtils.toAttributeValue(value);
+            itemValues.put(column.getName(), attributeValue);
+        }
+
+        final AmazonDynamoDB dynamoDb = getUpdateCallback().getDataContext().getDynamoDb();
+        dynamoDb.putItem(getTable().getName(), itemValues);
+    }
+}
diff --git a/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbTableCreationBuilder.java b/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbTableCreationBuilder.java
new file mode 100644 (file)
index 0000000..c9779df
--- /dev/null
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.dynamodb;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.create.AbstractTableCreationBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
+import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.TableStatus;
+
+class DynamoDbTableCreationBuilder extends AbstractTableCreationBuilder<DynamoDbUpdateCallback> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DynamoDbTableCreationBuilder.class);
+
+    public DynamoDbTableCreationBuilder(DynamoDbUpdateCallback updateCallback, Schema schema, String name) {
+        super(updateCallback, schema, name);
+    }
+
+    @Override
+    public Table execute() throws MetaModelException {
+        final MutableTable table = getTable();
+        final String tableName = table.getName();
+
+        final Collection<AttributeDefinition> attributes = new ArrayList<>();
+        final Collection<KeySchemaElement> keySchema = new ArrayList<>();
+        final Collection<GlobalSecondaryIndex> globalSecondaryIndices = new ArrayList<>();
+
+        final long readCapacity = Long.parseLong(System.getProperty(
+                DynamoDbDataContext.SYSTEM_PROPERTY_THROUGHPUT_READ_CAPACITY, "5"));
+        final long writeCapacity = Long.parseLong(System.getProperty(
+                DynamoDbDataContext.SYSTEM_PROPERTY_THROUGHPUT_WRITE_CAPACITY, "5"));
+        final ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput(readCapacity, writeCapacity);
+
+        final Column[] columns = table.getColumns();
+        for (Column column : columns) {
+            if (column.isPrimaryKey()) {
+                final KeyType keyType = getKeyType(column.getRemarks());
+                keySchema.add(new KeySchemaElement(column.getName(), keyType));
+                attributes.add(new AttributeDefinition(column.getName(), DynamoDbUtils.toAttributeType(column
+                        .getType())));
+            }
+        }
+
+        final CreateTableRequest createTableRequest = new CreateTableRequest();
+        createTableRequest.setTableName(tableName);
+        createTableRequest.setAttributeDefinitions(attributes);
+        createTableRequest.setGlobalSecondaryIndexes(globalSecondaryIndices);
+        createTableRequest.setKeySchema(keySchema);
+        createTableRequest.setProvisionedThroughput(provisionedThroughput);
+
+        final AmazonDynamoDB client = getUpdateCallback().getDataContext().getDynamoDb();
+
+        final CreateTableResult createTableResult = client.createTable(createTableRequest);
+
+        // await the table creation to be "done".
+        {
+            String tableStatus = createTableResult.getTableDescription().getTableStatus();
+            while (TableStatus.CREATING.name().equals(tableStatus)) {
+                logger.debug("Waiting for table status to be ACTIVE. Currently: {}", tableStatus);
+                try {
+                    Thread.sleep(300);
+                } catch (InterruptedException e) {
+                    getUpdateCallback().setInterrupted(true);
+                }
+                tableStatus = client.describeTable(tableName).getTable().getTableStatus();
+            }
+        }
+
+        return table;
+    }
+
+    private KeyType getKeyType(String remarks) {
+        if ("RANGE".equals(remarks)) {
+            // default
+            return KeyType.RANGE;
+        }
+        return KeyType.HASH;
+    }
+
+}
diff --git a/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbTableDropBuilder.java b/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbTableDropBuilder.java
new file mode 100644 (file)
index 0000000..a83682f
--- /dev/null
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.dynamodb;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.drop.AbstractTableDropBuilder;
+import org.apache.metamodel.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.dynamodbv2.model.DeleteTableResult;
+
+final class DynamoDbTableDropBuilder extends AbstractTableDropBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(DynamoDbTableDropBuilder.class);
+    private final DynamoDbDataContext _dataContext;
+
+    public DynamoDbTableDropBuilder(Table table, DynamoDbDataContext dataContext) {
+        super(table);
+        _dataContext = dataContext;
+    }
+
+    @Override
+    public void execute() throws MetaModelException {
+        final String tableName = getTable().getName();
+        final DeleteTableResult result = _dataContext.getDynamoDb().deleteTable(tableName);
+        logger.debug("Dropped table {} in request ID: {}", tableName, result.getSdkResponseMetadata().getRequestId());
+    }
+
+}
diff --git a/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbUpdateCallback.java b/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbUpdateCallback.java
new file mode 100644 (file)
index 0000000..8efd01b
--- /dev/null
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.dynamodb;
+
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+
+final class DynamoDbUpdateCallback extends AbstractUpdateCallback {
+    
+    private boolean interrupted = false;
+
+    public DynamoDbUpdateCallback(DynamoDbDataContext dataContext) {
+        super(dataContext);
+    }
+    
+    public boolean isInterrupted() {
+        return interrupted;
+    }
+    
+    public void setInterrupted(boolean interrupted) {
+        this.interrupted = interrupted;
+    }
+
+    @Override
+    public DynamoDbDataContext getDataContext() {
+        return (DynamoDbDataContext) super.getDataContext();
+    }
+
+    @Override
+    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
+            IllegalStateException {
+        return new DynamoDbTableCreationBuilder(this, schema, name);
+    }
+
+    @Override
+    public boolean isDropTableSupported() {
+        return true;
+    }
+
+    @Override
+    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new DynamoDbTableDropBuilder(table, getDataContext());
+    }
+
+    @Override
+    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new DynamoDbRowInsertionBuilder(this, table);
+    }
+
+    @Override
+    public boolean isDeleteSupported() {
+        return false;
+    }
+
+    @Override
+    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        // This could be implemented ...
+        throw new UnsupportedOperationException();
+    }
+
+}
diff --git a/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbUtils.java b/dynamodb/src/main/java/org/apache/metamodel/dynamodb/DynamoDbUtils.java
new file mode 100644 (file)
index 0000000..e49636b
--- /dev/null
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.dynamodb;
+
+import org.apache.metamodel.schema.ColumnType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+
+final class DynamoDbUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(DynamoDbUtils.class);
+
+    // prevent instantiation
+    private DynamoDbUtils() {
+    }
+
+    public static AttributeValue toAttributeValue(Object value) {
+        if (value instanceof AttributeValue) {
+            return (AttributeValue) value;
+        }
+        final AttributeValue attributeValue = new AttributeValue();
+        if (value == null) {
+            attributeValue.setNULL(true);
+        } else if (value instanceof Number) {
+            attributeValue.setN(value.toString());
+        } else if (value instanceof Boolean) {
+            attributeValue.setBOOL((Boolean) value);
+        } else if (value instanceof String) {
+            attributeValue.setS(value.toString());
+        } else {
+            // TODO: Actually there's a few more value types we should support,
+            // see AttributeValue.setXxxx
+            throw new UnsupportedOperationException("Unsupported value type: " + value);
+        }
+        return attributeValue;
+    }
+
+    public static ScalarAttributeType toAttributeType(ColumnType type) {
+        if (type == null) {
+            return ScalarAttributeType.S;
+        }
+        if (type.isBinary()) {
+            return ScalarAttributeType.B;
+        }
+        if (type.isNumber()) {
+            return ScalarAttributeType.S;
+        }
+        // default to string
+        return ScalarAttributeType.S;
+    }
+
+    public static ColumnType toColumnType(String attributeName, String attributeType) {
+        if (attributeType == null) {
+            return null;
+        }
+        switch (attributeType) {
+        case "S":
+            return ColumnType.STRING;
+        case "N":
+            return ColumnType.NUMBER;
+        case "B":
+            return ColumnType.BINARY;
+        }
+        logger.warn("Unexpected attribute type '{}' for attribute: {}", attributeType, attributeName);
+        return null;
+    }
+
+    public static Object toValue(AttributeValue a) {
+        if (a == null || Boolean.TRUE == a.isNULL()) {
+            return null;
+        }
+        // dynamo is a bit funky this way ... it has a getter for each possible
+        // data type.
+        return firstNonNull(a.getB(), a.getBOOL(), a.getBS(), a.getL(), a.getM(), a.getN(), a.getNS(), a.getS(), a
+                .getSS());
+    }
+
+    private static Object firstNonNull(Object... objects) {
+        for (Object object : objects) {
+            if (object != null) {
+                return object;
+            }
+        }
+        return null;
+    }
+}
diff --git a/dynamodb/src/test/java/org/apache/metamodel/dynamodb/DynamoDbDataContextIntegrationTest.java b/dynamodb/src/test/java/org/apache/metamodel/dynamodb/DynamoDbDataContextIntegrationTest.java
new file mode 100644 (file)
index 0000000..967655d
--- /dev/null
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.dynamodb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.InMemoryDataSet;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.util.SimpleTableDef;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
+import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.Projection;
+import com.amazonaws.services.dynamodbv2.model.ProjectionType;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+
+public class DynamoDbDataContextIntegrationTest {
+
+    private AmazonDynamoDB client;
+
+    @Before
+    public void before() throws Exception {
+        final String userHome = System.getProperty("user.home");
+        final String propertiesFilename = userHome + "/metamodel-integrationtest-configuration.properties";
+        final File file = new File(propertiesFilename);
+
+        Assume.assumeTrue(file.exists());
+
+        final Properties props = new Properties();
+        props.load(new FileReader(file));
+
+        final String accessKey = props.getProperty("dynamodb.accessKey");
+        final String secretKey = props.getProperty("dynamodb.secretKey");
+
+        Assume.assumeNotNull(accessKey, secretKey);
+
+        final Regions region = Regions.fromName(props.getProperty("dynamodb.region", Regions.DEFAULT_REGION.getName()));
+
+        final AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
+        final AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
+        client = AmazonDynamoDBClientBuilder.standard().withRegion(region).withCredentials(credentialsProvider).build();
+    }
+
+    @After
+    public void after() {
+        if (client != null) {
+            client.shutdown();
+        }
+    }
+
+    @Test
+    public void testScenario1() throws Exception {
+        final String tableName = "MetaModel-" + UUID.randomUUID().toString();
+
+        final ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput(5l, 5l);
+
+        final Collection<AttributeDefinition> attributes = new ArrayList<>();
+        attributes.add(new AttributeDefinition("id", ScalarAttributeType.S));
+        attributes.add(new AttributeDefinition("counter", ScalarAttributeType.N));
+
+        final Collection<KeySchemaElement> keySchema = new ArrayList<>();
+        keySchema.add(new KeySchemaElement("id", KeyType.HASH));
+
+        // CreateDateIndex
+        final GlobalSecondaryIndex globalSecondaryIndex = new GlobalSecondaryIndex().withIndexName("counter_index")
+                .withProvisionedThroughput(provisionedThroughput).withKeySchema(new KeySchemaElement()
+                        .withAttributeName("counter").withKeyType(KeyType.HASH)).withProjection(new Projection()
+                                .withProjectionType(ProjectionType.INCLUDE).withNonKeyAttributes("foundation"));
+
+        final CreateTableRequest createTableRequest = new CreateTableRequest();
+        createTableRequest.setTableName(tableName);
+        createTableRequest.setAttributeDefinitions(attributes);
+        createTableRequest.setGlobalSecondaryIndexes(Arrays.asList(globalSecondaryIndex));
+        createTableRequest.setKeySchema(keySchema);
+        createTableRequest.setProvisionedThroughput(provisionedThroughput);
+        final CreateTableResult createTableResult = client.createTable(createTableRequest);
+
+        // await the table creation to be "done".
+        {
+            String tableStatus = createTableResult.getTableDescription().getTableStatus();
+            while (!"ACTIVE".equals(tableStatus)) {
+                System.out.println("Waiting for table status to be ACTIVE. Currently: " + tableStatus);
+                Thread.sleep(300);
+                tableStatus = client.describeTable(tableName).getTable().getTableStatus();
+            }
+        }
+
+        client.putItem(tableName, createItem("id", "foo", "counter", 1, "foundation", "Apache"));
+        client.putItem(tableName, createItem("id", "bar", "counter", 2, "project", "MetaModel"));
+        client.putItem(tableName, createItem("id", "baz", "counter", 3, "url", "http://metamodel.apache.org"));
+
+        final SimpleTableDef[] tableDefs = new SimpleTableDef[] { new SimpleTableDef(tableName, new String[] {
+                "counter", "project" }, new ColumnType[] { ColumnType.INTEGER, ColumnType.STRING }) };
+
+        try {
+            try (final DynamoDbDataContext dc = new DynamoDbDataContext(client, tableDefs)) {
+
+                final Table table = dc.getTableByQualifiedLabel(tableName);
+                assertEquals(tableName, table.getName());
+
+                // Right now we can only discover indexed columns
+                assertEquals("[id, counter, project, foundation]", Arrays.toString(table.getColumnNames()));
+
+                final Column idColumn = table.getColumnByName("id");
+                assertEquals(true, idColumn.isPrimaryKey());
+                assertEquals(true, idColumn.isIndexed());
+                assertEquals(ColumnType.STRING, idColumn.getType());
+                assertEquals("Primary index member ('HASH' type)", idColumn.getRemarks());
+
+                final Column counterColumn = table.getColumnByName("counter");
+                assertEquals(ColumnType.NUMBER, counterColumn.getType());
+                assertEquals(true, counterColumn.isIndexed());
+                assertEquals(false, counterColumn.isPrimaryKey());
+                assertEquals("counter_index member ('HASH' type)", counterColumn.getRemarks());
+
+                final Column projectColumn = table.getColumnByName("project");
+                assertEquals(ColumnType.STRING, projectColumn.getType());
+                assertEquals(false, projectColumn.isIndexed());
+                assertEquals(false, projectColumn.isPrimaryKey());
+                assertEquals(null, projectColumn.getRemarks());
+
+                final Column foundationColumn = table.getColumnByName("foundation");
+                assertEquals(null, foundationColumn.getType());
+                assertEquals(false, foundationColumn.isIndexed());
+                assertEquals(false, foundationColumn.isPrimaryKey());
+                assertEquals("counter_index non-key attribute", foundationColumn.getRemarks());
+
+                try (final DataSet dataSet = dc.query().from(table).select("id", "counter", "project").orderBy("id")
+                        .execute()) {
+                    assertTrue(dataSet.next());
+                    assertEquals("Row[values=[bar, 2, MetaModel]]", dataSet.getRow().toString());
+                    assertTrue(dataSet.next());
+                    assertEquals("Row[values=[baz, 3, null]]", dataSet.getRow().toString());
+                    assertTrue(dataSet.next());
+                    assertEquals("Row[values=[foo, 1, null]]", dataSet.getRow().toString());
+                    assertFalse(dataSet.next());
+                }
+
+                try (final DataSet dataSet = dc.query().from(tableName).select("counter", "project").where("id").eq(
+                        "baz").execute()) {
+                    assertTrue(dataSet instanceof InMemoryDataSet);
+
+                    assertTrue(dataSet.next());
+                    assertEquals("Row[values=[3, null]]", dataSet.getRow().toString());
+                    assertFalse(dataSet.next());
+                }
+            }
+        } finally {
+            client.deleteTable(tableName);
+        }
+    }
+
+    // convenience method
+    private Map<String, AttributeValue> createItem(Object... keyAndValues) {
+        final Map<String, AttributeValue> map = new HashMap<>();
+        for (int i = 0; i < keyAndValues.length; i = i + 2) {
+            final String key = (String) keyAndValues[i];
+            final Object value = keyAndValues[i + 1];
+            map.put(key, DynamoDbUtils.toAttributeValue(value));
+        }
+        return map;
+    }
+}
index 83c5f4e..4f3cbfe 100644 (file)
 #hadoop.hdfs.port=9000
 #hadoop.hdfs.file.path=/apache_metamodel_testfile.txt
 
+# ----------------------------
+# Dynamo DB module properties:
+# ----------------------------
+
+#dynamodb.accessKey=
+#dynamodb.secretKey=
+#dynamodb.region=us-west-2
 
 # ------------------------
 # HBase module properties:
diff --git a/pom.xml b/pom.xml
index 5170658..72d3672 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@ under the License.
                <module>core</module>
                <module>pojo</module>
                <module>fixedwidth</module>
+               <module>dynamodb</module>
                <module>excel</module>
                <module>csv</module>
                <module>json</module>