METAMODEL-1082: Fixed
authorKasper Sørensen <i.am.kasper.sorensen@gmail.com>
Fri, 13 May 2016 15:48:23 +0000 (08:48 -0700)
committerKasper Sørensen <i.am.kasper.sorensen@gmail.com>
Fri, 13 May 2016 15:48:23 +0000 (08:48 -0700)
Closes #98

12 files changed:
CHANGES.md
elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
elasticsearch/rest/pom.xml
elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java

index 25a7ed4..66d1917 100644 (file)
@@ -7,6 +7,7 @@
  * [METAMODEL-247] - Added FixedWidthConfigurationReader for reading fixed width file metadata from external files.
  * [METAMODEL-159] - DataContextFactory misses methods to create HBase and POJO data contexts.
  * [METAMODEL-252] - Fixed a bug that caused JDBC updates to unnecessarily refresh schema objects.
+ * [METAMODEL-1082] - Improved performance of batch ElasticSearch operations by using bulk API.
 
 ### Apache MetaModel 4.5.2
 
index 11d35bd..b298d11 100644 (file)
@@ -21,7 +21,10 @@ package org.apache.metamodel.elasticsearch.common;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
@@ -40,29 +43,34 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ElasticSearchUtils {
-    public static final String FIELD_ID = "_id";
+
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class);
 
+    public static final String FIELD_ID = "_id";
+    public static final String SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS = "metamodel.elasticsearch.strip_invalid_field_chars";
+
     /**
      * Gets a "filter" query which is both 1.x and 2.x compatible.
      */
     private static QueryBuilder getFilteredQuery(String prefix, String fieldName) {
-        // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(fieldName));
-        // 2.x: itemQueryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
+        // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null,
+        // FilterBuilders.missingFilter(fieldName));
+        // 2.x: itemQueryBuilder =
+        // QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
         try {
             try {
                 Method method = QueryBuilders.class.getDeclaredMethod(prefix + "Query", String.class);
                 method.setAccessible(true);
                 return QueryBuilders.boolQuery().must((QueryBuilder) method.invoke(null, fieldName));
             } catch (NoSuchMethodException e) {
-                Class<?> clazz = ElasticSearchUtils.class.getClassLoader()
-                        .loadClass("org.elasticsearch.index.query.FilterBuilders");
+                Class<?> clazz = ElasticSearchUtils.class.getClassLoader().loadClass(
+                        "org.elasticsearch.index.query.FilterBuilders");
                 Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + "Filter", String.class);
                 filterBuilderMethod.setAccessible(true);
-                Method queryBuildersFilteredQueryMethod =
-                        QueryBuilders.class.getDeclaredMethod("filteredQuery", QueryBuilder.class, FilterBuilder.class);
-                return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null,
-                        filterBuilderMethod.invoke(null, fieldName));
+                Method queryBuildersFilteredQueryMethod = QueryBuilders.class.getDeclaredMethod("filteredQuery",
+                        QueryBuilder.class, FilterBuilder.class);
+                return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null, filterBuilderMethod.invoke(
+                        null, fieldName));
             }
         } catch (Exception e) {
             logger.error("Failed to resolve/invoke filtering method", e);
@@ -78,34 +86,63 @@ public class ElasticSearchUtils {
         return getFilteredQuery("exists", fieldName);
     }
 
-    public static List<Object> getSourceProperties(final MutableTable table) {
+    public static Map<String, ?> getMappingSource(final MutableTable table) {
         if (table.getColumnByName(FIELD_ID) == null) {
-            final MutableColumn idColumn = new MutableColumn(FIELD_ID, ColumnType.STRING)
-                    .setTable(table).setPrimaryKey(true);
+            final MutableColumn idColumn = new MutableColumn(FIELD_ID, ColumnType.STRING).setTable(table).setPrimaryKey(
+                    true);
             table.addColumn(0, idColumn);
         }
 
-        final List<Object> sourceProperties = new ArrayList<>();
-
+        final Map<String, Map<String, String>> propertiesMap = new LinkedHashMap<>();
+        
         for (Column column : table.getColumns()) {
-            // each column is defined as a property pair of the form: ("field1",
-            // "type=string,store=true")
             final String columnName = column.getName();
             if (FIELD_ID.equals(columnName)) {
                 // do nothing - the ID is a client-side construct
                 continue;
             }
-            sourceProperties.add(columnName);
+            
+            final String fieldName = getValidatedFieldName(columnName);
+            final Map<String, String> propertyMap = new HashMap<>();
+            final String type = getType(column);
+            propertyMap.put("type", type);
+            
+            propertiesMap.put(fieldName, propertyMap);
+        }
 
-            String type = getType(column);
-            if (type == null) {
-                sourceProperties.add("store=true");
+        HashMap<String, Map<String, Map<String, String>>> docTypeMap = new HashMap<>();
+        docTypeMap.put("properties", propertiesMap);
+        
+        final Map<String, Map<String, Map<String, Map<String, String>>>> mapping = new HashMap<>();
+        mapping.put(table.getName(), docTypeMap);
+        return mapping;
+    }
+
+    /**
+     * Field name special characters are:
+     * 
+     * . (used for navigation between name components)
+     * 
+     * # (for delimiting name components in _uid, should work, but is
+     * discouraged)
+     * 
+     * * (for matching names)
+     * 
+     * @param fieldName
+     * @return
+     */
+    public static String getValidatedFieldName(String fieldName) {
+        if (fieldName == null || fieldName.isEmpty()) {
+            throw new IllegalArgumentException("Field name cannot be null or empty");
+        }
+        if (fieldName.contains(".") || fieldName.contains("#") || fieldName.contains("*")) {
+            if ("true".equalsIgnoreCase(System.getProperty(SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS, "true"))) {
+                fieldName = fieldName.replace('.', '_').replace('#', '_').replace('*', '_');
             } else {
-                sourceProperties.add("type=" + type + ",store=true");
+                throw new IllegalArgumentException("Field name '" + fieldName + "' contains illegal character (.#*)");
             }
         }
-
-        return sourceProperties;
+        return fieldName;
     }
 
     /**
@@ -154,8 +191,8 @@ public class ElasticSearchUtils {
             return "object";
         }
 
-        throw new UnsupportedOperationException("Unsupported column type '" + type.getName() + "' of column '"
-                + column.getName() + "' - cannot translate to an ElasticSearch type.");
+        throw new UnsupportedOperationException("Unsupported column type '" + type.getName() + "' of column '" + column
+                .getName() + "' - cannot translate to an ElasticSearch type.");
     }
 
     /**
@@ -204,8 +241,8 @@ public class ElasticSearchUtils {
                     if (operand == null) {
                         itemQueryBuilder = getExistsQuery(fieldName);
                     } else {
-                        itemQueryBuilder = QueryBuilders.boolQuery().mustNot(
-                                QueryBuilders.termQuery(fieldName, operand));
+                        itemQueryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(fieldName,
+                                operand));
                     }
                 } else if (OperatorType.IN.equals(operator)) {
                     final List<?> operands = CollectionUtils.toList(operand);
index 5d6701c..f27e8ac 100644 (file)
@@ -18,7 +18,7 @@
  */
 package org.apache.metamodel.elasticsearch.nativeclient;
 
-import java.util.List;
+import java.util.Map;
 
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.create.AbstractTableCreationBuilder;
@@ -44,15 +44,15 @@ final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder
     @Override
     public Table execute() throws MetaModelException {
         final MutableTable table = getTable();
-        final List<Object> sourceProperties = ElasticSearchUtils.getSourceProperties(table);
+        final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table);
 
         final ElasticSearchDataContext dataContext = getUpdateCallback().getDataContext();
         final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices();
         final String indexName = dataContext.getIndexName();
 
-        final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin)
-                .setIndices(indexName).setType(table.getName());
-        requestBuilder.setSource(sourceProperties.toArray());
+        final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName)
+                .setType(table.getName());
+        requestBuilder.setSource(source);
         final PutMappingResponse result = requestBuilder.execute().actionGet();
 
         logger.debug("PutMapping response: acknowledged={}", result.isAcknowledged());
index 1efb0c8..822ef1b 100644 (file)
@@ -41,7 +41,8 @@ final class NativeElasticSearchUtils {
             final Column column = selectItem.getColumn();
 
             assert column != null;
-            assert selectItem.getFunction() == null;
+            assert selectItem.getAggregateFunction() == null;
+            assert selectItem.getScalarFunction() == null;
 
             if (column.isPrimaryKey()) {
                 values[i] = documentId;
index 16c0555..92f7393 100644 (file)
@@ -27,7 +27,7 @@ under the License.
        <modelVersion>4.0.0</modelVersion>
 
        <properties>
-               <jest.version>0.1.7</jest.version>
+               <jest.version>2.0.2</jest.version>
                <elasticsearch.version>1.4.4</elasticsearch.version>
        </properties>
 
index 6b8ac51..c452d7b 100644 (file)
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.metamodel.BatchUpdateScript;
 import org.apache.metamodel.DataContext;
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.QueryPostprocessDataContext;
@@ -354,7 +355,8 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
 
     @Override
     public void executeUpdate(UpdateScript update) {
-        final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this);
+        final boolean isBatch = update instanceof BatchUpdateScript;
+        final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this, isBatch);
         update.run(callback);
         callback.onExecuteUpdateFinished();
     }
index cc26c8d..3e71c4d 100644 (file)
  */
 package org.apache.metamodel.elasticsearch.rest;
 
-import io.searchbox.indices.mapping.PutMapping;
+import java.util.Map;
+
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.create.AbstractTableCreationBuilder;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.*;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
 
-import java.util.List;
+import io.searchbox.indices.mapping.PutMapping;
 
 final class JestElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<JestElasticSearchUpdateCallback> {
-    public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema, String name) {
+    
+    public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema,
+            String name) {
         super(updateCallback, schema, name);
     }
 
     @Override
     public Table execute() throws MetaModelException {
         final MutableTable table = getTable();
-        final List<Object> sourceProperties = ElasticSearchUtils.getSourceProperties(table);
+        final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table);
 
         final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
         final String indexName = dataContext.getIndexName();
 
-        final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), sourceProperties).build();
-        JestClientExecutor.execute(dataContext.getElasticSearchClient(), putMapping);
+        final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), source).build();
+        getUpdateCallback().execute(putMapping);
 
         final MutableSchema schema = (MutableSchema) getSchema();
         schema.addTable(table);
index 9678b48..7f485ba 100644 (file)
@@ -106,7 +106,7 @@ final class JestElasticSearchDataSet extends AbstractDataSet {
         }
 
         // try to scroll to the next set of hits
-        SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
+        final SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
 
         _searchResponse = JestClientExecutor.execute(_client, scroll);
 
index a4c0c03..cc1c3e7 100644 (file)
@@ -71,6 +71,6 @@ final class JestElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
                 new DeleteByQuery.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType(
                         documentType).build();
 
-        JestClientExecutor.execute(dataContext.getElasticSearchClient(), deleteByQuery);
+        _updateCallback.execute(deleteByQuery);
     }
 }
index d4ddd19..8a1ac71 100644 (file)
@@ -18,7 +18,6 @@
  */
 package org.apache.metamodel.elasticsearch.rest;
 
-import io.searchbox.indices.mapping.DeleteMapping;
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.drop.AbstractTableDropBuilder;
 import org.apache.metamodel.drop.TableDropBuilder;
@@ -27,6 +26,8 @@ import org.apache.metamodel.schema.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.searchbox.indices.mapping.DeleteMapping;
+
 /**
  * {@link TableDropBuilder} for dropping tables (document types) in an
  * ElasticSearch index.
@@ -52,7 +53,7 @@ final class JestElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
 
         final DeleteMapping deleteIndex = new DeleteMapping.Builder(dataContext.getIndexName(), documentType).build();
 
-        JestClientExecutor.execute(dataContext.getElasticSearchClient(), deleteIndex);
+        _updateCallback.execute(deleteIndex);
 
         final MutableSchema schema = (MutableSchema) table.getSchema();
         schema.removeTable(table);
index 327d7d3..746538d 100644 (file)
  */
 package org.apache.metamodel.elasticsearch.rest;
 
-import io.searchbox.core.DocumentResult;
-import io.searchbox.core.Index;
-import io.searchbox.params.Parameters;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
 import org.apache.metamodel.schema.Column;
 import org.apache.metamodel.schema.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
+import io.searchbox.core.Index;
+import io.searchbox.params.Parameters;
 
 final class JestElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<JestElasticSearchUpdateCallback> {
 
-    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchInsertBuilder.class);
-
     public JestElasticSearchInsertBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
         super(updateCallback, table);
     }
 
     @Override
     public void execute() throws MetaModelException {
-        final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
+        final JestElasticSearchUpdateCallback updateCallback = getUpdateCallback();
+        final ElasticSearchRestDataContext dataContext = updateCallback.getDataContext();
         final String indexName = dataContext.getIndexName();
         final String documentType = getTable().getName();
 
-
         final Map<String, Object> source = new HashMap<>();
         final Column[] columns = getColumns();
         final Object[] values = getValues();
         String id = null;
         for (int i = 0; i < columns.length; i++) {
             if (isSet(columns[i])) {
-                final String name = columns[i].getName();
+                final String columnName = columns[i].getName();
+
                 final Object value = values[i];
-                if (ElasticSearchRestDataContext.FIELD_ID.equals(name)) {
+                if (ElasticSearchRestDataContext.FIELD_ID.equals(columnName)) {
                     if (value != null) {
                         id = value.toString();
                     }
                 } else {
-                    source.put(name, value);
+                    final String fieldName = ElasticSearchUtils.getValidatedFieldName(columnName);
+                    source.put(fieldName, value);
                 }
             }
         }
 
         assert !source.isEmpty();
 
-        Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter(
+        final Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter(
                 Parameters.OP_TYPE, "create").build();
 
-        final DocumentResult result = JestClientExecutor.execute(dataContext.getElasticSearchClient(), index);
-
-        logger.debug("Inserted document: id={}", result.getId());
+        getUpdateCallback().execute(index);
     }
 
 }
index ca2ed13..521955d 100644 (file)
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.indices.Refresh;
-import org.apache.metamodel.AbstractUpdateCallback;
-import org.apache.metamodel.UpdateCallback;
-import org.apache.metamodel.create.TableCreationBuilder;
-import org.apache.metamodel.delete.RowDeletionBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.insert.RowInsertionBuilder;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-
-/**
- * {@link UpdateCallback} implementation for {@link ElasticSearchRestDataContext}.
- */
-final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
-    public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext) {
-        super(dataContext);
-    }
-
-    @Override
-    public ElasticSearchRestDataContext getDataContext() {
-        return (ElasticSearchRestDataContext) super.getDataContext();
-    }
-
-    @Override
-    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
-            IllegalStateException {
-        return new JestElasticSearchCreateTableBuilder(this, schema, name);
-    }
-
-    @Override
-    public boolean isDropTableSupported() {
-        return true;
-    }
-
-    @Override
-    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchDropTableBuilder(this, table);
-    }
-
-    @Override
-    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchInsertBuilder(this, table);
-    }
-
-    @Override
-    public boolean isDeleteSupported() {
-        return true;
-    }
-
-    @Override
-    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchDeleteBuilder(this, table);
-    }
-
-    public void onExecuteUpdateFinished() {
-        final String indexName = getDataContext().getIndexName();
-        Refresh refresh = new Refresh.Builder().addIndex(indexName).build();
-
-        JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);
-    }
-}
+/**\r
+ * Licensed to the Apache Software Foundation (ASF) under one\r
+ * or more contributor license agreements.  See the NOTICE file\r
+ * distributed with this work for additional information\r
+ * regarding copyright ownership.  The ASF licenses this file\r
+ * to you under the Apache License, Version 2.0 (the\r
+ * "License"); you may not use this file except in compliance\r
+ * with the License.  You may obtain a copy of the License at\r
+ *\r
+ *   http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing,\r
+ * software distributed under the License is distributed on an\r
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\r
+ * KIND, either express or implied.  See the License for the\r
+ * specific language governing permissions and limitations\r
+ * under the License.\r
+ */\r
+package org.apache.metamodel.elasticsearch.rest;\r
+\r
+import java.util.List;\r
+\r
+import org.apache.metamodel.AbstractUpdateCallback;\r
+import org.apache.metamodel.MetaModelException;\r
+import org.apache.metamodel.UpdateCallback;\r
+import org.apache.metamodel.create.TableCreationBuilder;\r
+import org.apache.metamodel.delete.RowDeletionBuilder;\r
+import org.apache.metamodel.drop.TableDropBuilder;\r
+import org.apache.metamodel.insert.RowInsertionBuilder;\r
+import org.apache.metamodel.schema.Schema;\r
+import org.apache.metamodel.schema.Table;\r
+import org.elasticsearch.action.bulk.BulkRequest;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import io.searchbox.action.Action;\r
+import io.searchbox.action.BulkableAction;\r
+import io.searchbox.client.JestResult;\r
+import io.searchbox.core.Bulk;\r
+import io.searchbox.core.Bulk.Builder;\r
+import io.searchbox.core.BulkResult;\r
+import io.searchbox.core.BulkResult.BulkResultItem;\r
+import io.searchbox.indices.Refresh;\r
+\r
+/**\r
+ * {@link UpdateCallback} implementation for\r
+ * {@link ElasticSearchRestDataContext}.\r
+ */\r
+final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {\r
+\r
+    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchUpdateCallback.class);\r
+\r
+    private static final int BULK_BUFFER_SIZE = 1000;\r
+\r
+    private Bulk.Builder bulkBuilder;\r
+    private int bulkActionCount = 0;\r
+    private final boolean isBatch;\r
+\r
+    public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext, boolean isBatch) {\r
+        super(dataContext);\r
+        this.isBatch = isBatch;\r
+    }\r
+\r
+    private boolean isBatch() {\r
+        return isBatch;\r
+    }\r
+\r
+    @Override\r
+    public ElasticSearchRestDataContext getDataContext() {\r
+        return (ElasticSearchRestDataContext) super.getDataContext();\r
+    }\r
+\r
+    @Override\r
+    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,\r
+            IllegalStateException {\r
+        return new JestElasticSearchCreateTableBuilder(this, schema, name);\r
+    }\r
+\r
+    @Override\r
+    public boolean isDropTableSupported() {\r
+        return true;\r
+    }\r
+\r
+    @Override\r
+    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,\r
+            UnsupportedOperationException {\r
+        return new JestElasticSearchDropTableBuilder(this, table);\r
+    }\r
+\r
+    @Override\r
+    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,\r
+            UnsupportedOperationException {\r
+        return new JestElasticSearchInsertBuilder(this, table);\r
+    }\r
+\r
+    @Override\r
+    public boolean isDeleteSupported() {\r
+        return true;\r
+    }\r
+\r
+    @Override\r
+    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,\r
+            UnsupportedOperationException {\r
+        return new JestElasticSearchDeleteBuilder(this, table);\r
+    }\r
+\r
+    public void onExecuteUpdateFinished() {\r
+        if (isBatch()) {\r
+            flushBulkActions();\r
+        }\r
+\r
+        final String indexName = getDataContext().getIndexName();\r
+        final Refresh refresh = new Refresh.Builder().addIndex(indexName).build();\r
+\r
+        JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);\r
+    }\r
+\r
+    private void flushBulkActions() {\r
+        if (bulkBuilder == null || bulkActionCount == 0) {\r
+            // nothing to flush\r
+            return;\r
+        }\r
+        final Bulk bulk = getBulkBuilder().build();\r
+        logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName());\r
+        executeBlocking(bulk);\r
+\r
+        bulkActionCount = 0;\r
+        bulkBuilder = null;\r
+    }\r
+\r
+    public void execute(Action<?> action) {\r
+        if (isBatch() && action instanceof BulkableAction) {\r
+            final Bulk.Builder bulkBuilder = getBulkBuilder();\r
+            bulkBuilder.addAction((BulkableAction<?>) action);\r
+            bulkActionCount++;\r
+            if (bulkActionCount == BULK_BUFFER_SIZE) {\r
+                flushBulkActions();\r
+            }\r
+        } else {\r
+            executeBlocking(action);\r
+        }\r
+    }\r
+\r
+    private void executeBlocking(Action<?> action) {\r
+        final JestResult result = JestClientExecutor.execute(getDataContext().getElasticSearchClient(), action);\r
+        if (!result.isSucceeded()) {\r
+            if (result instanceof BulkResult) {\r
+                final List<BulkResultItem> failedItems = ((BulkResult) result).getFailedItems();\r
+                for (int i = 0; i < failedItems.size(); i++) {\r
+                    final BulkResultItem failedItem = failedItems.get(i);\r
+                    logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i+1, failedItems.size(), failedItem.id, failedItem.operation, failedItem.status, failedItem.error);\r
+                }\r
+            }\r
+            throw new MetaModelException(result.getResponseCode() + " - " + result.getErrorMessage());\r
+        }\r
+    }\r
+\r
+    private Builder getBulkBuilder() {\r
+        if (bulkBuilder == null) {\r
+            bulkBuilder = new Bulk.Builder();\r
+            bulkBuilder.defaultIndex(getDataContext().getIndexName());\r
+        }\r
+        return bulkBuilder;\r
+    }\r
+}\r