IGNITE-10580: SQL: Fixed incorrect re-use of cached connection for local queries...
authortledkov-gridgain <tledkov@gridgain.com>
Tue, 25 Dec 2018 14:25:49 +0000 (17:25 +0300)
committerdevozerov <vozerov@gridgain.com>
Tue, 25 Dec 2018 14:25:49 +0000 (17:25 +0300)
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java [new file with mode: 0644]
modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java

index e9f293c..ef99a4b 100644 (file)
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import com.sun.org.apache.xml.internal.utils.ObjectPool;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,17 +35,23 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> {
     /** */
     private transient MvccQueryTracker mvccTracker;
 
+    /** Detached connection. */
+    private final ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn;
+
     /**
      * @param data Data.
      * @param mvccTracker Mvcc tracker.
      * @param forUpdate {@code SELECT FOR UPDATE} flag.
+     * @param detachedConn Detached connection.
      * @throws IgniteCheckedException If failed.
      */
-    public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker, boolean forUpdate)
+    public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker, boolean forUpdate,
+        ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn)
         throws IgniteCheckedException {
         super(data, forUpdate);
 
         this.mvccTracker = mvccTracker;
+        this.detachedConn = detachedConn;
     }
 
     /** {@inheritDoc} */
@@ -62,6 +69,9 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> {
             super.onClose();
         }
         finally {
+            if (detachedConn != null)
+                detachedConn.recycle();
+
             if (mvccTracker != null)
                 mvccTracker.onDone();
         }
index 9a2ff90..d1b435d 100644 (file)
@@ -635,6 +635,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                     GridH2QueryContext.set(ctx);
 
+                    ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn = connMgr.detachThreadConnection();
+
                     try {
                         ResultSet rs = executeSqlQueryWithTimer(stmt0, conn, qry0, params, timeout0, cancel);
 
@@ -657,10 +659,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                             enlistFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() {
                                 @Override public void apply(IgniteInternalFuture<Long> fut) {
-                                    if (fut.error() != null)
-                                        sfuFut0.onResult(IgniteH2Indexing.this.ctx.localNodeId(), 0L, false, fut.error());
-                                    else
-                                        sfuFut0.onResult(IgniteH2Indexing.this.ctx.localNodeId(), fut.result(), false, null);
+                                    if (fut.error() != null) {
+                                        sfuFut0.onResult(
+                                            IgniteH2Indexing.this.ctx.localNodeId(),
+                                            0L,
+                                            false,
+                                            fut.error());
+                                    }
+                                    else {
+                                        sfuFut0.onResult(
+                                            IgniteH2Indexing.this.ctx.localNodeId(),
+                                            fut.result(),
+                                            false,
+                                            null);
+                                    }
                                 }
                             });
 
@@ -679,9 +691,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                             }
                         }
 
-                        return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null);
+                        return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null,
+                            detachedConn);
                     }
                     catch (IgniteCheckedException | RuntimeException | Error e) {
+                        detachedConn.recycle();
+
                         try {
                             if (mvccTracker0 != null)
                                 mvccTracker0.onDone();
index 7009bd5..d90331c 100644 (file)
@@ -760,7 +760,7 @@ public class GridReduceQueryExecutor {
                                 timeoutMillis,
                                 cancel);
 
-                            resIter = new H2FieldsIterator(res, mvccTracker, false);
+                            resIter = new H2FieldsIterator(res, mvccTracker, false, null);
 
                             mvccTracker = null; // To prevent callback inside finally block;
                         }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java
new file mode 100644 (file)
index 0000000..bbff841
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test for statement reuse.
+ */
+@RunWith(JUnit4.class)
+public class SqlLocalQueryConnectionAndStatementTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override public void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     */
+    @Test
+    public void testReplicated() {
+        sql("CREATE TABLE repl_tbl (id LONG PRIMARY KEY, val LONG) WITH \"template=replicated\"").getAll();
+
+        for (int i = 0; i < 10; i++)
+            sql("insert into repl_tbl(id,val) VALUES(" + i + "," + i + ")").getAll();
+
+        Iterator<List<?>> it0 = sql(new SqlFieldsQuery("SELECT * FROM repl_tbl where id > ?").setArgs(1)).iterator();
+
+        it0.next();
+
+        sql(new SqlFieldsQuery("SELECT * FROM repl_tbl where id > ?").setArgs(1)).getAll();
+
+        it0.next();
+    }
+
+    /**
+     */
+    @Test
+    public void testLocalQuery() {
+        sql("CREATE TABLE tbl (id LONG PRIMARY KEY, val LONG)").getAll();
+
+        for (int i = 0; i < 10; i++)
+            sql("insert into tbl(id,val) VALUES(" + i + "," + i + ")").getAll();
+
+        Iterator<List<?>> it0 = sql(
+            new SqlFieldsQuery("SELECT * FROM tbl where id > ?")
+                .setArgs(1)
+                .setLocal(true))
+            .iterator();
+
+        it0.next();
+
+        sql(new SqlFieldsQuery("SELECT * FROM tbl where id > ?").setArgs(1).setLocal(true)).getAll();
+
+        it0.next();
+    }
+
+    /**
+     * @param sql SQL query.
+     * @return Results.
+     */
+    private FieldsQueryCursor<List<?>> sql(String sql) {
+        return sql(new SqlFieldsQuery(sql));
+    }
+
+    /**
+     * @param qry SQL query.
+     * @return Results.
+     */
+    private FieldsQueryCursor<List<?>> sql(SqlFieldsQuery qry) {
+        GridQueryProcessor qryProc = grid(0).context().query();
+
+        return qryProc.querySqlFields(qry, true);
+    }
+}
index e4c918e..8273e9e 100644 (file)
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.query.IgniteCacheGroupsCompareQuery
 import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlDistributedJoinSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexSelfTest;
+import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest;
 import org.apache.ignite.internal.processors.query.h2.CacheQueryEntityWithDateTimeApiFieldsTest;
 import org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest;
 import org.apache.ignite.internal.processors.query.h2.twostep.CreateTableWithDateKeySelfTest;
@@ -134,6 +135,8 @@ public class IgniteBinaryCacheQueryTestSuite2 {
 
         suite.addTest(new JUnit4TestAdapter(IgniteCacheQueriesLoadTest1.class));
 
+        suite.addTest(new JUnit4TestAdapter(SqlLocalQueryConnectionAndStatementTest.class));
+
         return suite;
     }
 }