IGNITE-5438: JDBC Thin Driver: support Statement.setQueryTimeout. This closes #5772...
authoralapin <lapin1702@gmail.com>
Wed, 16 Jan 2019 14:26:03 +0000 (17:26 +0300)
committerdevozerov <vozerov@gridgain.com>
Wed, 16 Jan 2019 14:26:03 +0000 (17:26 +0300)
modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java
modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementTimeoutSelfTest.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java

index 89ff29b..b959242 100644 (file)
@@ -75,6 +75,7 @@ import org.apache.ignite.jdbc.thin.JdbcThinSchemaCaseTest;
 import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable;
 import org.apache.ignite.jdbc.thin.JdbcThinStatementCancelSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinStatementTimeoutSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinStreamingNotOrderedSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinStreamingOrderedSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinTcpIoTest;
@@ -165,6 +166,7 @@ import org.junit.runners.Suite;
     JdbcThinMetadataPrimaryKeysSelfTest.class,
     JdbcThinErrorsSelfTest.class,
     JdbcThinStatementCancelSelfTest.class,
+    JdbcThinStatementTimeoutSelfTest.class,
 
     JdbcThinInsertStatementSelfTest.class,
     JdbcThinUpdateStatementSelfTest.class,
index a58137b..3f5ea7f 100644 (file)
@@ -23,7 +23,6 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
-import java.sql.SQLTimeoutException;
 import java.sql.Statement;
 import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCache;
@@ -386,29 +385,6 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest {
      * @throws Exception If failed.
      */
     @org.junit.Test
-    public void testExecuteQueryTimeout() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-5438");
-
-        final String sqlText = "select sleep_func(3)";
-
-        stmt.setQueryTimeout(1);
-
-        // Timeout
-        GridTestUtils.assertThrows(log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    return stmt.executeQuery(sqlText);
-                }
-            },
-            SQLTimeoutException.class,
-            "Timeout"
-        );
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @org.junit.Test
     public void testExecuteQueryMultipleOnlyResultSets() throws Exception {
         assert conn.getMetaData().supportsMultipleResultSets();
 
@@ -571,29 +547,6 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest {
      * @throws Exception If failed.
      */
     @org.junit.Test
-    public void testExecuteUpdateTimeout() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-5438");
-
-        final String sqlText = "update test set val=1 where _key=sleep_func(3)";
-
-        stmt.setQueryTimeout(1);
-
-        // Timeout
-        GridTestUtils.assertThrows(log,
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    return stmt.executeUpdate(sqlText);
-                }
-            },
-            SQLTimeoutException.class,
-            "Timeout"
-        );
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @org.junit.Test
     public void testClose() throws Exception {
         String sqlText = "select * from test";
 
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementTimeoutSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementTimeoutSelfTest.java
new file mode 100644 (file)
index 0000000..ae63eff
--- /dev/null
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.sql.Statement;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Statement timeout test.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+@RunWith(JUnit4.class)
+public class JdbcThinStatementTimeoutSelfTest extends JdbcThinAbstractSelfTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** URL. */
+    private static final String URL = "jdbc:ignite:thin://127.0.0.1/";
+
+    /** Server thread pull size. */
+    private static final int SERVER_THREAD_POOL_SIZE = 4;
+
+    /** Connection. */
+    private Connection conn;
+
+    /** Statement. */
+    private Statement stmt;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setSqlFunctionClasses(TestSQLFunctions.class);
+        cache.setIndexedTypes(Integer.class, Integer.class, Long.class, Long.class, String.class,
+            JdbcThinAbstractDmlStatementSelfTest.Person.class);
+
+        cfg.setCacheConfiguration(cache);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().setThreadPoolSize(SERVER_THREAD_POOL_SIZE));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(3);
+
+        for (int i = 0; i < 10000; ++i)
+            grid(0).cache(DEFAULT_CACHE_NAME).put(i, i);
+
+        for (int i = 0; i < 10000; ++i)
+            grid(0).cache(DEFAULT_CACHE_NAME).put((long)i, (long)i);
+    }
+
+    /**
+     * Called before execution of every test method in class.
+     *
+     * @throws Exception If failed.
+     */
+    @Before
+    public void before() throws Exception {
+        conn = DriverManager.getConnection(URL);
+
+        conn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
+
+        stmt = conn.createStatement();
+
+        assert stmt != null;
+        assert !stmt.isClosed();
+    }
+
+    /**
+     * Called after execution of every test method in class.
+     *
+     * @throws Exception If failed.
+     */
+    @After
+    public void after() throws Exception {
+        if (stmt != null && !stmt.isClosed()) {
+            stmt.close();
+
+            assert stmt.isClosed();
+        }
+
+        conn.close();
+
+        assert stmt.isClosed();
+        assert conn.isClosed();
+    }
+
+    /**
+     * Trying to set negative timeout. <code>SQLException</> with message "Invalid timeout value." is expected.
+     */
+    @Test
+    public void testSettingNegativeQueryTimeout() {
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.setQueryTimeout(-1);
+
+            return null;
+        }, SQLException.class, "Invalid timeout value.");
+    }
+
+    /**
+     * Trying to set zero timeout. Zero timeout means no timeout, so no exception is expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSettingZeroQueryTimeout() throws Exception {
+        stmt.setQueryTimeout(0);
+
+        stmt.executeQuery("select sleep_func(1000);");
+    }
+
+    /**
+     * Setting timeout that is greater than query execution time. <code>SQLTimeoutException</code> is expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testQueryTimeout() throws Exception {
+        stmt.setQueryTimeout(2);
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.executeQuery("select sleep_func(10) from Integer;");
+
+            return null;
+        }, SQLTimeoutException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Setting timeout that is greater than query execution time. Running same query multiple times.
+     * <code>SQLTimeoutException</code> is expected in all cases.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testQueryTimeoutRepeatable() throws Exception {
+        stmt.setQueryTimeout(2);
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.executeQuery("select sleep_func(10) from Integer;");
+
+            return null;
+        }, SQLTimeoutException.class, "The query was cancelled while executing.");
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.executeQuery("select sleep_func(10) from Integer;");
+
+            return null;
+        }, SQLTimeoutException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Setting timeout that is greater than file uploading execution time.
+     * <code>SQLTimeoutException</code> is expected.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testFileUploadingTimeout() throws Exception {
+
+        File file = File.createTempFile("bulkload", "csv");
+
+        FileWriter writer = new FileWriter(file);
+
+        for (int i = 1; i <= 1_000_000; i++)
+            writer.write(String.format("%d,%d,\"FirstName%d MiddleName%d\",LastName%d", i, i, i, i, i));
+
+        writer.close();
+
+        stmt.setQueryTimeout(1);
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.executeUpdate(
+                "copy from '" + file.getAbsolutePath() + "' into Person" +
+                    " (_key, age, firstName, lastName)" +
+                    " format csv");
+
+            return null;
+        }, SQLTimeoutException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Setting timeout that is greater than batch query execution time.
+     * <code>SQLTimeoutException</code> is expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBatchQuery() throws Exception {
+        stmt.setQueryTimeout(1);
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)");
+            stmt.addBatch("update Long set _val = _val + 1 where _key > sleep_func (10)");
+
+            stmt.executeBatch();
+
+            return null;
+        }, SQLTimeoutException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Setting timeout that is greater than multiple statements query execution time.
+     * <code>SQLTimeoutException</code> is expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultipleStatementsQuery() throws Exception {
+        stmt.setQueryTimeout(1);
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.execute(
+                "update Long set _val = _val + 1 where _key > sleep_func (10);"
+                    + "update Long set _val = _val + 1 where _key > sleep_func (10);"
+                    + "update Long set _val = _val + 1 where _key > sleep_func (10);"
+                    + "update Long set _val = _val + 1 where _key > sleep_func (10);"
+                    + "select _val, sleep_func(10) as s from Integer limit 10");
+
+            return null;
+        }, SQLTimeoutException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Setting timeout that is greater than update query execution time.
+     * <code>SQLTimeoutException</code> is expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExecuteUpdateTimeout() throws Exception {
+        stmt.setQueryTimeout(1);
+
+        GridTestUtils.assertThrows(log, () ->
+                stmt.executeUpdate("update Integer set _val=1 where _key > sleep_func(10)"),
+            SQLTimeoutException.class, "The query was cancelled while executing.");
+    }
+
+    /**
+     * Utility class with custom SQL functions.
+     */
+    public static class TestSQLFunctions {
+        /**
+         * @param v amount of milliseconds to sleep
+         * @return amount of milliseconds to sleep
+         */
+        @SuppressWarnings("unused")
+        @QuerySqlFunction
+        public static int sleep_func(int v) {
+            try {
+                Thread.sleep(v);
+            }
+            catch (InterruptedException ignored) {
+                // No-op
+            }
+            return v;
+        }
+    }
+}
\ No newline at end of file
index 3ce4319..fb3064e 100644 (file)
@@ -29,6 +29,7 @@ import java.sql.PreparedStatement;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTimeoutException;
 import java.sql.SQLWarning;
 import java.sql.SQLXML;
 import java.sql.Savepoint;
@@ -38,14 +39,19 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
@@ -76,6 +82,12 @@ public class JdbcThinConnection implements Connection {
     /** Logger. */
     private static final Logger LOG = Logger.getLogger(JdbcThinConnection.class.getName());
 
+    /** Request timeout period. */
+    private static final int REQUEST_TIMEOUT_PERIOD = 1_000;
+
+    /** Zero timeout as query timeout means no timeout. */
+    static final int NO_TIMEOUT = 0;
+
     /** Statements modification mutex. */
     private final Object stmtsMux = new Object();
 
@@ -118,6 +130,9 @@ public class JdbcThinConnection implements Connection {
     /** Tracked statements to close on disconnect. */
     private final ArrayList<JdbcThinStatement> stmts = new ArrayList<>();
 
+    /** Query timeout timer */
+    private final Timer timer;
+
     /**
      * Creates new connection.
      *
@@ -135,6 +150,8 @@ public class JdbcThinConnection implements Connection {
 
         cliIo = new JdbcThinTcpIo(connProps);
 
+        timer = new Timer("query-timeout-timer");
+
         ensureConnected();
     }
 
@@ -388,6 +405,8 @@ public class JdbcThinConnection implements Connection {
         closed = true;
 
         cliIo.close();
+
+        timer.cancel();
     }
 
     /** {@inheritDoc} */
@@ -757,10 +776,25 @@ public class JdbcThinConnection implements Connection {
     <R extends JdbcResult> R sendRequest(JdbcRequest req, JdbcThinStatement stmt) throws SQLException {
         ensureConnected();
 
+        RequestTimeoutTimerTask reqTimeoutTimerTask = null;
+
         try {
+            if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) {
+                reqTimeoutTimerTask = new RequestTimeoutTimerTask(
+                    req instanceof JdbcBulkLoadBatchRequest ? stmt.currentRequestId() : req.requestId(),
+                    stmt.requestTimeout());
+
+                timer.schedule(reqTimeoutTimerTask, 0, REQUEST_TIMEOUT_PERIOD);
+            }
+
             JdbcResponse res = cliIo.sendRequest(req, stmt);
 
-            if (res.status() != ClientListenerResponse.STATUS_SUCCESS)
+            if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED && stmt != null &&
+                stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null && reqTimeoutTimerTask.expired.get()) {
+                throw new SQLTimeoutException(QueryCancelledException.ERR_MSG, SqlStateCode.QUERY_CANCELLED,
+                    IgniteQueryErrorCode.QUERY_CANCELED);
+            }
+            else if (res.status() != ClientListenerResponse.STATUS_SUCCESS)
                 throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), res.status());
 
             return (R)res.response();
@@ -773,6 +807,10 @@ public class JdbcThinConnection implements Connection {
 
             throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e);
         }
+        finally {
+            if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null)
+                reqTimeoutTimerTask.cancel();
+        }
     }
 
     /**
@@ -844,6 +882,8 @@ public class JdbcThinConnection implements Connection {
 
             stmts.clear();
         }
+
+        timer.cancel();
     }
 
     /**
@@ -873,9 +913,6 @@ public class JdbcThinConnection implements Connection {
         /** Maximum requests count that may be sent before any responses. */
         private static final int MAX_REQUESTS_BEFORE_RESPONSE = 10;
 
-        /** Wait timeout. */
-        private static final long WAIT_TIMEOUT = 1;
-
         /** Batch size for streaming. */
         private int streamBatchSize;
 
@@ -1076,4 +1113,52 @@ public class JdbcThinConnection implements Connection {
     boolean isQueryCancellationSupported() {
         return cliIo.isQueryCancellationSupported();
     }
+
+    /**
+     * Request Timeout Timer Task
+     */
+    private class RequestTimeoutTimerTask extends TimerTask {
+
+        /** Request id. */
+        private long reqId;
+
+        /** Remaining query timeout. */
+        private int remainingQryTimeout;
+
+        /** Flag that shows whether TimerTask was expired or not. */
+        private AtomicBoolean expired;
+
+        /**
+         * @param reqId Request Id to cancel in case of timeout
+         * @param initReqTimeout Initial request timeout
+         */
+        RequestTimeoutTimerTask(long reqId, int initReqTimeout) {
+            this.reqId = reqId;
+
+            remainingQryTimeout = initReqTimeout;
+
+            expired = new AtomicBoolean(false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                if (remainingQryTimeout <= 0) {
+                    expired.set(true);
+
+                    sendQueryCancelRequest(new JdbcQueryCancelRequest(reqId));
+
+                    cancel();
+                }
+
+                remainingQryTimeout -= REQUEST_TIMEOUT_PERIOD;
+            }
+            catch (SQLException e) {
+                LOG.log(Level.WARNING,
+                    "Request timeout processing failure: unable to cancel request [reqId=" + reqId + ']', e);
+
+                cancel();
+            }
+        }
+    }
 }
\ No newline at end of file
index 500e632..935f988 100644 (file)
@@ -25,6 +25,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTimeoutException;
 import java.sql.SQLWarning;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -81,6 +82,9 @@ public class JdbcThinStatement implements Statement {
     /** Query timeout. */
     private int timeout;
 
+    /** Request timeout. */
+    private int reqTimeout;
+
     /** Fetch size. */
     private int pageSize = DFLT_PAGE_SIZE;
 
@@ -305,34 +309,49 @@ public class JdbcThinStatement implements Statement {
                 byte[] buf = new byte[batchSize];
 
                 int readBytes;
+                int timeSpendMillis = 0;
+
                 while ((readBytes = input.read(buf)) != -1) {
+                    long startTime = System.currentTimeMillis();
+
                     if (readBytes == 0)
                         continue;
 
+                    if (reqTimeout != JdbcThinConnection.NO_TIMEOUT)
+                        reqTimeout -= timeSpendMillis;
+
                     JdbcResult res = conn.sendRequest(new JdbcBulkLoadBatchRequest(
-                        cmdRes.cursorId(),
-                        batchNum++,
-                        JdbcBulkLoadBatchRequest.CMD_CONTINUE,
-                        readBytes == buf.length ? buf : Arrays.copyOf(buf, readBytes)),
+                            cmdRes.cursorId(),
+                            batchNum++,
+                            JdbcBulkLoadBatchRequest.CMD_CONTINUE,
+                            readBytes == buf.length ? buf : Arrays.copyOf(buf, readBytes)),
                         this);
 
                     if (!(res instanceof JdbcQueryExecuteResult))
                         throw new SQLException("Unknown response sent by the server: " + res);
+
+                    timeSpendMillis = (int)(System.currentTimeMillis() - startTime);
                 }
 
+                if (reqTimeout != JdbcThinConnection.NO_TIMEOUT)
+                    reqTimeout -= timeSpendMillis;
+
                 return conn.sendRequest(new JdbcBulkLoadBatchRequest(
-                    cmdRes.cursorId(),
-                    batchNum++,
-                    JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF),
+                        cmdRes.cursorId(),
+                        batchNum++,
+                        JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF),
                     this);
             }
         }
         catch (Exception e) {
+            if (e instanceof SQLTimeoutException)
+                throw (SQLTimeoutException)e;
+
             try {
                 conn.sendRequest(new JdbcBulkLoadBatchRequest(
-                    cmdRes.cursorId(),
-                    batchNum,
-                    JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR),
+                        cmdRes.cursorId(),
+                        batchNum,
+                        JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR),
                     this);
             }
             catch (SQLException e1) {
@@ -466,6 +485,8 @@ public class JdbcThinStatement implements Statement {
             throw new SQLException("Invalid timeout value.");
 
         this.timeout = timeout * 1000;
+
+        reqTimeout = this.timeout;
     }
 
     /** {@inheritDoc} */
@@ -943,7 +964,7 @@ public class JdbcThinStatement implements Statement {
     }
 
     /**
-     * @param currReqId Sets curresnt request Id.
+     * @param currReqId Sets current request Id.
      */
     void currentRequestId(long currReqId) {
         synchronized (cancellationMux) {
@@ -952,6 +973,15 @@ public class JdbcThinStatement implements Statement {
     }
 
     /**
+     * @return Current request Id.
+     */
+    long currentRequestId() {
+        synchronized (cancellationMux) {
+            return currReqId;
+        }
+    }
+
+    /**
      * @return Cancellation mutex.
      */
     Object cancellationMutex() {
@@ -964,4 +994,11 @@ public class JdbcThinStatement implements Statement {
     private boolean isQueryCancellationSupported() {
         return conn.isQueryCancellationSupported();
     }
+
+    /**
+     * @return Request timeout.
+     */
+    int requestTimeout() {
+        return reqTimeout;
+    }
 }