IGNITE-6917: Implemented SQL COPY command
author gg-shq <kshirokov@gridgain.com>
Wed, 7 Feb 2018 11:28:04 +0000 (14:28 +0300)
committer Igor Sapego <isapego@gridgain.com>
Wed, 7 Feb 2018 11:30:39 +0000 (14:30 +0300)
This closes #3419

49 files changed:
modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java [new file with mode: 0644]
modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java
modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java [new file with mode: 0644]
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java [new file with mode: 0644]
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java [new file with mode: 0644]
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java [new file with mode: 0644]
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java [new file with mode: 0644]
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java [new file with mode: 0644]
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java [new file with mode: 0644]
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java
modules/clients/src/test/resources/bulkload0.csv [new file with mode: 0644]
modules/clients/src/test/resources/bulkload1.csv [new file with mode: 0644]
modules/clients/src/test/resources/bulkload2.csv [new file with mode: 0644]
modules/clients/src/test/resources/bulkload2_utf.csv [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
parent/pom.xml

diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java
new file mode 100644 (file)
index 0000000..d9506cf
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/** COPY command test for the regular JDBC driver. */
+public class JdbcBulkLoadSelfTest extends GridCommonAbstractTest {
+    /** JDBC URL. */
+    private static final String BASE_URL = CFG_URL_PREFIX +
+        "cache=default@modules/clients/src/test/config/jdbc-config.xml";
+
+    /** Connection. */
+    protected Connection conn;
+
+    /** The logger. */
+    protected transient IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        return getConfiguration0(gridName);
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @return Grid configuration used for starting the grid.
+     * @throws Exception If failed.
+     */
+    private IgniteConfiguration getConfiguration0(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setIndexedTypes(
+            Integer.class, Person.class
+        );
+
+        cfg.setCacheConfiguration(cache);
+        cfg.setLocalHost("127.0.0.1");
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+        ipFinder.setAddresses(Collections.singleton("127.0.0.1:47500..47501"));
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setConnectorConfiguration(new ConnectorConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Establishes the JDBC connection.
+     *
+     * @return Connection to use for the test.
+     * @throws Exception if failed.
+     */
+    private Connection createConnection() throws Exception {
+        Properties props = new Properties();
+
+        return DriverManager.getConnection(BASE_URL, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        U.closeQuiet(conn);
+
+        ignite(0).cache(DEFAULT_CACHE_NAME).clear();
+
+        super.afterTest();
+    }
+
+    /**
+     * This is more a placeholder for implementation of IGNITE-7553.
+     *
+     * @throws Exception if failed.
+     */
+    public void testBulkLoadThrows() throws Exception {
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                conn = createConnection();
+
+                try (Statement stmt = conn.createStatement()) {
+                    stmt.executeUpdate("copy from \"dummy.csv\" into Person" +
+                        " (_key, id, firstName, lastName) format csv");
+
+                    return null;
+                }
+            }
+        }, SQLException.class, "COPY command is currently supported only in thin JDBC driver.");
+    }
+
+    /**
+     * A test class for creating a query entity.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    private static class Person implements Serializable {
+        /** ID. */
+        @QuerySqlField
+        private final int id;
+
+        /** First name. */
+        @QuerySqlField(index = false)
+        private final String firstName;
+
+        /** Last name. */
+        @QuerySqlField(index = false)
+        private final String lastName;
+
+        /** Age. */
+        @QuerySqlField
+        private final int age;
+
+        /**
+         * @param id ID.
+         * @param firstName First name
+         * @param lastName Last name
+         * @param age Age.
+         */
+        private Person(int id, String firstName, String lastName, int age) {
+            assert !F.isEmpty(firstName);
+            assert !F.isEmpty(lastName);
+            assert age > 0;
+
+            this.id = id;
+            this.firstName = firstName;
+            this.lastName = lastName;
+            this.age = age;
+        }
+    }
+}
index 49746b6..2059408 100644 (file)
@@ -107,7 +107,7 @@ public abstract class JdbcErrorsAbstractSelfTest extends GridCommonAbstractTest
      */
     public void testDmlErrors() throws SQLException {
         checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, null)", "22004",
-            "Value for INSERT, MERGE, or UPDATE must not be null");
+            "Value for INSERT, COPY, MERGE, or UPDATE must not be null");
 
         checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, 'zzz')", "0700B",
             "Value conversion failed [from=java.lang.String, to=java.lang.Integer]");
index ff4d69f..656e218 100644 (file)
@@ -35,6 +35,12 @@ import org.apache.ignite.jdbc.JdbcResultSetSelfTest;
 import org.apache.ignite.jdbc.JdbcStatementSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinAutoCloseServerCursorTest;
 import org.apache.ignite.jdbc.thin.JdbcThinBatchSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadAtomicPartitionedNearSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadAtomicPartitionedSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadAtomicReplicatedSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalPartitionedNearSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalPartitionedSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalReplicatedSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinComplexQuerySelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest;
@@ -154,6 +160,14 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new TestSuite(JdbcThinDynamicIndexTransactionalPartitionedSelfTest.class));
         suite.addTest(new TestSuite(JdbcThinDynamicIndexTransactionalReplicatedSelfTest.class));
 
+        // New thin JDBC driver, DML tests
+        suite.addTest(new TestSuite(JdbcThinBulkLoadAtomicPartitionedNearSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadAtomicPartitionedSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadAtomicReplicatedSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadTransactionalPartitionedSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadTransactionalReplicatedSelfTest.class));
+
         // New thin JDBC driver, full SQL tests
         suite.addTest(new TestSuite(JdbcThinComplexDmlDdlSelfTest.class));
 
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java
new file mode 100644 (file)
index 0000000..761f700
--- /dev/null
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import java.sql.BatchUpdateException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
+
+/**
+ * COPY statement tests.
+ */
+public abstract class JdbcThinBulkLoadAbstractSelfTest extends JdbcThinAbstractDmlStatementSelfTest {
+    /** Default table name. */
+    private static final String TBL_NAME = "Person";
+
+    /** JDBC statement. */
+    private Statement stmt;
+
+    /** A CSV file with zero records. */
+    private static final String BULKLOAD_EMPTY_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload0.csv"))
+            .getAbsolutePath();
+
+    /** A CSV file with one record. */
+    private static final String BULKLOAD_ONE_LINE_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv"))
+            .getAbsolutePath();
+
+    /** A CSV file with two records. */
+    private static final String BULKLOAD_TWO_LINES_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload2.csv"))
+            .getAbsolutePath();
+
+    /** A CSV file with UTF-8 records. */
+    private static final String BULKLOAD_UTF_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload2_utf.csv"))
+            .getAbsolutePath();
+
+    /** Basic COPY statement used in majority of the tests. */
+    public static final String BASIC_SQL_COPY_STMT =
+        "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\"" +
+            " into " + TBL_NAME +
+            " (_key, age, firstName, lastName)" +
+            " format csv";
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfig() {
+        return cacheConfigWithIndexedTypes();
+    }
+
+    /**
+     * Creates cache configuration with {@link QueryEntity} created
+     * using {@link CacheConfiguration#setIndexedTypes(Class[])} call.
+     *
+     * @return The cache configuration.
+     */
+    @SuppressWarnings("unchecked")
+    private CacheConfiguration cacheConfigWithIndexedTypes() {
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(cacheMode());
+        cache.setAtomicityMode(atomicityMode());
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+
+        if (cacheMode() == PARTITIONED)
+            cache.setBackups(1);
+
+        if (nearCache())
+            cache.setNearConfiguration(new NearCacheConfiguration());
+
+        cache.setIndexedTypes(
+            String.class, Person.class
+        );
+
+        return cache;
+    }
+
+    /**
+     * Returns true if we are testing near cache.
+     *
+     * @return true if we are testing near cache.
+     */
+    protected abstract boolean nearCache();
+
+    /**
+     * Returns cache atomicity mode we are testing.
+     *
+     * @return The cache atomicity mode we are testing.
+     */
+    protected abstract CacheAtomicityMode atomicityMode();
+
+    /**
+     * Returns cache mode we are testing.
+     *
+     * @return The cache mode we are testing.
+     */
+    protected abstract CacheMode cacheMode();
+
+    /**
+     * Creates cache configuration with {@link QueryEntity} created
+     * using {@link CacheConfiguration#setQueryEntities(Collection)} call.
+     *
+     * @return The cache configuration.
+     */
+    private CacheConfiguration cacheConfigWithQueryEntity() {
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+
+        QueryEntity e = new QueryEntity();
+
+        e.setKeyType(String.class.getName());
+        e.setValueType("Person");
+
+        e.addQueryField("id", Integer.class.getName(), null);
+        e.addQueryField("age", Integer.class.getName(), null);
+        e.addQueryField("firstName", String.class.getName(), null);
+        e.addQueryField("lastName", String.class.getName(), null);
+
+        cache.setQueryEntities(Collections.singletonList(e));
+
+        return cache;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        System.setProperty(IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK, "TRUE");
+
+        stmt = conn.createStatement();
+
+        assertNotNull(stmt);
+        assertFalse(stmt.isClosed());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        if (stmt != null && !stmt.isClosed())
+            stmt.close();
+
+        assertTrue(stmt.isClosed());
+
+        System.clearProperty(IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK);
+
+        super.afterTest();
+    }
+
+    /**
+     * Dead-on-arrival test. Imports two-entry CSV file into a table and checks
+     * the created entries using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBasicStatement() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(BASIC_SQL_COPY_STMT);
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Imports two-entry CSV file with UTF-8 characters into a table and checks
+     * the created entries using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testUtf() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_UTF_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(2, updatesCnt);
+
+        checkUtfCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Imports two-entry CSV file with UTF-8 characters into a table using batch size of one byte
+     * (thus splitting each two-byte UTF-8 character into two batches)
+     * and checks the created entries using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testUtfBatchSize_1() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_UTF_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv batch_size 1");
+
+        assertEquals(2, updatesCnt);
+
+        checkUtfCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Imports one-entry CSV file into a table and checks the entry created using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testOneLineFile() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_ONE_LINE_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(1, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 1);
+    }
+
+    /**
+     * Imports zero-entry CSV file into a table and checks that no entries are created
+     * using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testEmptyFile() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_EMPTY_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(0, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 0);
+    }
+
+    /**
+     * Checks that error is reported for a non-existent file.
+     */
+    public void testWrongFileName() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"nonexistent\" into Person" +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Failed to read file: 'nonexistent'");
+    }
+
+    /**
+     * Checks that error is reported if the destination table is missing.
+     */
+    public void testMissingTable() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into Peterson" +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Table does not exist: PETERSON");
+    }
+
+    /**
+     * Checks that error is reported when a non-existing column is specified in the SQL command.
+     */
+    public void testWrongColumnName() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into Person" +
+                        " (_key, age, firstName, lostName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Column \"LOSTNAME\" not found");
+    }
+
+    /**
+     * Checks that error is reported if field read from CSV file cannot be converted to the type of the column.
+     */
+    public void testWrongColumnType() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into Person" +
+                        " (_key, firstName, age, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Value conversion failed [from=java.lang.String, to=java.lang.Integer]");
+    }
+
+    /**
+     * Checks that if even a subset of fields is imported, the imported fields are set correctly.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testFieldsSubset() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName)" +
+                " format csv");
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, false, 2);
+    }
+
+    /**
+     * Checks that bulk load works when we create table using 'CREATE TABLE' command.
+     *
+     * The majority of the tests in this class use {@link CacheConfiguration#setIndexedTypes(Class[])}
+     * to create the table.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testCreateAndBulkLoadTable() throws SQLException {
+        String tblName = QueryUtils.DFLT_SCHEMA + ".\"PersonTbl\"";
+
+        execute(conn, "create table " + tblName +
+            " (id int primary key, age int, firstName varchar(30), lastName varchar(30))");
+
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into " + tblName +
+                "(_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(tblName, true, 2);
+    }
+
+    /**
+     * Checks that bulk load works when we create table with {@link CacheConfiguration#setQueryEntities(Collection)}.
+     *
+     * The majority of the tests in this class use {@link CacheConfiguration#setIndexedTypes(Class[])}
+     * to create a table.
+     *
+     * @throws SQLException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testConfigureQueryEntityAndBulkLoad() throws SQLException {
+        ignite(0).getOrCreateCache(cacheConfigWithQueryEntity());
+
+        int updatesCnt = stmt.executeUpdate(BASIC_SQL_COPY_STMT);
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Checks that bulk load works when we use batch size of 1 byte and thus
+     * create multiple batches per COPY.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatchSize_1() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(BASIC_SQL_COPY_STMT + " batch_size 1");
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies exception thrown if COPY is added into a batch.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testMultipleStatement() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.addBatch(BASIC_SQL_COPY_STMT);
+
+                stmt.addBatch("copy from \"" + BULKLOAD_ONE_LINE_CSV_FILE + "\" into " + TBL_NAME +
+                    " (_key, age, firstName, lastName)" +
+                    " format csv");
+
+                stmt.addBatch("copy from \"" + BULKLOAD_UTF_CSV_FILE + "\" into " + TBL_NAME +
+                    " (_key, age, firstName, lastName)" +
+                    " format csv");
+
+                stmt.executeBatch();
+
+                return null;
+            }
+        }, BatchUpdateException.class, "COPY command cannot be executed in batch mode.");
+    }
+
+    /**
+     * Verifies that COPY command is rejected by Statement.executeQuery().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testExecuteQuery() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeQuery(BASIC_SQL_COPY_STMT);
+
+                return null;
+            }
+        }, SQLException.class, "The query isn't SELECT query");
+    }
+
+    /**
+     * Verifies that COPY command works in Statement.execute().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testExecute() throws SQLException {
+        boolean isRowSet = stmt.execute(BASIC_SQL_COPY_STMT);
+
+        assertFalse(isRowSet);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies that COPY command can be called with PreparedStatement.executeUpdate().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testPreparedStatementWithExecuteUpdate() throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(BASIC_SQL_COPY_STMT);
+
+        int updatesCnt = pstmt.executeUpdate();
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies that COPY command reports an error when used with PreparedStatement parameter.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testPreparedStatementWithParameter() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    // JDBC '?' placeholders are not supported in COPY syntax: the parser
+                    // expects an identifier after FORMAT, not a parameter token.
+                    PreparedStatement pstmt = conn.prepareStatement(
+                        "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into " + TBL_NAME +
+                            " (_key, age, firstName, lastName)" +
+                            " format ?");
+
+                    pstmt.setString(1, "csv");
+
+                    pstmt.executeUpdate();
+
+                    return null;
+                }
+            }, SQLException.class, "Unexpected token: \"?\" (expected: \"[identifier]\"");
+    }
+
+    /**
+     * Verifies that COPY command can be called with PreparedStatement.execute().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testPreparedStatementWithExecute() throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(BASIC_SQL_COPY_STMT);
+
+        // execute() returns false: COPY produces an update count, not a row set.
+        boolean isRowSet = pstmt.execute();
+
+        assertFalse(isRowSet);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies that COPY command is rejected by PreparedStatement.executeQuery().
+     */
+    public void testPreparedStatementWithExecuteQuery() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                PreparedStatement pstmt = conn.prepareStatement(BASIC_SQL_COPY_STMT);
+
+                // COPY is not a SELECT, so executeQuery() must reject it.
+                pstmt.executeQuery();
+
+                return null;
+            }
+        }, SQLException.class, "The query isn't SELECT query");
+    }
+
+    /**
+     * Checks cache contents for a typical test using SQL SELECT command.
+     *
+     * @param tblName Table name to query.
+     * @param checkLastName Check 'lastName' column (not imported in some tests).
+     * @param recCnt Number of records to expect.
+     * @throws SQLException When one of checks has failed.
+     */
+    private void checkCacheContents(String tblName, boolean checkLastName, int recCnt) throws SQLException {
+        ResultSet rs = stmt.executeQuery("select _key, age, firstName, lastName from " + tblName);
+
+        assert rs != null;
+
+        int cnt = 0;
+
+        // Rows may come back in any order, so match each row by key (123 or 456) against
+        // the values hard-coded in the bulkload*.csv test resources.
+        while (rs.next()) {
+            int id = rs.getInt("_key");
+
+            if (id == 123) {
+                assertEquals(12, rs.getInt("age"));
+                assertEquals("FirstName123 MiddleName123", rs.getString("firstName"));
+                if (checkLastName)
+                    assertEquals("LastName123", rs.getString("lastName"));
+            }
+            else if (id == 456) {
+                assertEquals(45, rs.getInt("age"));
+                assertEquals("FirstName456", rs.getString("firstName"));
+                if (checkLastName)
+                    assertEquals("LastName456", rs.getString("lastName"));
+            }
+            else
+                fail("Wrong ID: " + id);
+
+            cnt++;
+        }
+
+        assertEquals(recCnt, cnt);
+    }
+
+    /**
+     * Checks cache contents for a UTF-8 bulk load tests using SQL SELECT command.
+     *
+     * <p>NOTE(review): near-duplicate of {@code checkCacheContents} differing only in the
+     * expected (Cyrillic) string values; consider merging the two by parameterizing the
+     * expected names.
+     *
+     * @param tblName Table name to query.
+     * @param checkLastName Check 'lastName' column (not imported in some tests).
+     * @param recCnt Number of records to expect.
+     * @throws SQLException When one of checks has failed.
+     */
+    private void checkUtfCacheContents(String tblName, boolean checkLastName, int recCnt) throws SQLException {
+        ResultSet rs = stmt.executeQuery("select _key, age, firstName, lastName from " + tblName);
+
+        assert rs != null;
+
+        int cnt = 0;
+
+        // Expected values come from the bulkload2_utf.csv test resource.
+        while (rs.next()) {
+            int id = rs.getInt("_key");
+
+            if (id == 123) {
+                assertEquals(12, rs.getInt("age"));
+                assertEquals("Имя123 Отчество123", rs.getString("firstName"));
+                if (checkLastName)
+                    assertEquals("Фамилия123", rs.getString("lastName"));
+            }
+            else if (id == 456) {
+                assertEquals(45, rs.getInt("age"));
+                assertEquals("Имя456", rs.getString("firstName"));
+                if (checkLastName)
+                    assertEquals("Фамилия456", rs.getString("lastName"));
+            }
+            else
+                fail("Wrong ID: " + id);
+
+            cnt++;
+        }
+
+        assertEquals(recCnt, cnt);
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java
new file mode 100644 (file)
index 0000000..887b1d9
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned atomic near-cache mode:
+ * runs the inherited COPY (bulk load) tests against a PARTITIONED ATOMIC cache
+ * with a near cache configured.
+ */
+public class JdbcThinBulkLoadAtomicPartitionedNearSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return true;
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java
new file mode 100644 (file)
index 0000000..5581333
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned atomic mode:
+ * runs the inherited COPY (bulk load) tests against a PARTITIONED ATOMIC cache
+ * without a near cache.
+ */
+public class JdbcThinBulkLoadAtomicPartitionedSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return false;
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java
new file mode 100644 (file)
index 0000000..c3d69af
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * A {@link JdbcThinBulkLoadAbstractSelfTest} for replicated atomic mode:
+ * runs the inherited COPY (bulk load) tests against a REPLICATED ATOMIC cache
+ * without a near cache. (The previous javadoc said "near-cache mode", which
+ * contradicted {@link #nearCache()} returning {@code false}.)
+ */
+public class JdbcThinBulkLoadAtomicReplicatedSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return false;
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java
new file mode 100644 (file)
index 0000000..9336dd1
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned transactional near-cache mode:
+ * runs the inherited COPY (bulk load) tests against a PARTITIONED TRANSACTIONAL cache
+ * with a near cache configured.
+ */
+public class JdbcThinBulkLoadTransactionalPartitionedNearSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return true;
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java
new file mode 100644 (file)
index 0000000..d1dea2a
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned transactional mode:
+ * runs the inherited COPY (bulk load) tests against a PARTITIONED TRANSACTIONAL cache
+ * without a near cache.
+ */
+public class JdbcThinBulkLoadTransactionalPartitionedSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return false;
+    }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java
new file mode 100644 (file)
index 0000000..1c377fa
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * A {@link JdbcThinBulkLoadAbstractSelfTest} for replicated transactional mode:
+ * runs the inherited COPY (bulk load) tests against a REPLICATED TRANSACTIONAL cache
+ * without a near cache.
+ */
+public class JdbcThinBulkLoadTransactionalReplicatedSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return false;
+    }
+}
index dbe93a4..539713a 100644 (file)
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
diff --git a/modules/clients/src/test/resources/bulkload0.csv b/modules/clients/src/test/resources/bulkload0.csv
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/modules/clients/src/test/resources/bulkload1.csv b/modules/clients/src/test/resources/bulkload1.csv
new file mode 100644 (file)
index 0000000..596ac32
--- /dev/null
@@ -0,0 +1 @@
+123,12,"FirstName123 MiddleName123",LastName123
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload2.csv b/modules/clients/src/test/resources/bulkload2.csv
new file mode 100644 (file)
index 0000000..d398c19
--- /dev/null
@@ -0,0 +1,2 @@
+123,12,"FirstName123 MiddleName123",LastName123
+456,45,"FirstName456","LastName456"
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload2_utf.csv b/modules/clients/src/test/resources/bulkload2_utf.csv
new file mode 100644 (file)
index 0000000..bdb6489
--- /dev/null
@@ -0,0 +1,2 @@
+123,12,"Имя123 Отчество123",Фамилия123
+456,45,"Имя456","Фамилия456"
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java b/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java
new file mode 100644 (file)
index 0000000..b7fdec3
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A special FieldsQueryCursor subclass that is used as a sentinel to transfer data from bulk load
+ * (COPY) command to the JDBC or other client-facing driver: the bulk load batch processor
+ * and parameters to send to the client.
+ *
+ * <p>The context is exposed as a single pseudo-row with two pseudo-columns:
+ * {@code processor} (column 0) and {@code clientParams} (column 1).
+ * */
+public class BulkLoadContextCursor implements FieldsQueryCursor<List<?>> {
+    /** Bulk load context from SQL command. */
+    private final BulkLoadProcessor processor;
+
+    /** Bulk load parameters to send to the client. */
+    private final BulkLoadAckClientParameters clientParams;
+
+    /**
+     * Creates a cursor.
+     *
+     * @param processor Bulk load context object to store.
+     * @param clientParams Parameters to send to client.
+     */
+    public BulkLoadContextCursor(BulkLoadProcessor processor, BulkLoadAckClientParameters clientParams) {
+        this.processor = processor;
+        this.clientParams = clientParams;
+    }
+
+    /**
+     * Returns a bulk load context.
+     *
+     * @return a bulk load context.
+     */
+    public BulkLoadProcessor bulkLoadProcessor() {
+        return processor;
+    }
+
+    /**
+     * Returns the bulk load parameters to send to the client.
+     *
+     * @return The bulk load parameters to send to the client.
+     */
+    public BulkLoadAckClientParameters clientParams() {
+        return clientParams;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<?>> getAll() {
+        // A single row holding the two context objects, in column order: processor, clientParams.
+        return Collections.singletonList(Arrays.asList(processor, clientParams));
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public Iterator<List<?>> iterator() {
+        return getAll().iterator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op: the cursor itself holds no resources. Note that closing it does NOT close
+        // the bulk load processor; the consumer of the cursor owns that lifecycle.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getFieldName(int idx) {
+        // Only the two pseudo-columns defined by this cursor are addressable.
+        if (idx < 0 || idx > 1)
+            throw new IndexOutOfBoundsException();
+
+        return idx == 0 ? "processor" : "clientParams";
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getColumnsCount() {
+        return 2;
+    }
+}
index d29df93..2020011 100644 (file)
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.jdbc.thin;
 
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -25,21 +28,24 @@ import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLWarning;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadAckResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteMultipleStatementsResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
-import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultInfo;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
 
 import static java.sql.ResultSet.CONCUR_READ_ONLY;
 import static java.sql.ResultSet.FETCH_FORWARD;
@@ -132,6 +138,9 @@ public class JdbcThinStatement implements Statement {
 
         assert res0 != null;
 
+        if (res0 instanceof JdbcBulkLoadAckResult)
+            res0 = sendFile((JdbcBulkLoadAckResult)res0);
+
         if (res0 instanceof JdbcQueryExecuteResult) {
             JdbcQueryExecuteResult res = (JdbcQueryExecuteResult)res0;
 
@@ -176,6 +185,61 @@ public class JdbcThinStatement implements Statement {
         assert resultSets.size() > 0 : "At least one results set is expected";
     }
 
+    /**
+     * Sends a file to server in batches via multiple {@link JdbcBulkLoadBatchRequest}s.
+     *
+     * @param cmdRes Result of invoking COPY command: contains server-parsed
+     *    bulk load parameters, such as file name and batch size.
+     * @return Server response to the final (EOF) batch request.
+     * @throws SQLException If the file cannot be read, the server replies with an
+     *    unexpected response, or sending a request fails.
+     */
+    private JdbcResult sendFile(JdbcBulkLoadAckResult cmdRes) throws SQLException {
+        String fileName = cmdRes.params().localFileName();
+        int batchSize = cmdRes.params().batchSize();
+
+        int batchNum = 0;
+
+        try {
+            try (InputStream input = new BufferedInputStream(new FileInputStream(fileName))) {
+                byte[] buf = new byte[batchSize];
+
+                // Stream the file in batchSize-byte chunks; each chunk is one CMD_CONTINUE request.
+                int readBytes;
+                while ((readBytes = input.read(buf)) != -1) {
+                    if (readBytes == 0)
+                        continue;
+
+                    // A right-sized copy is sent only for a short (typically final) read; otherwise
+                    // buf itself is passed. NOTE(review): this assumes sendRequest() serializes the
+                    // bytes before buf is overwritten by the next read — confirm.
+                    JdbcResult res = conn.sendRequest(new JdbcBulkLoadBatchRequest(
+                        cmdRes.queryId(),
+                        batchNum++,
+                        JdbcBulkLoadBatchRequest.CMD_CONTINUE,
+                        readBytes == buf.length ? buf : Arrays.copyOf(buf, readBytes)));
+
+                    if (!(res instanceof JdbcQueryExecuteResult))
+                        throw new SQLException("Unknown response sent by the server: " + res);
+                }
+
+                // End of file: tell the server the upload is complete and return its response.
+                return conn.sendRequest(new JdbcBulkLoadBatchRequest(
+                    cmdRes.queryId(),
+                    batchNum++,
+                    JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF));
+            }
+        }
+        catch (Exception e) {
+            // Best-effort: ask the server to abort the bulk load before rethrowing.
+            try {
+                conn.sendRequest(new JdbcBulkLoadBatchRequest(
+                    cmdRes.queryId(),
+                    batchNum,
+                    JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR));
+            }
+            catch (SQLException e1) {
+                // The original exception is preserved as the cause of the new one.
+                throw new SQLException("Cannot send finalization request: " + e1.getMessage(), e);
+            }
+
+            if (e instanceof SQLException)
+                throw (SQLException) e;
+            else
+                throw new SQLException("Failed to read file: '" + fileName + "'", SqlStateCode.INTERNAL_ERROR, e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public int executeUpdate(String sql) throws SQLException {
         execute0(JdbcStatementType.UPDATE_STMT_TYPE, sql, null);
index aa9f009..07034f4 100644 (file)
@@ -32,6 +32,8 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteJdbcDriver;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteKernal;
@@ -168,7 +170,15 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTaskResult> {
             qry.setLazy(lazy());
             qry.setSchema(schemaName);
 
-            QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)cache.withKeepBinary().query(qry);
+            FieldsQueryCursor<List<?>> fldQryCursor = cache.withKeepBinary().query(qry);
+
+            if (fldQryCursor instanceof BulkLoadContextCursor) {
+                fldQryCursor.close();
+                
+                throw new SQLException("COPY command is currently supported only in thin JDBC driver.");
+            }
+
+            QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)fldQryCursor;
 
             if (isQry == null)
                 isQry = qryCursor.isQuery();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java
new file mode 100644 (file)
index 0000000..119d9f9
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Bulk load parameters, which are parsed from SQL command and sent from server to client.
+ */
+public class BulkLoadAckClientParameters {
+    /** Minimum batch size. */
+    public static final int MIN_BATCH_SIZE = 1;
+
+    /**
+     * Maximum batch size. Note that the batch is wrapped to transport objects and the overall packet should fit
+     * into a Java array. 512 has been chosen arbitrarily.
+     */
+    public static final int MAX_BATCH_SIZE = Integer.MAX_VALUE - 512;
+
+    /** Default size of a file batch for COPY command. */
+    public static final int DEFAULT_BATCH_SIZE = 4 * 1024 * 1024;
+
+    /** Local name of the file to send to server. */
+    @NotNull private final String locFileName;
+
+    /** File batch size in bytes. */
+    private final int batchSize;
+
+    /**
+     * Creates a bulk load parameters.
+     *
+     * @param locFileName File name to send from client to server.
+     * @param batchSize Batch size (Number of bytes in a portion of a file to send in one JDBC request/response).
+     */
+    public BulkLoadAckClientParameters(@NotNull String locFileName, int batchSize) {
+        this.locFileName = locFileName;
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Returns the local name of file to send.
+     *
+     * @return The local name of file to send.
+     */
+    @NotNull public String localFileName() {
+        return locFileName;
+    }
+
+    /**
+     * Returns the batch size.
+     *
+     * @return The batch size.
+     */
+    public int batchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Checks if batch size value is valid.
+     *
+     * @param sz The batch size to check.
+     * @return {@code True} if the batch size is within [{@link #MIN_BATCH_SIZE}..{@link #MAX_BATCH_SIZE}].
+     */
+    public static boolean isValidBatchSize(int sz) {
+        return sz >= MIN_BATCH_SIZE && sz <= MAX_BATCH_SIZE;
+    }
+
+    /**
+     * Creates proper batch size error message if {@link #isValidBatchSize(int)} check has failed.
+     *
+     * @param sz The batch size.
+     * @return The string with the error message.
+     */
+    public static String batchSizeErrorMsg(int sz) {
+        return "Batch size should be within [" + MIN_BATCH_SIZE + ".." + MAX_BATCH_SIZE + "]: " + sz;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java
new file mode 100644 (file)
index 0000000..90714c8
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * A proxy, which stores given key+value pair to a cache.
+ *
+ * <p>Implementations receive key/value pairs via {@link IgniteInClosure#apply} and implement
+ * {@link AutoCloseable} so that callers can release any underlying resources when done.
+ */
+public abstract class BulkLoadCacheWriter implements IgniteInClosure<IgniteBiTuple<?, ?>>, AutoCloseable {
+    /**
+     * Returns number of entry updates made by the writer.
+     *
+     * @return The number of cache entry updates.
+     */
+    public abstract long updateCnt();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java
new file mode 100644 (file)
index 0000000..6f5e91e
--- /dev/null
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.regex.Pattern;
+
+/** A placeholder for bulk load CSV format parser options. */
+public class BulkLoadCsvFormat extends BulkLoadFormat {
+
+    /** Line separator pattern. */
+    @NotNull public static final Pattern DEFAULT_LINE_SEPARATOR = Pattern.compile("[\r\n]+");
+
+    /** Field separator pattern. */
+    @NotNull public static final Pattern DEFAULT_FIELD_SEPARATOR = Pattern.compile(",");
+
+    /** Quote characters */
+    @NotNull public static final String DEFAULT_QUOTE_CHARS = "\"";
+
+    /** Default escape sequence start characters. */
+    @Nullable public static final String DEFAULT_ESCAPE_CHARS = null;
+
+    /** Line comment start pattern. */
+    @Nullable public static final Pattern DEFAULT_COMMENT_CHARS = null;
+
+    /** Format name. */
+    public static final String NAME = "CSV";
+
+    /** Line separator pattern. */
+    @Nullable private Pattern lineSeparator;
+
+    /** Field separator pattern. */
+    @Nullable private Pattern fieldSeparator;
+
+    /** Set of quote characters. */
+    @Nullable private String quoteChars;
+
+    /** Line comment start pattern. */
+    @Nullable private Pattern commentChars;
+
+    /** Set of escape start characters. */
+    @Nullable private String escapeChars;
+
+    /**
+     * Returns the name of the format.
+     *
+     * @return The name of the format.
+     */
+    @Override public String name() {
+        return NAME;
+    }
+
+    /**
+     * Returns the line separator pattern.
+     *
+     * @return The line separator pattern.
+     */
+    @Nullable public Pattern lineSeparator() {
+        return lineSeparator;
+    }
+
+    /**
+     * Sets the line separator pattern.
+     *
+     * @param lineSeparator The line separator pattern.
+     */
+    public void lineSeparator(@Nullable Pattern lineSeparator) {
+        this.lineSeparator = lineSeparator;
+    }
+
+    /**
+     * Returns the field separator pattern.
+     *
+     * @return The field separator pattern.
+     */
+    @Nullable public Pattern fieldSeparator() {
+        return fieldSeparator;
+    }
+
+    /**
+     * Sets the field separator pattern.
+     *
+     * @param fieldSeparator The field separator pattern.
+     */
+    public void fieldSeparator(@Nullable Pattern fieldSeparator) {
+        this.fieldSeparator = fieldSeparator;
+    }
+
+    /**
+     * Returns the quote characters.
+     *
+     * @return The quote characters.
+     */
+    @Nullable public String quoteChars() {
+        return quoteChars;
+    }
+
+    /**
+     * Sets the quote characters.
+     *
+     * @param quoteChars The quote characters.
+     */
+    public void quoteChars(@Nullable String quoteChars) {
+        this.quoteChars = quoteChars;
+    }
+
+    /**
+     * Returns the line comment start pattern.
+     *
+     * @return The line comment start pattern.
+     */
+    @Nullable public Pattern commentChars() {
+        return commentChars;
+    }
+
+    /**
+     * Sets the line comment start pattern.
+     *
+     * @param commentChars The line comment start pattern.
+     */
+    public void commentChars(@Nullable Pattern commentChars) {
+        this.commentChars = commentChars;
+    }
+
+    /**
+     * Returns the escape characters.
+     *
+     * @return The escape characters.
+     */
+    @Nullable public String escapeChars() {
+        return escapeChars;
+    }
+
+    /**
+     * Sets the escape characters.
+     *
+     * @param escapeChars The escape characters.
+     */
+    public void escapeChars(@Nullable String escapeChars) {
+        this.escapeChars = escapeChars;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java
new file mode 100644 (file)
index 0000000..0511596
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.bulkload.pipeline.CharsetDecoderBlock;
+import org.apache.ignite.internal.processors.bulkload.pipeline.CsvLineProcessorBlock;
+import org.apache.ignite.internal.processors.bulkload.pipeline.PipelineBlock;
+import org.apache.ignite.internal.processors.bulkload.pipeline.StrListAppenderBlock;
+import org.apache.ignite.internal.processors.bulkload.pipeline.LineSplitterBlock;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/** CSV parser for COPY command. */
+public class BulkLoadCsvParser extends BulkLoadParser {
+    /** Processing pipeline input block: a decoder for the input stream of bytes */
+    private final PipelineBlock<byte[], char[]> inputBlock;
+
+    /** A record collecting block that appends its input to {@code List<String>}. */
+    private final StrListAppenderBlock collectorBlock;
+
+    /**
+     * Creates bulk load CSV parser.
+     *
+     *  @param format Format options (parsed from COPY command).
+     */
+    public BulkLoadCsvParser(BulkLoadCsvFormat format) {
+        inputBlock = new CharsetDecoderBlock(BulkLoadFormat.DEFAULT_INPUT_CHARSET);
+
+        collectorBlock = new StrListAppenderBlock();
+
+        // Handling of the other options is to be implemented in IGNITE-7537.
+        inputBlock.append(new LineSplitterBlock(format.lineSeparator()))
+               .append(new CsvLineProcessorBlock(format.fieldSeparator(), format.quoteChars()))
+               .append(collectorBlock);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Iterable<List<Object>> parseBatch(byte[] batchData, boolean isLastBatch)
+        throws IgniteCheckedException {
+        List<List<Object>> res = new LinkedList<>();
+
+        collectorBlock.output(res);
+
+        inputBlock.accept(batchData, isLastBatch);
+
+        return res;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java
new file mode 100644 (file)
index 0000000..cff93c5
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+
+/** A superclass and a factory for bulk load format options. */
+public abstract class BulkLoadFormat {
+    /** The default input charset. */
+    public static final Charset DEFAULT_INPUT_CHARSET = Charset.forName("UTF-8");
+
+    /**
+     * Returns the format name.
+     *
+     * @return The format name.
+     */
+    public abstract String name();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java
new file mode 100644 (file)
index 0000000..252e87b
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.IgniteCheckedException;
+
+import java.util.List;
+
+/**
+ * Bulk load file format parser superclass + factory of known formats.
+ *
+ * <p>The parser processes a batch of input data and return a list of records.
+ *
+ * <p>The parser uses corresponding options from {@link BulkLoadFormat} subclass.
+ */
+public abstract class BulkLoadParser {
+    /**
+     * Parses a batch of input data and returns a list of records parsed
+     * (in most cases this is a list of strings).
+     *
+     * <p>Note that conversion between parsed and database table type is done by the other
+     * object (see {@link BulkLoadProcessor#dataConverter}) by the request processing code.
+     * This method is not obliged to do this conversion.
+     *
+     * @param batchData Data from the current batch.
+     * @param isLastBatch true if this is the last batch.
+     * @return The list of records.
+     * @throws IgniteCheckedException If any processing error occurs.
+     */
+    protected abstract Iterable<List<Object>> parseBatch(byte[] batchData, boolean isLastBatch)
+        throws IgniteCheckedException;
+
+    /**
+     * Creates a parser for a given format options.
+     *
+     * @param format The input format object.
+     * @return The parser.
+     * @throws IllegalArgumentException if the format is not known to the factory.
+     */
+    public static BulkLoadParser createParser(BulkLoadFormat format) {
+        if (format instanceof BulkLoadCsvFormat)
+            return new BulkLoadCsvParser((BulkLoadCsvFormat)format);
+
+        throw new IllegalArgumentException("Internal error: format is not defined");
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
new file mode 100644 (file)
index 0000000..ccf3e25
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import java.util.List;
+
+/**
+ * Bulk load (COPY) command processor used on server to keep various context data and process portions of input
+ * received from the client side.
+ */
+public class BulkLoadProcessor implements AutoCloseable {
+    /** Parser of the input bytes. */
+    private final BulkLoadParser inputParser;
+
+    /**
+     * Converter, which transforms the list of strings parsed from the input stream to the key+value entry to add to
+     * the cache.
+     */
+    private final IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter;
+
+    /** Streamer that puts actual key/value into the cache. */
+    private final BulkLoadCacheWriter outputStreamer;
+
+    /** Becomes true after {@link #close()} method is called. */
+    private boolean isClosed;
+
+    /**
+     * Creates bulk load processor.
+     *
+     * @param inputParser Parser of the input bytes.
+     * @param dataConverter Converter, which transforms the list of strings parsed from the input stream to the
+     *     key+value entry to add to the cache.
+     * @param outputStreamer Streamer that puts actual key/value into the cache.
+     */
+    public BulkLoadProcessor(BulkLoadParser inputParser, IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter,
+        BulkLoadCacheWriter outputStreamer) {
+        this.inputParser = inputParser;
+        this.dataConverter = dataConverter;
+        this.outputStreamer = outputStreamer;
+        isClosed = false;
+    }
+
+    /**
+     * Returns the streamer that puts actual key/value into the cache.
+     *
+     * @return Streamer that puts actual key/value into the cache.
+     */
+    public BulkLoadCacheWriter outputStreamer() {
+        return outputStreamer;
+    }
+
+    /**
+     * Processes the incoming batch and writes data to the cache by calling the data converter and output streamer.
+     *
+     * @param batchData Data from the current batch.
+     * @param isLastBatch true if this is the last batch.
+     * @throws IgniteIllegalStateException when called after {@link #close()}.
+     */
+    public void processBatch(byte[] batchData, boolean isLastBatch) throws IgniteCheckedException {
+        if (isClosed)
+            throw new IgniteIllegalStateException("Attempt to process a batch on a closed BulkLoadProcessor");
+
+        Iterable<List<Object>> inputRecords = inputParser.parseBatch(batchData, isLastBatch);
+
+        for (List<Object> record : inputRecords) {
+            IgniteBiTuple<?, ?> kv = dataConverter.apply(record);
+
+            outputStreamer.apply(kv);
+        }
+    }
+
+    /**
+     * Aborts processing and closes the underlying objects ({@link IgniteDataStreamer}).
+     */
+    @Override public void close() throws Exception {
+        if (isClosed)
+            return;
+
+        isClosed = true;
+
+        outputStreamer.close();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java
new file mode 100644 (file)
index 0000000..3e5efd9
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * A bulk load cache writer object that adds entries using {@link IgniteDataStreamer}.
+ */
+public class BulkLoadStreamerWriter extends BulkLoadCacheWriter {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** The streamer. */
+    private final IgniteDataStreamer<Object, Object> streamer;
+
+    /**
+     * A number of {@link IgniteDataStreamer#addData(Object, Object)} calls made,
+     * since we don't have any kind of result data back from the streamer.
+     */
+    private long updateCnt;
+
+    /**
+     * Creates a cache writer.
+     *
+     * @param streamer The streamer to use.
+     */
+    public BulkLoadStreamerWriter(IgniteDataStreamer<Object, Object> streamer) {
+        this.streamer = streamer;
+        updateCnt = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteBiTuple<?, ?> entry) {
+        streamer.addData(entry.getKey(), entry.getValue());
+
+        updateCnt++;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        streamer.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long updateCnt() {
+        return updateCnt;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java
new file mode 100644 (file)
index 0000000..5b18def
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.util.Arrays;
+
/**
 * A {@link PipelineBlock}, which converts stream of bytes supplied as byte[] arrays to an array of char[] using
 * the specified encoding. Decoding errors (malformed input and unmappable characters) are handled by dropping
 * the erroneous input, appending the coder's replacement value to the output buffer, and resuming the coding operation.
 */
public class CharsetDecoderBlock extends PipelineBlock<byte[], char[]> {
    /** Charset decoder, configured to replace (not report) malformed/unmappable input. */
    private final CharsetDecoder charsetDecoder;

    /** Leftover bytes (partial characters) from the last batch,
     * or null if everything was processed. */
    private byte[] leftover;

    /** True once we've reached the end of input. */
    private boolean isEndOfInput;

    /**
     * Creates charset decoder block.
     *
     * @param charset The charset encoding to decode bytes from.
     */
    public CharsetDecoderBlock(Charset charset) {
        // REPLACE substitutes erroneous input with the charset's replacement string
        // instead of throwing, as promised in the class javadoc.
        charsetDecoder = charset.newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);

        isEndOfInput = false;
        leftover = null;
    }

    /** {@inheritDoc} */
    @Override public void accept(byte[] data, boolean isLastAppend) throws IgniteCheckedException {
        assert nextBlock != null;

        assert !isEndOfInput : "convertBytes() called after end of input";

        isEndOfInput = isLastAppend;

        // Nothing buffered and nothing new: just propagate the (possibly final) empty portion.
        if (leftover == null && data.length == 0) {
            nextBlock.accept(new char[0], isLastAppend);
            return;
        }

        ByteBuffer dataBuf;

        if (leftover == null)
            dataBuf = ByteBuffer.wrap(data);
        else {
            // Prepend the partial-character bytes saved from the previous batch.
            dataBuf = ByteBuffer.allocate(leftover.length + data.length);

            dataBuf.put(leftover)
                   .put(data);

            dataBuf.flip();

            leftover = null;
        }

        // NOTE(review): sized from data.length only — leftover bytes are not counted here.
        // Any under-allocation is compensated by the overflow branch in the loop below,
        // but sizing from dataBuf.remaining() would be tighter — confirm intentional.
        int outBufLen = (int)Math.ceil(charsetDecoder.maxCharsPerByte() * (data.length + 1));

        assert outBufLen > 0;

        CharBuffer outBuf = CharBuffer.allocate(outBufLen);

        for (;;) {
            CoderResult res = charsetDecoder.decode(dataBuf, outBuf, isEndOfInput);

            if (res.isUnderflow()) {
                // End of input buffer reached. Either skip the partial character at the end or wait for the next batch.
                if (!isEndOfInput && dataBuf.remaining() > 0)
                    leftover = Arrays.copyOfRange(dataBuf.array(),
                        dataBuf.arrayOffset() + dataBuf.position(), dataBuf.limit());

                if (isEndOfInput)
                    charsetDecoder.flush(outBuf); // See {@link CharsetDecoder} class javadoc for the protocol.

                if (outBuf.position() > 0)
                    nextBlock.accept(Arrays.copyOfRange(outBuf.array(), outBuf.arrayOffset(), outBuf.position()),
                        isEndOfInput);

                break;
            }

            if (res.isOverflow()) { // Not enough space in the output buffer, flush it and retry.
                assert outBuf.position() > 0;

                nextBlock.accept(Arrays.copyOfRange(outBuf.array(), outBuf.arrayOffset(), outBuf.position()),
                    isEndOfInput);

                // NOTE(review): flip() caps the limit at the previous position instead of restoring
                // the buffer to full capacity (clear()); works because the buffer is (nearly) full
                // at overflow, but clear() would express the intent better — verify.
                outBuf.flip();

                continue;
            }

            assert ! res.isMalformed() && ! res.isUnmappable();

            // We're not supposed to reach this point with the current implementation.
            // The code below will fire exception if Oracle implementation of CharsetDecoder will be changed in future.
            throw new IgniteIllegalStateException("Unknown CharsetDecoder state");
        }
    }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java
new file mode 100644 (file)
index 0000000..5b2ee4b
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.regex.Pattern;
+
+/**
+ * A {@link PipelineBlock}, which splits line according to CSV format rules and unquotes fields.
+ * The next block {@link PipelineBlock#accept(Object, boolean)} is called per-line.
+ */
+public class CsvLineProcessorBlock extends PipelineBlock<String, String[]> {
+    /** Field delimiter pattern. */
+    private final Pattern fldDelim;
+
+    /** Quote character. */
+    private final String quoteChars;
+
+    /**
+     * Creates a CSV line parser.
+     *
+     * @param fldDelim The pattern for the field delimiter.
+     * @param quoteChars Quoting character.
+     */
+    public CsvLineProcessorBlock(Pattern fldDelim, String quoteChars) {
+        this.fldDelim = fldDelim;
+        this.quoteChars = quoteChars;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(String input, boolean isLastPortion) throws IgniteCheckedException {
+        // Currently we don't process quoted field delimiter properly, will be fixed in IGNITE-7537.
+        String[] fields = fldDelim.split(input);
+
+        for (int i = 0; i < fields.length; i++)
+            fields[i] = trim(fields[i]);
+
+        nextBlock.accept(fields, isLastPortion);
+    }
+
+    /**
+     * Trims quote characters from beginning and end of the line.
+     *
+     * @param str String to trim.
+     * @return The trimmed string.
+     */
+    @NotNull private String trim(String str) {
+        int startPos = quoteChars.indexOf(str.charAt(0)) != -1 ? 1 : 0;
+        int endPos = quoteChars.indexOf(str.charAt(str.length() - 1)) != -1 ? str.length() - 1 : str.length();
+
+        return str.substring(startPos, endPos);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java
new file mode 100644 (file)
index 0000000..122d0db
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import org.apache.ignite.IgniteCheckedException;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A {@link PipelineBlock}, which splits input stream of char[] into lines using the specified {@link Pattern}
+ * as line separator. Next block {@link PipelineBlock#accept(Object, boolean)} is invoked for each line.
+ * Leftover characters are remembered and used during processing the next input batch,
+ * unless isLastPortion flag is specified.
+ */
+public class LineSplitterBlock extends PipelineBlock<char[], String> {
+    /** Line separator pattern */
+    private final Pattern delim;
+
+    /** Leftover characters from the previous invocation of {@link #accept(char[], boolean)}. */
+    private StringBuilder leftover = new StringBuilder();
+
+    /**
+     * Creates line splitter block.
+     *
+     * @param delim The line separator pattern.
+     */
+    public LineSplitterBlock(Pattern delim) {
+        this.delim = delim;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(char[] chars, boolean isLastPortion) throws IgniteCheckedException {
+        leftover.append(chars);
+
+        String input = leftover.toString();
+        Matcher matcher = delim.matcher(input);
+
+        int lastPos = 0;
+        while (matcher.find()) {
+            String outStr = input.substring(lastPos, matcher.start());
+
+            if (!outStr.isEmpty())
+                nextBlock.accept(outStr, false);
+
+            lastPos = matcher.end();
+        }
+
+        if (lastPos != 0)
+            leftover.delete(0, lastPos);
+
+        if (isLastPortion && leftover.length() > 0) {
+            nextBlock.accept(leftover.toString(), true);
+            leftover.setLength(0);
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java
new file mode 100644 (file)
index 0000000..914b4b4
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A file parsing pipeline block. Accepts an portion of an input (isLastPortion flag is provided to signify the last
+ * block to process) and optionally calls the next block with transformed input or performs any other handling,
+ * such as storing input to internal structures.
+ */
+public abstract class PipelineBlock<I, O> {
+    /** The next block in pipeline or null if this block is a terminator. */
+    @Nullable protected PipelineBlock<O, ?> nextBlock;
+
+    /**
+     * Creates a pipeline block.
+     *
+     * <p>(There is no nextBlock argument in the constructor: setting the next block using
+     * {@link #append(PipelineBlock)} method is more convenient.
+     */
+    protected PipelineBlock() {
+        nextBlock = null;
+    }
+
+    /**
+     * Sets the next block in this block and returns the <b>next</b> block.
+     *
+     * <p>Below is an example of using this method to set up a pipeline:<br>
+     * {@code block1.append(block2).append(block3); }.
+     * <p>Block2 here becomes the next for block1, and block3 is the next one for the block2.
+     *
+     * @param next The next block for the current block.
+     * @return The next block ({@code next} argument).
+     */
+    public <N> PipelineBlock<O, N> append(PipelineBlock<O, N> next) {
+        nextBlock = next;
+        return next;
+    }
+
+    /**
+     * Accepts a portion of input. {@code isLastPortion} parameter should be set if this is a last portion
+     * of the input. The method must not be called after the end of input: the call with {@code isLastPortion == true}
+     * is the last one.
+     *
+     * @param inputPortion Portion of input.
+     * @param isLastPortion Is this the last portion.
+     */
+    public abstract void accept(I inputPortion, boolean isLastPortion) throws IgniteCheckedException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java
new file mode 100644 (file)
index 0000000..91cbc1e
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The PipelineBlock which appends its input to a user-supplied list.
+ *
+ * <p>The list is set using {@link #output(List)} method.
+ */
+public class StrListAppenderBlock extends PipelineBlock<String[], Object> {
+    /** The output list. Must be set via {@link #output(List)} before the first {@link #accept} call. */
+    private List<List<Object>> output;
+
+    /**
+     * Creates the block. List can be configured using {@link #output(List)} method.
+     */
+    public StrListAppenderBlock() {
+        output = null;
+    }
+
+    /**
+     * Sets the output list.
+     *
+     * @param output The output list.
+     */
+    public void output(List<List<Object>> output) {
+        this.output = output;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(String[] elements, boolean isLastPortion) {
+        // Terminator block: isLastPortion is irrelevant here. Throws NPE if output(List) was never called.
+        output.add(Arrays.asList(elements));
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java
new file mode 100644 (file)
index 0000000..8a170ab
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * A reply from server to SQL COPY command, which is essentially a request from server to client
+ * to send files from client to server (see IGNITE-6917 for details).
+ *
+ * @see JdbcBulkLoadProcessor for the protocol.
+ * @see SqlBulkLoadCommand
+ */
+public class JdbcBulkLoadAckResult extends JdbcResult {
+    /** Query ID for matching this command on server in further {@link JdbcBulkLoadBatchRequest} commands. */
+    private long qryId;
+
+    /**
+     * Bulk load parameters, which are parsed on the server side and sent to client to specify
+     * what files to send, batch size, etc.
+     */
+    private BulkLoadAckClientParameters params;
+
+    /** Creates an uninitialized bulk load ack result (fields are populated later by {@link #readBinary}). */
+    public JdbcBulkLoadAckResult() {
+        super(BULK_LOAD_ACK);
+
+        qryId = 0;
+        params = null;
+    }
+
+    /**
+     * Constructs a request from server (in form of reply) to send files from client to server.
+     *
+     * @param qryId Query ID to send in further {@link JdbcBulkLoadBatchRequest}s.
+     * @param params Various parameters for sending batches from client side.
+     */
+    public JdbcBulkLoadAckResult(long qryId, BulkLoadAckClientParameters params) {
+        super(BULK_LOAD_ACK);
+
+        this.qryId = qryId;
+        this.params = params;
+    }
+
+    /**
+     * Returns the query ID.
+     *
+     * @return Query ID.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * Returns the parameters for the client.
+     *
+     * @return The parameters for the client.
+     */
+    public BulkLoadAckClientParameters params() {
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+        super.writeBinary(writer);
+
+        // Throws NPE if params is null, i.e. if the result was created via the default constructor
+        // and never deserialized — only fully-initialized instances may be written.
+        writer.writeLong(qryId);
+        writer.writeString(params.localFileName());
+        writer.writeInt(params.batchSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+        super.readBinary(reader);
+
+        qryId = reader.readLong();
+
+        String locFileName = reader.readString();
+        int batchSize = reader.readInt();
+
+        // Re-validate the batch size on the receiving side to reject malformed input early.
+        if (!BulkLoadAckClientParameters.isValidBatchSize(batchSize))
+            throw new BinaryObjectException(BulkLoadAckClientParameters.batchSizeErrorMsg(batchSize));
+
+        params = new BulkLoadAckClientParameters(locFileName, batchSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(JdbcBulkLoadAckResult.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java
new file mode 100644 (file)
index 0000000..b75de5a
--- /dev/null
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A JDBC request that sends a batch of a file to the server. Used when handling
+ * {@link SqlBulkLoadCommand} command.
+ */
+public class JdbcBulkLoadBatchRequest extends JdbcRequest {
+    /** A sentinel to indicate that {@link #cmd} field was not initialized. */
+    public static final int CMD_UNKNOWN = -1;
+
+    /** Next batch comes in this request and there are more batches. */
+    public static final int CMD_CONTINUE = 0;
+
+    /**
+     * This is the final batch from the client and there was an error on the client side,
+     * so terminate with error on the server side as well.
+     */
+    public static final int CMD_FINISHED_ERROR = 1;
+
+    /**
+     * This is the final batch of the file and everything went well on the client side.
+     * Server may complete the request.
+     */
+    public static final int CMD_FINISHED_EOF = 2;
+
+    /** QueryID of the original COPY command request. */
+    private long qryId;
+
+    /** Batch index starting from 0. */
+    private int batchIdx;
+
+    /** Command (see CMD_xxx constants above). */
+    private int cmd;
+
+    /** Data in this batch. */
+    @NotNull private byte[] data;
+
+    /**
+     * Creates the request with uninitialized parameters.
+     */
+    public JdbcBulkLoadBatchRequest() {
+        super(BULK_LOAD_BATCH);
+
+        qryId = -1;
+        batchIdx = -1;
+        cmd = CMD_UNKNOWN;
+        data = null;
+    }
+
+    /**
+     * Creates the request with specified parameters and zero-length data.
+     * Typically used with {@link #CMD_FINISHED_ERROR} and {@link #CMD_FINISHED_EOF}.
+     *
+     * @param qryId The query ID from the {@link JdbcBulkLoadAckResult}.
+     * @param batchIdx Index of the current batch starting with 0.
+     * @param cmd The command ({@link #CMD_CONTINUE}, {@link #CMD_FINISHED_EOF}, or {@link #CMD_FINISHED_ERROR}).
+     */
+    public JdbcBulkLoadBatchRequest(long qryId, int batchIdx, int cmd) {
+        this(qryId, batchIdx, cmd, new byte[0]);
+    }
+
+    /**
+     * Creates the request with the specified parameters.
+     *
+     * @param qryId The query ID from the {@link JdbcBulkLoadAckResult}.
+     * @param batchIdx Index of the current batch starting with 0.
+     * @param cmd The command ({@link #CMD_CONTINUE}, {@link #CMD_FINISHED_EOF}, or {@link #CMD_FINISHED_ERROR}).
+     * @param data The data block (zero length is acceptable).
+     */
+    public JdbcBulkLoadBatchRequest(long qryId, int batchIdx, int cmd, @NotNull byte[] data) {
+        super(BULK_LOAD_BATCH);
+
+        this.qryId = qryId;
+        this.batchIdx = batchIdx;
+
+        assert isCmdValid(cmd) : "Invalid command value: " + cmd;
+        this.cmd = cmd;
+
+        this.data = data;
+    }
+
+    /**
+     * Returns the original query ID.
+     *
+     * @return The original query ID.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * Returns the batch index.
+     *
+     * @return The batch index.
+     */
+    public long batchIdx() {
+        return batchIdx;
+    }
+
+    /**
+     * Returns the command (see CMD_xxx constants for details).
+     *
+     * @return The command.
+     */
+    public int cmd() {
+        return cmd;
+    }
+
+    /**
+     * Returns the data.
+     *
+     * @return data if data was not supplied
+     */
+    @NotNull public byte[] data() {
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+        super.writeBinary(writer);
+
+        writer.writeLong(qryId);
+        writer.writeInt(batchIdx);
+        writer.writeInt(cmd);
+        writer.writeByteArray(data);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+        super.readBinary(reader);
+
+        qryId = reader.readLong();
+        batchIdx = reader.readInt();
+
+        int c = reader.readInt();
+        if (!isCmdValid(c))
+            throw new BinaryObjectException("Invalid command: " + cmd);
+
+        cmd = c;
+
+        data = reader.readByteArray();
+        assert data != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(JdbcBulkLoadBatchRequest.class, this);
+    }
+
+    /**
+     * Checks if the command value is valid.
+     *
+     * @param c The command value to check.
+     * @return True if valid, false otherwise.
+     */
+    private static boolean isCmdValid(int c) {
+        return c >= CMD_CONTINUE && c <= CMD_FINISHED_EOF;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java
new file mode 100644 (file)
index 0000000..9757791
--- /dev/null
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR;
+
+/**
+ * JDBC wrapper around {@link BulkLoadProcessor} that provides extra logic.
+ *
+ * Unlike other "single shot" request-reply commands, for the
+ * COPY command the client-server interaction looks like this:
+ *
+ * <pre>
+ * Thin JDBC client                            Server
+ *        |                                       |
+ *        |------- JdbcQueryExecuteRequest ------>|
+ *        |         with SQL copy command         |
+ *        |                                       |
+ *        |<---- JdbcBulkLoadAckResult -----------|
+ *        | with BulkLoadAckClientParameters      |
+ *        | containing file name and batch size.  |
+ *        |                                       |
+ * (open the file,                                |
+ *  read portions and send them)                  |
+ *        |                                       |
+ *        |------- JdbcBulkLoadBatchRequest #1 -->|
+ *        | with a portion of input file.         |
+ *        |                                       |
+ *        |<--- JdbcQueryExecuteResult -----------|
+ *        | with current update counter.          |
+ *        |                                       |
+ *        |------- JdbcBulkLoadBatchRequest #2--->|
+ *        | with a portion of input file.         |
+ *        |                                       |
+ *        |<--- JdbcQueryExecuteResult -----------|
+ *        | with current update counter.          |
+ *        |                                       |
+ *        |------- JdbcBulkLoadBatchRequest #3--->|
+ *        | with the LAST portion of input file.  |
+ *        |                                       |
+ *        |<--- JdbcQueryExecuteResult -----------|
+ *        | with the final update counter.        |
+ *        |                                       |
+ * (close the file)                               |
+ *        |                                       |
+ * </pre>
+ *
+ * In case of input file reading error, a flag is carried to the server:
+ * {@link JdbcBulkLoadBatchRequest#CMD_FINISHED_ERROR} and the processing
+ * is aborted on both sides.
+ */
+public class JdbcBulkLoadProcessor {
+    /** A core processor that handles incoming data packets. */
+    private final BulkLoadProcessor processor;
+
+    /** Next batch index (for a very simple check that all batches were delivered to us). */
+    protected long nextBatchIdx;
+
+    /**
+     * Creates a JDBC-specific adapter for bulk load processor.
+     *
+     * @param processor Bulk load processor from the core to delegate calls to.
+     */
+    public JdbcBulkLoadProcessor(BulkLoadProcessor processor) {
+        this.processor = processor;
+        nextBatchIdx = 0;
+    }
+
+    /**
+     * Completely processes a bulk load batch request.
+     *
+     * Calls {@link BulkLoadProcessor} wrapping around some JDBC-specific logic
+     * (commands, bulk load batch index checking).
+     *
+     * @param req The current request.
+     * @throws IgniteCheckedException If a batch arrives out of order or the delegate fails.
+     */
+    public void processBatch(JdbcBulkLoadBatchRequest req)
+        throws IgniteCheckedException {
+        // NOTE(review): batch indexes are 0-based, so the expected batch is #nextBatchIdx,
+        // but the message reports nextBatchIdx + 1 — confirm whether 1-based numbering is intended.
+        if (nextBatchIdx != req.batchIdx())
+            throw new IgniteSQLException("Batch #" + (nextBatchIdx + 1) +
+                    " is missing. Received #" + req.batchIdx() + " instead.");
+
+        nextBatchIdx++;
+
+        switch (req.cmd()) {
+            case CMD_FINISHED_EOF:
+                processor.processBatch(req.data(), true);
+
+                break;
+
+            case CMD_CONTINUE:
+                processor.processBatch(req.data(), false);
+
+                break;
+
+            case CMD_FINISHED_ERROR:
+                // Client-side failure: discard the data; the caller is expected to close() this processor.
+                break;
+
+            default:
+                throw new IgniteIllegalStateException("Command was not recognized: " + req.cmd());
+        }
+    }
+
+    /**
+     * Closes the underlying objects.
+     * Currently we don't handle normal termination vs. abort.
+     */
+    public void close() throws Exception {
+        processor.close();
+
+        // Poison the index so any late batch fails the ordering check in processBatch().
+        nextBatchIdx = -1;
+    }
+
+    /**
+     * Provides update counter for sending in the {@link JdbcBatchExecuteResult}.
+     *
+     * @return The update counter for sending in {@link JdbcBatchExecuteResult}.
+     */
+    public long updateCnt() {
+        return processor.outputStreamer().updateCnt();
+    }
+}
index 385924c..22522ad 100644 (file)
@@ -60,6 +60,8 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin
     /** Get schemas metadata request. */
     static final byte META_SCHEMAS = 12;
 
+    /** Send a batch of a data from client to server. */
+    static final byte BULK_LOAD_BATCH = 13;
 
     /** Request type. */
     private byte type;
@@ -154,6 +156,11 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin
 
                 break;
 
+            case BULK_LOAD_BATCH:
+                req = new JdbcBulkLoadBatchRequest();
+
+                break;
+
             default:
                 throw new IgniteException("Unknown SQL listener request ID: [request ID=" + reqType + ']');
         }
index 11b50ec..59fc06b 100644 (file)
@@ -32,11 +32,14 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteVersionUtils;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
@@ -57,9 +60,13 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_3_0;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_COLUMNS;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_INDEXES;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_PARAMS;
@@ -93,6 +100,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
     /** Current queries cursors. */
     private final ConcurrentHashMap<Long, JdbcQueryCursor> qryCursors = new ConcurrentHashMap<>();
 
+    /** Current bulk load processors. */
+    private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> bulkLoadRequests = new ConcurrentHashMap<>();
+
     /** Distributed joins flag. */
     private final boolean distributedJoins;
 
@@ -197,6 +207,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
                 case META_SCHEMAS:
                     return getSchemas((JdbcMetaSchemasRequest)req);
+
+                case BULK_LOAD_BATCH:
+                    return processBulkLoadFileBatch((JdbcBulkLoadBatchRequest)req);
             }
 
             return new JdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION,
@@ -207,6 +220,46 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         }
     }
 
+    /**
+     * Processes a file batch sent from client as part of bulk load COPY command.
+     *
+     * @param req Request object with a batch of a file received from client.
+     * @return Response to send to the client.
+     */
+    private ClientListenerResponse processBulkLoadFileBatch(JdbcBulkLoadBatchRequest req) {
+        JdbcBulkLoadProcessor processor = bulkLoadRequests.get(req.queryId());
+
+        // Check the processor we just looked up (not ctx): a missing entry means the query ID
+        // is unknown or the session was reclaimed; checking ctx here would let a null processor
+        // through to an NPE below instead of returning the intended error response.
+        if (processor == null)
+            return new JdbcResponse(IgniteQueryErrorCode.UNEXPECTED_OPERATION, "Unknown query ID: "
+                + req.queryId() + ". Bulk load session may have been reclaimed due to timeout.");
+
+        try {
+            processor.processBatch(req);
+
+            switch (req.cmd()) {
+                case CMD_FINISHED_ERROR:
+                case CMD_FINISHED_EOF:
+                    // Terminal commands: drop the session entry and release its resources.
+                    bulkLoadRequests.remove(req.queryId());
+
+                    processor.close();
+
+                    break;
+
+                case CMD_CONTINUE:
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Unknown bulk load batch command: " + req.cmd());
+            }
+
+            return new JdbcResponse(new JdbcQueryExecuteResult(req.queryId(), processor.updateCnt()));
+        }
+        catch (Exception e) {
+            U.error(null, "Error processing file batch", e);
+
+            return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public ClientListenerResponse handleException(Exception e, ClientListenerRequest req) {
         return exceptionToResult(e);
@@ -237,6 +290,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
             {
                 for (JdbcQueryCursor cursor : qryCursors.values())
                     cursor.close();
+
+                for (JdbcBulkLoadProcessor processor : bulkLoadRequests.values()) {
+                    try {
+                        processor.close();
+                    }
+                    catch (Exception e) {
+                        U.error(null, "Error closing JDBC bulk load processor.", e);
+                    }
+                }
+
+                bulkLoadRequests.clear();
             }
             finally {
                 busyLock.leaveBusy();
@@ -310,10 +374,22 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
             List<FieldsQueryCursor<List<?>>> results = ctx.query().querySqlFields(qry, true,
                 protocolVer.compareTo(VER_2_3_0) < 0);
 
-            if (results.size() == 1) {
-                FieldsQueryCursor<List<?>> qryCur = results.get(0);
+            FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
 
-                JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)qryCur);
+            if (fieldsCur instanceof BulkLoadContextCursor) {
+                BulkLoadContextCursor blCur = (BulkLoadContextCursor) fieldsCur;
+
+                BulkLoadProcessor blProcessor = blCur.bulkLoadProcessor();
+                BulkLoadAckClientParameters clientParams = blCur.clientParams();
+
+                bulkLoadRequests.put(qryId, new JdbcBulkLoadProcessor(blProcessor));
+
+                return new JdbcResponse(new JdbcBulkLoadAckResult(qryId, clientParams));
+            }
+
+            if (results.size() == 1) {
+                JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(),
+                    (QueryCursorImpl)fieldsCur);
 
                 JdbcQueryExecuteResult res;
 
@@ -350,8 +426,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
                     if (qryCur.isQuery()) {
                         jdbcRes = new JdbcResultInfo(true, -1, qryId);
 
-                        JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(),
-                            (QueryCursorImpl)qryCur);
+                        JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), qryCur);
 
                         qryCursors.put(qryId, cur);
 
@@ -370,8 +445,6 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
                 return new JdbcResponse(new JdbcQueryExecuteMultipleStatementsResult(jdbcResults, items, last));
             }
-
-
         }
         catch (Exception e) {
             qryCursors.remove(qryId);
@@ -534,6 +607,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
             List<FieldsQueryCursor<List<?>>> qryRes = ctx.query().querySqlFields(qry, true, true);
 
             for (FieldsQueryCursor<List<?>> cur : qryRes) {
+                if (cur instanceof BulkLoadContextCursor)
+                    throw new IgniteSQLException("COPY command cannot be executed in batch mode.");
+
                 assert !((QueryCursorImpl)cur).isQuery();
 
                 Iterator<List<?>> it = cur.iterator();
index 623a339..43631e9 100644 (file)
@@ -65,6 +65,9 @@ public class JdbcResult implements JdbcRawBinarylizable {
     /** Columns metadata result V3. */
     static final byte META_COLUMNS_V3 = 15;
 
+    /** A request to send file from client to server. */
+    static final byte BULK_LOAD_ACK = 16;
+
     /** Success status. */
     private byte type;
 
@@ -163,6 +166,11 @@ public class JdbcResult implements JdbcRawBinarylizable {
 
                 break;
 
+            case BULK_LOAD_ACK:
+                res = new JdbcBulkLoadAckResult();
+
+                break;
+
             default:
                 throw new IgniteException("Unknown SQL listener request ID: [request ID=" + resId + ']');
         }
index f5b8c3c..0238b01 100644 (file)
@@ -42,6 +42,9 @@ public class SqlKeyword {
     /** Keyword: BOOL. */
     public static final String BOOL = "BOOL";
 
+    /** Keyword: BATCH_SIZE. */
+    public static final String BATCH_SIZE = "BATCH_SIZE";
+
     /** Keyword: BOOLEAN. */
     public static final String BOOLEAN = "BOOLEAN";
 
@@ -54,6 +57,9 @@ public class SqlKeyword {
     /** Keyword: CHARACTER. */
     public static final String CHARACTER = "CHARACTER";
 
+    /** Keyword: COPY. */
+    public static final String COPY = "COPY";
+
     /** Keyword: CREATE. */
     public static final String CREATE = "CREATE";
 
@@ -90,6 +96,12 @@ public class SqlKeyword {
     /** Keyword: FLOAT8. */
     public static final String FLOAT8 = "FLOAT8";
 
+    /** Keyword: FORMAT. */
+    public static final String FORMAT = "FORMAT";
+
+    /** Keyword: FROM. */
+    public static final String FROM = "FROM";
+
     /** Keyword: FULLTEXT. */
     public static final String FULLTEXT = "FULLTEXT";
 
@@ -120,6 +132,9 @@ public class SqlKeyword {
     /** Keyword: INTEGER. */
     public static final String INTEGER = "INTEGER";
 
+    /** Keyword: INTO. */
+    public static final String INTO = "INTO";
+
     /** Keyword: KEY. */
     public static final String KEY = "KEY";
 
index 401ee98..0627def 100644 (file)
 package org.apache.ignite.internal.sql;
 
 import org.apache.ignite.internal.sql.command.SqlAlterTableCommand;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand;
 import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.sql.SqlKeyword.ALTER;
+import static org.apache.ignite.internal.sql.SqlKeyword.COPY;
 import static org.apache.ignite.internal.sql.SqlKeyword.CREATE;
 import static org.apache.ignite.internal.sql.SqlKeyword.DROP;
 import static org.apache.ignite.internal.sql.SqlKeyword.HASH;
@@ -103,6 +105,11 @@ public class SqlParser {
 
                             break;
 
+                        case COPY:
+                            cmd = processCopy();
+
+                            break;
+
                         case ALTER:
                             cmd = processAlter();
                     }
@@ -115,7 +122,7 @@ public class SqlParser {
                         return cmd;
                     }
                     else
-                        throw errorUnexpectedToken(lex, CREATE, DROP, ALTER);
+                        throw errorUnexpectedToken(lex, CREATE, DROP, COPY, ALTER);
 
                 case QUOTED:
                 case MINUS:
@@ -130,6 +137,15 @@ public class SqlParser {
     }
 
     /**
+     * Processes COPY command.
+     *
+     * @return The {@link SqlBulkLoadCommand} command.
+     */
+    private SqlCommand processCopy() {
+        return new SqlBulkLoadCommand().parse(lex);
+    }
+
+    /**
      * Process CREATE keyword.
      *
      * @return Command.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java
new file mode 100644 (file)
index 0000000..e5246d5
--- /dev/null
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.command;
+
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadFormat;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.sql.SqlKeyword;
+import org.apache.ignite.internal.sql.SqlLexer;
+import org.apache.ignite.internal.sql.SqlLexerTokenType;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.ignite.internal.sql.SqlParserUtils.error;
+import static org.apache.ignite.internal.sql.SqlParserUtils.parseIdentifier;
+import static org.apache.ignite.internal.sql.SqlParserUtils.parseInt;
+import static org.apache.ignite.internal.sql.SqlParserUtils.parseQualifiedIdentifier;
+import static org.apache.ignite.internal.sql.SqlParserUtils.skipCommaOrRightParenthesis;
+import static org.apache.ignite.internal.sql.SqlParserUtils.skipIfMatches;
+import static org.apache.ignite.internal.sql.SqlParserUtils.skipIfMatchesKeyword;
+
+/**
+ * A parser for a COPY command (called 'bulk load' in the code, since word 'copy' is too generic).
+ */
+public class SqlBulkLoadCommand implements SqlCommand {
+    /** Local file name to send from client to server. */
+    private String locFileName;
+
+    /** Schema name + table name. */
+    private SqlQualifiedName tblQName;
+
+    /** User-specified list of columns. */
+    private List<String> cols;
+
+    /** File format. */
+    private BulkLoadFormat inputFormat;
+
+    /** Batch size (size of portion of a file sent in each sub-request). {@code null} if not specified by the user. */
+    private Integer batchSize;
+
+    /**
+     * Parses the command.
+     * <p>
+     * Expected syntax: {@code COPY FROM <file> INTO <table> (<col>, ...) FORMAT <fmt> [BATCH_SIZE <n>]}.
+     *
+     * @param lex The lexer.
+     * @return The parsed command object.
+     */
+    @Override public SqlCommand parse(SqlLexer lex) {
+        skipIfMatchesKeyword(lex, SqlKeyword.FROM); // COPY keyword is already parsed
+
+        parseFileName(lex);
+
+        parseTableName(lex);
+
+        parseColumns(lex);
+
+        parseFormat(lex);
+
+        parseParameters(lex);
+
+        return this;
+    }
+
+    /**
+     * Parses the file name.
+     * <p>
+     * The file name is read as an SQL identifier, so names containing dots or spaces must be quoted.
+     *
+     * @param lex The lexer.
+     */
+    private void parseFileName(SqlLexer lex) {
+        locFileName = parseIdentifier(lex);
+    }
+
+    /**
+     * Parses the schema and table names.
+     *
+     * @param lex The lexer.
+     */
+    private void parseTableName(SqlLexer lex) {
+        skipIfMatchesKeyword(lex, SqlKeyword.INTO);
+
+        tblQName = parseQualifiedIdentifier(lex);
+    }
+
+    /**
+     * Parses the list of columns.
+     * <p>
+     * At least one column is required: an empty list {@code ()} is rejected by the parser.
+     *
+     * @param lex The lexer.
+     */
+    private void parseColumns(SqlLexer lex) {
+        skipIfMatches(lex, SqlLexerTokenType.PARENTHESIS_LEFT);
+
+        cols = new ArrayList<>();
+
+        do {
+            cols.add(parseColumn(lex));
+        }
+        while (!skipCommaOrRightParenthesis(lex));
+    }
+
+    /**
+     * Parses column clause.
+     *
+     * @param lex The lexer.
+     * @return The column name.
+     */
+    private String parseColumn(SqlLexer lex) {
+        return parseIdentifier(lex);
+    }
+
+    /**
+     * Parses the format clause.
+     * <p>
+     * Currently only the CSV format is supported; any other name results in a parse error.
+     *
+     * @param lex The lexer.
+     */
+    private void parseFormat(SqlLexer lex) {
+        skipIfMatchesKeyword(lex, SqlKeyword.FORMAT);
+
+        String name = parseIdentifier(lex);
+
+        switch (name.toUpperCase()) {
+            case BulkLoadCsvFormat.NAME:
+                BulkLoadCsvFormat fmt = new BulkLoadCsvFormat();
+
+                // IGNITE-7537 will introduce user-defined values
+                fmt.lineSeparator(BulkLoadCsvFormat.DEFAULT_LINE_SEPARATOR);
+                fmt.fieldSeparator(BulkLoadCsvFormat.DEFAULT_FIELD_SEPARATOR);
+                fmt.quoteChars(BulkLoadCsvFormat.DEFAULT_QUOTE_CHARS);
+                fmt.commentChars(BulkLoadCsvFormat.DEFAULT_COMMENT_CHARS);
+                fmt.escapeChars(BulkLoadCsvFormat.DEFAULT_ESCAPE_CHARS);
+
+                inputFormat = fmt;
+
+                break;
+
+            default:
+                throw error(lex, "Unknown format name: " + name +
+                    ". Currently supported format is " + BulkLoadCsvFormat.NAME);
+        }
+    }
+
+    /**
+     * Parses the optional parameters.
+     * <p>
+     * Parsing stops at the first token that is not a recognized parameter keyword.
+     *
+     * @param lex The lexer.
+     */
+    private void parseParameters(SqlLexer lex) {
+        while (lex.lookAhead().tokenType() == SqlLexerTokenType.DEFAULT) {
+            switch (lex.lookAhead().token()) {
+                case SqlKeyword.BATCH_SIZE:
+                    lex.shift();
+
+                    int sz = parseInt(lex);
+
+                    // Validate the range before accepting the value.
+                    if (!BulkLoadAckClientParameters.isValidBatchSize(sz))
+                        throw error(lex, BulkLoadAckClientParameters.batchSizeErrorMsg(sz));
+
+                    batchSize = sz;
+
+                    break;
+
+                default:
+                    return; // Unknown token: stop parsing parameters.
+            }
+        }
+    }
+
+    /**
+     * Returns the schema name.
+     *
+     * @return The schema name.
+     */
+    @Override public String schemaName() {
+        return tblQName.schemaName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void schemaName(String schemaName) {
+        tblQName.schemaName(schemaName);
+    }
+
+    /**
+     * Returns the table name.
+     *
+     * @return The table name.
+     */
+    public String tableName() {
+        return tblQName.name();
+    }
+
+    /**
+     * Sets the table name.
+     *
+     * @param tblName The table name.
+     */
+    public void tableName(String tblName) {
+        tblQName.name(tblName);
+    }
+
+    /**
+     * Returns the local file name.
+     *
+     * @return The local file name.
+     */
+    public String localFileName() {
+        return locFileName;
+    }
+
+    /**
+     * Sets the local file name.
+     *
+     * @param locFileName The local file name.
+     */
+    public void localFileName(String locFileName) {
+        this.locFileName = locFileName;
+    }
+
+    /**
+     * Returns the list of columns.
+     *
+     * @return The list of columns.
+     */
+    public List<String> columns() {
+        return cols;
+    }
+
+    /**
+     * Returns the input file format.
+     *
+     * @return The input file format.
+     */
+    public BulkLoadFormat inputFormat() {
+        return inputFormat;
+    }
+
+    /**
+     * Returns the batch size.
+     *
+     * @return The batch size, or {@code null} if not specified by the user.
+     */
+    public Integer batchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Sets the batch size.
+     *
+     * @param batchSize The batch size.
+     */
+    public void batchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SqlBulkLoadCommand.class, this);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java
new file mode 100644 (file)
index 0000000..b5cd55b
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql;
+
+/**
+ * Tests for SQL parser: COPY command.
+ */
+public class SqlParserBulkLoadSelfTest extends SqlParserAbstractSelfTest {
+    /**
+     * Tests error handling in the COPY command parser.
+     */
+    public void testCopy() {
+        // FROM keyword, file name and INTO clause.
+
+        assertParseError(null,
+            "copy grom \"any.file\" into Person (_key, age, firstName, lastName) format csv",
+            "Unexpected token: \"GROM\" (expected: \"FROM\")");
+
+        assertParseError(null,
+            "copy from into Person (_key, age, firstName, lastName) format csv",
+            "Unexpected token: \"INTO\" (expected: \"[identifier]\"");
+
+        // An unquoted file name with a dot is not a single identifier.
+        assertParseError(null,
+            "copy from any.file into Person (_key, age, firstName, lastName) format csv",
+            "Unexpected token: \".\" (expected: \"INTO\"");
+
+        assertParseError(null,
+            "copy from \"any.file\" to Person (_key, age, firstName, lastName) format csv",
+            "Unexpected token: \"TO\" (expected: \"INTO\")");
+
+        // Column list
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person () format csv",
+            "Unexpected token: \")\" (expected: \"[identifier]\")");
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person (,) format csv",
+            "Unexpected token: \",\" (expected: \"[identifier]\")");
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person format csv",
+            "Unexpected token: \"FORMAT\" (expected: \"(\")");
+
+        // FORMAT
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person (_key, age, firstName, lastName)",
+            "Unexpected end of command (expected: \"FORMAT\")");
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person (_key, age, firstName, lastName) format lsd",
+            "Unknown format name: LSD");
+    }
+}
index 72e80e2..c46c906 100644 (file)
@@ -39,8 +39,15 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
@@ -58,8 +65,12 @@ import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
+import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T3;
@@ -967,6 +978,67 @@ public class DmlStatementsProcessor {
         return updateSqlFields(schemaName, c, GridSqlQueryParser.prepared(stmt), fldsQry, local, filter, cancel);
     }
 
+    /**
+     * Runs a DML statement for which we have internal command executor.
+     *
+     * @param sql The SQL command text to execute.
+     * @param cmd The command to execute.
+     * @return The cursor returned by the statement.
+     * @throws IgniteSQLException If failed.
+     */
+    public FieldsQueryCursor<List<?>> runNativeDmlStatement(String sql, SqlCommand cmd) {
+        try {
+            // COPY (bulk load) is the only natively-executed DML command at the moment.
+            if (cmd instanceof SqlBulkLoadCommand)
+                return processBulkLoadCommand((SqlBulkLoadCommand)cmd);
+            else
+                throw new IgniteSQLException("Unsupported DML operation: " + sql,
+                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        }
+        catch (IgniteSQLException e) {
+            throw e; // Already in the form callers expect: re-throw as-is.
+        }
+        catch (Exception e) {
+            // Wrap anything else, preserving the original exception as the cause.
+            throw new IgniteSQLException("Unexpected DML operation failure: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Process bulk load COPY command.
+     *
+     * @param cmd The command.
+     * @return The context (which is the result of the first request/response).
+     * @throws IgniteCheckedException If something failed.
+     */
+    public FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd) throws IgniteCheckedException {
+        // Apply the default batch size if the user did not specify one in the command.
+        if (cmd.batchSize() == null)
+            cmd.batchSize(BulkLoadAckClientParameters.DEFAULT_BATCH_SIZE);
+
+        GridH2Table tbl = idx.dataTable(cmd.schemaName(), cmd.tableName());
+
+        if (tbl == null)
+            throw new IgniteSQLException("Table does not exist: " + cmd.tableName(),
+                IgniteQueryErrorCode.TABLE_NOT_FOUND);
+
+        UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
+
+        // Converts each parsed input row into a cache key+value pair.
+        IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new BulkLoadDataConverter(plan);
+
+        GridCacheContext cache = tbl.cache();
+
+        // NOTE(review): the streamer is not closed in this method — presumably BulkLoadProcessor
+        // takes ownership and closes it when the load finishes; verify against BulkLoadStreamerWriter.
+        IgniteDataStreamer<Object, Object> streamer = cache.grid().dataStreamer(cache.name());
+
+        BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer);
+
+        BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat());
+
+        BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter);
+
+        BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.batchSize());
+
+        // The cursor carries the processor and client parameters back through the first request/response.
+        return new BulkLoadContextCursor(processor, params);
+    }
+
     /** */
     private final static class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
         /** Value to set. */
@@ -1081,4 +1153,31 @@ public class DmlStatementsProcessor {
         }
     }
 
+    /**
+     * Converts a row of values to actual key+value using {@link UpdatePlan#processRow(List)}.
+     */
+    private static class BulkLoadDataConverter extends IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> {
+        /** Update plan to convert incoming rows. */
+        private final UpdatePlan plan;
+
+        /**
+         * Creates the converter with the given update plan.
+         *
+         * @param plan The update plan to use.
+         */
+        private BulkLoadDataConverter(UpdatePlan plan) {
+            this.plan = plan;
+        }
+
+        /**
+         * Converts the record to a key+value.
+         *
+         * @param record The record to convert.
+         * @return The key+value.
+         * @throws IgniteCheckedException If conversion failed for some reason.
+         */
+        @Override public IgniteBiTuple<?, ?> applyx(List<?> record) throws IgniteCheckedException {
+            // All conversion logic lives in the update plan built for the COPY command.
+            return plan.processRow(record);
+        }
+    }
 }
index 96b8935..06c936b 100644 (file)
@@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -99,7 +100,6 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
 import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
-import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory;
@@ -120,6 +120,7 @@ import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisito
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.sql.SqlParseException;
 import org.apache.ignite.internal.sql.SqlParser;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.sql.command.SqlAlterTableCommand;
 import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand;
@@ -190,6 +191,9 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType
  */
 @SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"})
 public class IgniteH2Indexing implements GridQueryIndexing {
+    /** Quick-check pattern matching SQL commands that have a native (non-H2) implementation. */
+    public static final Pattern INTERNAL_CMD_RE = Pattern.compile(
+        "^(create|drop)\\s+index|^alter\\s+table|^copy", Pattern.CASE_INSENSITIVE);
+
     /*
      * Register IO for indexes.
      */
@@ -1437,9 +1441,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry) {
         // Heuristic check for fast return.
-        String sqlUpper = qry.getSql().toUpperCase();
-
-        if (!(sqlUpper.contains("INDEX") || sqlUpper.contains("ALTER")))
+        if (!INTERNAL_CMD_RE.matcher(qry.getSql().trim()).find())
             return null;
 
         // Parse.
@@ -1454,9 +1456,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (parser.nextCommand() != null)
                 return null;
 
-            // Only CREATE/DROP INDEX and ALTER TABLE commands are supported for now.
+            // Currently supported commands are: CREATE/DROP INDEX/COPY/ALTER TABLE
             if (!(cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand ||
-                cmd instanceof SqlAlterTableCommand))
+                cmd instanceof SqlBulkLoadCommand || cmd instanceof SqlAlterTableCommand))
                 return null;
         }
         catch (Exception e) {
@@ -1472,17 +1474,26 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (e instanceof SqlParseException)
                 code = ((SqlParseException)e).code();
 
-            throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql(), code, e);
+            throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql() + ": " + e.getMessage(),
+                code, e);
         }
 
         // Execute.
-        try {
-            FieldsQueryCursor<List<?>> res = ddlProc.runDdlStatement(qry.getSql(), cmd);
+        if (cmd instanceof SqlBulkLoadCommand) {
+            FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(qry.getSql(), cmd);
 
-            return Collections.singletonList(res);
+            return Collections.singletonList(cursor);
         }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + ']', e);
+        else {
+            try {
+                FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(qry.getSql(), cmd);
+
+                return Collections.singletonList(cursor);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + "]: "
+                    + e.getMessage(), e);
+            }
         }
     }
 
index ca7680a..6f5b51f 100644 (file)
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
 import org.apache.ignite.internal.sql.command.SqlIndexColumn;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.command.Prepared;
 import org.h2.command.ddl.AlterTableAlterColumn;
 import org.h2.command.ddl.CreateIndex;
@@ -483,6 +484,7 @@ public class DdlStatementsProcessor {
             return resCur;
         }
         catch (SchemaOperationException e) {
+            U.error(null, "DDL operation failure", e);
             throw convert(e);
         }
         catch (IgniteSQLException e) {
index 0440648..d9c627a 100644 (file)
@@ -22,15 +22,18 @@ package org.apache.ignite.internal.processors.query.h2.dml;
  * or UPDATE/DELETE from subquery or literals/params based.
  */
 public enum UpdateMode {
-    /** */
+    /** MERGE command. */
     MERGE,
 
-    /** */
+    /** INSERT command. */
     INSERT,
 
-    /** */
+    /** UPDATE command. */
     UPDATE,
 
-    /** */
+    /** DELETE command. */
     DELETE,
+
+    /** COPY command. */
+    BULK_LOAD
 }
index 17dc9d1..10d485a 100644 (file)
@@ -39,6 +39,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.h2.table.Column;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.query.h2.dml.UpdateMode.BULK_LOAD;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
 
 /**
@@ -182,6 +183,10 @@ public final class UpdatePlan {
      * @throws IgniteCheckedException if failed.
      */
     public IgniteBiTuple<?, ?> processRow(List<?> row) throws IgniteCheckedException {
+        if (mode != BULK_LOAD && row.size() != colNames.length)
+            throw new IgniteSQLException("Not enough values in a row: " + row.size() + " instead of " + colNames.length,
+                IgniteQueryErrorCode.ENTRY_PROCESSING);
+
         GridH2RowDescriptor rowDesc = tbl.rowDescriptor();
         GridQueryTypeDescriptor desc = rowDesc.type();
 
@@ -205,7 +210,8 @@ public final class UpdatePlan {
 
         if (key == null) {
             if (F.isEmpty(desc.keyFieldName()))
-                throw new IgniteSQLException("Key for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_KEY);
+                throw new IgniteSQLException("Key for INSERT, COPY, or MERGE must not be null",
+                    IgniteQueryErrorCode.NULL_KEY);
             else
                 throw new IgniteSQLException("Null value is not allowed for column '" + desc.keyFieldName() + "'",
                     IgniteQueryErrorCode.NULL_KEY);
@@ -213,16 +219,18 @@ public final class UpdatePlan {
 
         if (val == null) {
             if (F.isEmpty(desc.valueFieldName()))
-                throw new IgniteSQLException("Value for INSERT, MERGE, or UPDATE must not be null",
+                throw new IgniteSQLException("Value for INSERT, COPY, MERGE, or UPDATE must not be null",
                     IgniteQueryErrorCode.NULL_VALUE);
             else
                 throw new IgniteSQLException("Null value is not allowed for column '" + desc.valueFieldName() + "'",
                     IgniteQueryErrorCode.NULL_VALUE);
         }
 
+        int actualColCnt = Math.min(colNames.length, row.size());
+
         Map<String, Object> newColVals = new HashMap<>();
 
-        for (int i = 0; i < colNames.length; i++) {
+        for (int i = 0; i < actualColCnt; i++) {
             if (i == keyColIdx || i == valColIdx)
                 continue;
 
@@ -241,14 +249,14 @@ public final class UpdatePlan {
 
         // We update columns in the order specified by the table for a reason - table's
         // column order preserves their precedence for correct update of nested properties.
-        Column[] cols = tbl.getColumns();
+        Column[] tblCols = tbl.getColumns();
 
         // First 3 columns are _key, _val and _ver. Skip 'em.
-        for (int i = DEFAULT_COLUMNS_COUNT; i < cols.length; i++) {
+        for (int i = DEFAULT_COLUMNS_COUNT; i < tblCols.length; i++) {
             if (tbl.rowDescriptor().isKeyValueOrVersionColumn(i))
                 continue;
 
-            String colName = cols[i].getName();
+            String colName = tblCols[i].getName();
 
             if (!newColVals.containsKey(colName))
                 continue;
index 3305b00..bced836 100644 (file)
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUpdate;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -407,6 +408,91 @@ public final class UpdatePlanBuilder {
     }
 
     /**
+     * Prepare update plan for COPY command (AKA bulk load).
+     *
+     * @param cmd Bulk load command.
+     * @param tbl Table to load the data into.
+     * @return The update plan for this command.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public static UpdatePlan planForBulkLoad(SqlBulkLoadCommand cmd, GridH2Table tbl) throws IgniteCheckedException {
+        GridH2RowDescriptor desc = tbl.rowDescriptor();
+
+        if (desc == null)
+            throw new IgniteSQLException("Row descriptor undefined for table '" + tbl.getName() + "'",
+                IgniteQueryErrorCode.NULL_TABLE_DESCRIPTOR);
+
+        GridCacheContext<?, ?> cctx = desc.context();
+
+        List<String> cols = cmd.columns();
+
+        if (cols == null)
+            throw new IgniteSQLException("Columns are not defined", IgniteQueryErrorCode.NULL_TABLE_DESCRIPTOR);
+
+        String[] colNames = new String[cols.size()];
+
+        int[] colTypes = new int[cols.size()];
+
+        // -1 means the column list does not reference _key / _val directly.
+        int keyColIdx = -1;
+        int valColIdx = -1;
+
+        boolean hasKeyProps = false;
+        boolean hasValProps = false;
+
+        for (int i = 0; i < cols.size(); i++) {
+            String colName = cols.get(i);
+
+            colNames[i] = colName;
+
+            Column h2Col = tbl.getColumn(colName);
+
+            colTypes[i] = h2Col.getType();
+            int colId = h2Col.getColumnId();
+
+            if (desc.isKeyColumn(colId)) {
+                keyColIdx = i;
+                continue;
+            }
+
+            if (desc.isValueColumn(colId)) {
+                valColIdx = i;
+                continue;
+            }
+
+            // Track whether any key/value *properties* (fields) are listed, to pick key/value suppliers below.
+            GridQueryProperty prop = desc.type().property(colName);
+
+            assert prop != null : "Property '" + colName + "' not found.";
+
+            if (prop.key())
+                hasKeyProps = true;
+            else
+                hasValProps = true;
+        }
+
+        KeyValueSupplier keySupplier = createSupplier(cctx, desc.type(), keyColIdx, hasKeyProps,
+            true, false);
+        KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), valColIdx, hasValProps,
+            false, false);
+
+        return new UpdatePlan(
+            UpdateMode.BULK_LOAD,
+            tbl,
+            colNames,
+            colTypes,
+            keySupplier,
+            valSupplier,
+            keyColIdx,
+            valColIdx,
+            null,
+            true,
+            null,
+            0,
+            null,
+            null
+        );
+    }
+
+    /**
      * Detect appropriate method of instantiating key or value (take from param, create binary builder,
      * invoke default ctor, or allocate).
      *
index 68610a1..6295d8d 100644 (file)
@@ -165,6 +165,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
 import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest;
 import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
 import org.apache.ignite.internal.processors.sql.SqlConnectorConfigurationValidationSelfTest;
+import org.apache.ignite.internal.sql.SqlParserBulkLoadSelfTest;
 import org.apache.ignite.internal.sql.SqlParserCreateIndexSelfTest;
 import org.apache.ignite.internal.sql.SqlParserDropIndexSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
@@ -183,6 +184,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
 
         suite.addTestSuite(SqlParserCreateIndexSelfTest.class);
         suite.addTestSuite(SqlParserDropIndexSelfTest.class);
+        suite.addTestSuite(SqlParserBulkLoadSelfTest.class);
 
         suite.addTestSuite(SqlConnectorConfigurationValidationSelfTest.class);
         suite.addTestSuite(ClientConnectorConfigurationValidationSelfTest.class);
index ba5d576..5d85f0d 100644 (file)
                                         <exclude>**/*index*.md</exclude><!--readme files-->
                                         <exclude>**/*.timestamp</exclude><!--tmp-files-->
                                         <exclude>**/*.iml</exclude><!--IDEA files-->
+                                        <exclude>**/*.csv</exclude><!--CSV files-->
                                         <exclude>**/pom-installed.xml</exclude><!--tmp-files-->
                                         <exclude>**/keystore/*.jks</exclude><!--bin-files-->
                                         <exclude>**/keystore/*.pem</exclude><!--auto generated files-->
                                         <exclude>**/books/*.txt</exclude><!--books examples-->
                                         <exclude>src/main/java/org/apache/ignite/examples/streaming/wordcount/*.txt</exclude><!--books examples-->
                                         <exclude>examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/*.txt</exclude><!--books examples-->
-                                        <exclude>src/main/resources/person.csv</exclude><!--CacheLoadOnlyStoreExample csv-->
                                         <exclude>**/resources/datasets/**/*</exclude><!--KNN Datasets in ml module-->
-                                        <exclude>examples/src/main/resources/person.csv</exclude><!--CacheLoadOnlyStoreExample csv-->
                                         <exclude>src/main/java/org/jetbrains/annotations/*.java</exclude><!--copyright-->
                                         <exclude>dev-tools/IGNITE-*.patch</exclude>
                                         <exclude>dev-tools/.gradle/**/*</exclude>