IGNITE-10827: SQL: automatically close a cursor when iterator end is reached. This...
authorYuriy Gerzhedovich <ygerzhedovich@gridgain.com>
Mon, 14 Jan 2019 14:54:02 +0000 (17:54 +0300)
committerdevozerov <vozerov@gridgain.com>
Mon, 14 Jan 2019 14:54:02 +0000 (17:54 +0300)
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/AutoClosableCursorIterator.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/AutoClosableCursorIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/AutoClosableCursorIterator.java
new file mode 100644 (file)
index 0000000..4e94ebd
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Iterator;
+import org.apache.ignite.cache.query.QueryCursor;
+
+/**
+ * Implementation of iterator wrapper to close cursor when all data has been read from iterator.
+ *
+ * @param <T> The type of elements returned by this iterator.
+ */
+class AutoClosableCursorIterator<T> implements Iterator<T> {
+    /** Cursor. */
+    private final QueryCursor cursor;
+
+    /** Iterator. */
+    private final Iterator<T> iter;
+
+    /**
+     * Constructor.
+     *
+     * @param cursor Query cursor.
+     * @param iter Wrapped iterator.
+     */
+    public AutoClosableCursorIterator(QueryCursor cursor, Iterator<T> iter) {
+        this.cursor = cursor;
+        this.iter = iter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        boolean hasNext = iter.hasNext();
+
+        if (!hasNext)
+            cursor.close();
+
+        return hasNext;
+    }
+
+    /** {@inheritDoc} */
+    @Override public T next() {
+        return iter.next();
+    }
+}
index ccd98da..a17a155 100644 (file)
@@ -89,6 +89,13 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T>, FieldsQueryCursor<T
 
     /** {@inheritDoc} */
     @Override public Iterator<T> iterator() {
+        return new AutoClosableCursorIterator<>(this, iter());
+    }
+
+    /**
+     * @return An simple iterator.
+     */
+    private Iterator<T> iter() {
         if (!STATE_UPDATER.compareAndSet(this, IDLE, EXECUTION))
             throw new IgniteException("Iterator is already fetched or query was cancelled.");
 
@@ -111,8 +118,10 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T>, FieldsQueryCursor<T
         List<T> all = new ArrayList<>();
 
         try {
-            for (T t : this) // Implicitly calls iterator() to do all checks.
-                all.add(t);
+            Iterator<T> iter = iter(); // Implicitly calls iterator() to do all checks.
+
+            while (iter.hasNext())
+                all.add(iter.next());
         }
         finally {
             close();
@@ -124,8 +133,10 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T>, FieldsQueryCursor<T
     /** {@inheritDoc} */
     @Override public void getAll(QueryCursorEx.Consumer<T> clo) throws IgniteCheckedException {
         try {
-            for (T t : this)
-                clo.consume(t);
+            Iterator<T> iter = iter(); // Implicitly calls iterator() to do all checks.
+
+            while (iter.hasNext())
+                clo.consume(iter.next());
         }
         finally {
             close();
index f1f02a3..a5d8e2c 100644 (file)
@@ -220,6 +220,30 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
     }
 
     /**
+     * Check auto clenup running queries on fully readed iterator.
+     *
+     * @throws Exception Exception in case of failure.
+     */
+    @Test
+    public void testAutoCloseQueryAfterIteratorIsExhausted(){
+        IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        FieldsQueryCursor<List<?>> query = cache.query(new SqlFieldsQuery("SELECT * FROM Integer order by _key"));
+
+        query.iterator().forEachRemaining((e) -> {
+            Assert.assertEquals("Should be one running query",
+                1,
+                ignite.context().query().runningQueries(-1).size());
+        });
+
+        assertNoRunningQueries();
+
+    }
+
+    /**
      * Check tracking running queries for Select.
      *
      * @throws Exception Exception in case of failure.