IGNITE-3303 Apache Flink Integration Fixes #5020
authorsamaitra <saikat.maitra@gmail.com>
Tue, 25 Dec 2018 16:45:30 +0000 (19:45 +0300)
committerDmitriy Pavlov <dpavlov@apache.org>
Tue, 25 Dec 2018 16:45:30 +0000 (19:45 +0300)
modules/flink/pom.xml
modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java [new file with mode: 0644]
modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java [new file with mode: 0644]
modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java [new file with mode: 0644]
modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java [new file with mode: 0644]
modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java [new file with mode: 0644]
parent/pom.xml

index 1082e7b..1597c2a 100644 (file)
@@ -35,7 +35,8 @@
     <url>http://ignite.apache.org</url>
 
     <properties>
-        <flink.version>1.3.0</flink.version>
+        <flink.version>1.5.0</flink.version>
+        <kryo-serializers.version>0.42</kryo-serializers.version>
     </properties>
 
     <dependencies>
@@ -91,7 +92,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.10</artifactId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
             <version>${flink.version}</version>
             <exclusions>
                 <exclusion>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_2.10</artifactId>
+            <artifactId>flink-clients_2.11</artifactId>
             <version>${flink.version}</version>
             <exclusions>
                 <exclusion>
         </dependency>
 
         <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-core</artifactId>
             <version>${project.version}</version>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java b/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java
new file mode 100644 (file)
index 0000000..2dd670a
--- /dev/null
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.source.flink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apache Flink Ignite source implemented as a RichParallelSourceFunction.
+ */
+public class IgniteSource extends RichParallelSourceFunction<CacheEvent> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 1L;
+
+    /** Logger. */
+    private static final Logger log = LoggerFactory.getLogger(IgniteSource.class);
+
+    /** Default max number of events taken from the buffer at once. */
+    private static final int DFLT_EVT_BATCH_SIZE = 1;
+
+    /** Default number of milliseconds timeout for event buffer queue operation. */
+    private static final int DFLT_EVT_BUFFER_TIMEOUT = 10;
+
+    /** Event buffer. */
+    private BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue<>();
+
+    /** Remote Listener id. */
+    private UUID rmtLsnrId;
+
+    /** Flag for isRunning state. */
+    private volatile boolean isRunning;
+
+    /** Max number of events taken from the buffer at once. */
+    private int evtBatchSize = DFLT_EVT_BATCH_SIZE;
+
+    /** Number of milliseconds timeout for event buffer queue operation. */
+    private int evtBufTimeout = DFLT_EVT_BUFFER_TIMEOUT;
+
+    /** Local listener. */
+    private final TaskLocalListener locLsnr = new TaskLocalListener();
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private transient Ignite ignite;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /**
+     * Sets Ignite instance.
+     *
+     * @param ignite Ignite instance.
+     */
+    public void setIgnite(Ignite ignite) {
+        this.ignite = ignite;
+    }
+
+    /**
+     * Sets Event Batch Size.
+     *
+     * @param evtBatchSize Event Batch Size.
+     */
+    public void setEvtBatchSize(int evtBatchSize) {
+        this.evtBatchSize = evtBatchSize;
+    }
+
+    /**
+     * Sets Event Buffer timeout.
+     *
+     * @param evtBufTimeout Event Buffer timeout.
+     */
+    public void setEvtBufTimeout(int evtBufTimeout) {
+        this.evtBufTimeout = evtBufTimeout;
+    }
+
+    /**
+     * @return Local Task Listener
+     */
+    TaskLocalListener getLocLsnr() {
+        return locLsnr;
+    }
+
+    /**
+     * Default IgniteSource constructor.
+     *
+     * @param cacheName Cache name.
+     */
+    public IgniteSource(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    /**
+     * Starts Ignite source.
+     *
+     * @param filter User defined filter.
+     * @param cacheEvts Converts comma-delimited cache events strings to Ignite internal representation.
+     */
+    @SuppressWarnings("unchecked")
+    public void start(IgnitePredicate<CacheEvent> filter, int... cacheEvts) {
+        A.notNull(cacheName, "Cache name");
+
+        TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(cacheName, filter);
+
+        try {
+            synchronized (this) {
+                if (isRunning)
+                    return;
+
+                isRunning = true;
+
+                rmtLsnrId = ignite.events(ignite.cluster().forCacheNodes(cacheName))
+                    .remoteListen(locLsnr, rmtLsnr, cacheEvts);
+            }
+        }
+        catch (IgniteException e) {
+            log.error("Failed to register event listener!", e);
+
+            throw e;
+        }
+    }
+
+    /**
+     * Transfers data from grid.
+     *
+     * @param ctx SourceContext.
+     */
+    @Override public void run(SourceContext<CacheEvent> ctx) {
+        List<CacheEvent> evts = new ArrayList<>(evtBatchSize);
+
+        try {
+            while (isRunning) {
+                // block here for some time if there is no events from source
+                CacheEvent firstEvt = evtBuf.poll(1, TimeUnit.SECONDS);
+
+                if (firstEvt != null)
+                    evts.add(firstEvt);
+
+                if (evtBuf.drainTo(evts, evtBatchSize) > 0) {
+                    synchronized (ctx.getCheckpointLock()) {
+                        for (CacheEvent evt : evts)
+                            ctx.collect(evt);
+
+                        evts.clear();
+                    }
+                }
+            }
+        }
+        catch (Exception e) {
+            if (X.hasCause(e, InterruptedException.class))
+                return; // Executing thread can be interrupted see cancel() javadoc.
+
+            log.error("Error while processing cache event of " + cacheName, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        synchronized (this) {
+            if (!isRunning)
+                return;
+
+            isRunning = false;
+
+            if (rmtLsnrId != null && ignite != null) {
+                ignite.events(ignite.cluster().forCacheNodes(cacheName))
+                    .stopRemoteListen(rmtLsnrId);
+
+                rmtLsnrId = null;
+            }
+        }
+    }
+
+    /**
+     * Local listener buffering cache events to be further sent to Flink.
+     */
+    private class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID id, CacheEvent evt) {
+            try {
+                if (!evtBuf.offer(evt, evtBufTimeout, TimeUnit.MILLISECONDS))
+                    log.error("Failed to buffer event {}", evt.name());
+            }
+            catch (InterruptedException ignored) {
+                log.error("Failed to buffer event using local task listener {}", evt.name());
+
+                Thread.currentThread().interrupt(); // Restore interrupt flag.
+            }
+
+            return true;
+        }
+    }
+}
+
diff --git a/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java b/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java
new file mode 100644 (file)
index 0000000..4c89d25
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.source.flink;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Remote filter.
+ */
+public class TaskRemoteFilter implements IgnitePredicate<CacheEvent> {
+    /** Serial version Id. */
+    private static final long serialVersionUID = 1L;
+
+    /** Ignite Instance Resource. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** User-defined filter. */
+    private final IgnitePredicate<CacheEvent> filter;
+
+    /**
+     * @param cacheName Cache name.
+     * @param filter IgnitePredicate.
+     */
+    TaskRemoteFilter(String cacheName, IgnitePredicate<CacheEvent> filter) {
+        this.cacheName = cacheName;
+        this.filter = filter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(CacheEvent evt) {
+        Affinity<Object> affinity = ignite.affinity(cacheName);
+
+        // Process this event. Ignored on backups.
+        return affinity.isPrimary(ignite.cluster().localNode(), evt.key()) &&
+                (filter == null || filter.apply(evt));
+    }
+}
diff --git a/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java b/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java
new file mode 100644 (file)
index 0000000..adc33fc
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * IgniteSource -- source connector integration with Apache Flink.
+ */
+package org.apache.ignite.source.flink;
diff --git a/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java
new file mode 100644 (file)
index 0000000..95a98dc
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.source.flink;
+
+import java.util.UUID;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCluster;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link IgniteSource}.
+ */
+@RunWith(JUnit4.class)
+public class FlinkIgniteSourceSelfTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String TEST_CACHE = "testCache";
+
+    /** Flink source context. */
+    private SourceFunction.SourceContext<CacheEvent> ctx;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
+    /** Cluster Group */
+    private ClusterGroup clsGrp;
+
+    /** Ignite Source instance */
+    private IgniteSource igniteSrc;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUpTest() throws Exception {
+        ctx = mock(SourceFunction.SourceContext.class);
+        ignite = mock(Ignite.class);
+        clsGrp = mock(ClusterGroup.class);
+
+        IgniteEvents igniteEvts = mock(IgniteEvents.class);
+        IgniteCluster igniteCluster = mock(IgniteCluster.class);
+        TaskRemoteFilter taskRemoteFilter = mock(TaskRemoteFilter.class);
+
+        when(ctx.getCheckpointLock()).thenReturn(new Object());
+        when(ignite.events(clsGrp)).thenReturn(igniteEvts);
+        when(ignite.cluster()).thenReturn(igniteCluster);
+
+        igniteSrc = new IgniteSource(TEST_CACHE);
+        igniteSrc.setIgnite(ignite);
+        igniteSrc.setEvtBatchSize(1);
+        igniteSrc.setEvtBufTimeout(1);
+        igniteSrc.setRuntimeContext(createRuntimeContext());
+
+        IgniteBiPredicate locLsnr = igniteSrc.getLocLsnr();
+
+        when(igniteEvts.remoteListen(locLsnr, taskRemoteFilter, EventType.EVT_CACHE_OBJECT_PUT ))
+            .thenReturn(UUID.randomUUID());
+
+        when(igniteCluster.forCacheNodes(TEST_CACHE)).thenReturn(clsGrp);
+    }
+
+    /**  */
+    @After
+    public void tearDownTest() {
+        igniteSrc.cancel();
+    }
+
+    /** Creates streaming runtime context */
+    private RuntimeContext createRuntimeContext() {
+        StreamingRuntimeContext runtimeCtx = mock(StreamingRuntimeContext.class);
+
+        when(runtimeCtx.isCheckpointingEnabled()).thenReturn(true);
+
+        return runtimeCtx;
+    }
+
+    /**
+     * Tests Ignite source start operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIgniteSourceStart() throws Exception {
+        igniteSrc.start(null, EventType.EVT_CACHE_OBJECT_PUT);
+
+        verify(ignite.events(clsGrp), times(1));
+    }
+
+    /**
+     * Tests Ignite source run operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIgniteSourceRun() throws Exception {
+        IgniteInternalFuture f = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    igniteSrc.start(null, EventType.EVT_CACHE_OBJECT_PUT);
+
+                    igniteSrc.run(ctx);
+                }
+                catch (Throwable e) {
+                    igniteSrc.cancel();
+
+                   throw new AssertionError("Unexpected failure.", e);
+                }
+            }
+        });
+
+        long endTime = System.currentTimeMillis() + 2000;
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return f.isDone() || System.currentTimeMillis() > endTime;
+            }
+        }, 3000);
+
+        igniteSrc.cancel();
+
+        f.get(3000);
+    }
+}
diff --git a/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java
new file mode 100644 (file)
index 0000000..3c144d9
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.source.flink;
+
+import junit.framework.JUnit4TestAdapter;
+import junit.framework.TestSuite;
+import org.junit.runner.RunWith;
+import org.junit.runners.AllTests;
+
+/**
+ * Apache Flink source tests.
+ */
+@RunWith(AllTests.class)
+public class FlinkIgniteSourceSelfTestSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Apache Flink Source Test Suite");
+
+        suite.addTest(new JUnit4TestAdapter(FlinkIgniteSourceSelfTest.class));
+
+        return suite;
+    }
+}
+
index 841a45b..00e5634 100644 (file)
                                 <packages>org.apache.ignite.osgi*</packages>
                             </group>
                             <group>
-                                <title>Flink integration</title>
+                                <title>Flink Sink Integration</title>
                                 <packages>org.apache.ignite.sink.flink*</packages>
                             </group>
                             <group>
+                                <title>Flink Source Integration</title>
+                                <packages>org.apache.ignite.source.flink*</packages>
+                            </group>
+                            <group>
                                 <title>SpringData integration</title>
                                 <packages>org.apache.ignite.springdata.repository*</packages>
                             </group>