IGNITE-16743 Implement thin client CDC streamer (#169) master
authorNikita Amelchev <nsamelchev@gmail.com>
Thu, 4 Aug 2022 11:39:09 +0000 (14:39 +0300)
committerGitHub <noreply@github.com>
Thu, 4 Aug 2022 11:39:09 +0000 (14:39 +0300)
12 files changed:
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java [new file with mode: 0644]
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java [new file with mode: 0644]
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java [deleted file]
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java [new file with mode: 0644]
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java [new file with mode: 0644]
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java [new file with mode: 0644]
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java [new file with mode: 0644]

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
new file mode 100644 (file)
index 0000000..ffef911
--- /dev/null
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID;
+
+/**
+ * Contains logic to process {@link CdcEvent} and apply them to the cluster.
+ */
+public abstract class AbstractCdcEventsApplier<K, V> {
+    /** Maximum batch size. */
+    private final int maxBatchSize;
+
+    /** Update batch. */
+    private final Map<K, V> updBatch = new HashMap<>();
+
+    /** Remove batch. */
+    private final Map<K, GridCacheVersion> rmvBatch = new HashMap<>();
+
+    /** */
+    private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
+
+    /** */
+    private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
+
+    /** */
+    private final IgniteLogger log;
+
+    /**
+     * @param maxBatchSize Maximum batch size.
+     * @param log Logger.
+     */
+    public AbstractCdcEventsApplier(int maxBatchSize, IgniteLogger log) {
+        this.maxBatchSize = maxBatchSize;
+        this.log = log.getLogger(getClass());
+    }
+
+    /**
+     * @param evts Events to process.
+     * @return Number of applied events.
+     * @throws IgniteCheckedException If failed.
+     */
+    public int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
+        int currCacheId = UNDEFINED_CACHE_ID;
+        int evtsApplied = 0;
+
+        for (CdcEvent evt : evts) {
+            if (log.isDebugEnabled())
+                log.debug("Event received [evt=" + evt + ']');
+
+            int cacheId = evt.cacheId();
+
+            if (cacheId != currCacheId) {
+                evtsApplied += applyIf(currCacheId, hasUpdates, hasRemoves);
+
+                currCacheId = cacheId;
+            }
+
+            CacheEntryVersion order = evt.version();
+            K key = toKey(evt);
+            GridCacheVersion ver = new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId());
+
+            if (evt.value() != null) {
+                evtsApplied += applyIf(currCacheId, () -> isApplyBatch(updBatch, key), hasRemoves);
+
+                updBatch.put(key, toValue(currCacheId, evt.value(), ver));
+            }
+            else {
+                evtsApplied += applyIf(currCacheId, hasUpdates, () -> isApplyBatch(rmvBatch, key));
+
+                rmvBatch.put(key, ver);
+            }
+        }
+
+        if (currCacheId != UNDEFINED_CACHE_ID)
+            evtsApplied += applyIf(currCacheId, hasUpdates, hasRemoves);
+
+        return evtsApplied;
+    }
+
+    /**
+     * Applies data from {@link #updBatch} or {@link #rmvBatch} to Ignite if required.
+     *
+     * @param cacheId Current cache ID.
+     * @param applyUpd Apply update batch flag supplier.
+     * @param applyRmv Apply remove batch flag supplier.
+     * @return Number of applied events.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private int applyIf(
+        int cacheId,
+        BooleanSupplier applyUpd,
+        BooleanSupplier applyRmv
+    ) throws IgniteCheckedException {
+        int evtsApplied = 0;
+
+        if (applyUpd.getAsBoolean()) {
+            if (log.isDebugEnabled())
+                log.debug("Applying put batch [cacheId=" + cacheId + ']');
+
+            putAllConflict(cacheId, updBatch);
+
+            evtsApplied += updBatch.size();
+
+            updBatch.clear();
+        }
+
+        if (applyRmv.getAsBoolean()) {
+            if (log.isDebugEnabled())
+                log.debug("Applying remove batch [cacheId=" + cacheId + ']');
+
+            removeAllConflict(cacheId, rmvBatch);
+
+            evtsApplied += rmvBatch.size();
+
+            rmvBatch.clear();
+        }
+
+        return evtsApplied;
+    }
+
+    /** @return {@code True} if update batch should be applied. */
+    private boolean isApplyBatch(Map<K, ?> map, K key) {
+        return map.size() >= maxBatchSize || map.containsKey(key);
+    }
+
+    /** @return Key. */
+    protected abstract K toKey(CdcEvent evt);
+
+    /** @return Value. */
+    protected abstract V toValue(int cacheId, Object val, GridCacheVersion ver);
+
+    /** Stores DR data. */
+    protected abstract void putAllConflict(int cacheId, Map<K, V> drMap);
+
+    /** Removes DR data. */
+    protected abstract void removeAllConflict(int cacheId, Map<K, GridCacheVersion> drMap);
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java
new file mode 100644 (file)
index 0000000..bbe184e
--- /dev/null
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryTypeImpl;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.resources.LoggerResource;
+
+import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
+
+/**
+ * Change Data Consumer that streams all data changes to destination cluster by the provided {@link #applier}.
+ *
+ * @see AbstractCdcEventsApplier
+ */
+public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
+    /** */
+    public static final String EVTS_CNT = "EventsCount";
+
+    /** */
+    public static final String TYPES_CNT = "TypesCount";
+
+    /** */
+    public static final String MAPPINGS_CNT = "MappingsCount";
+
+    /** */
+    public static final String EVTS_CNT_DESC = "Count of messages applied to destination cluster";
+
+    /** */
+    public static final String TYPES_CNT_DESC = "Count of received binary types events";
+
+    /** */
+    public static final String MAPPINGS_CNT_DESC = "Count of received mappings events";
+
+    /** */
+    public static final String LAST_EVT_TIME = "LastEventTime";
+
+    /** */
+    public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";
+
+    /** Handle only primary entry flag. */
+    private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
+
+    /** Cache names. */
+    private Set<String> caches;
+
+    /** Cache IDs. */
+    protected Set<Integer> cachesIds;
+
+    /** Maximum batch size. */
+    protected int maxBatchSize;
+
+    /** Events applier. */
+    protected AbstractCdcEventsApplier<?, ?> applier;
+
+    /** Timestamp of last sent message. */
+    protected AtomicLongMetric lastEvtTs;
+
+    /** Count of events applied to destination cluster. */
+    protected AtomicLongMetric evtsCnt;
+
+    /** Count of binary types applied to destination cluster. */
+    protected AtomicLongMetric typesCnt;
+
+    /** Count of mappings applied to destination cluster. */
+    protected AtomicLongMetric mappingsCnt;
+
+    /** Logger. */
+    @LoggerResource
+    protected IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override public void start(MetricRegistry mreg) {
+        A.notEmpty(caches, "caches");
+
+        cachesIds = caches.stream()
+            .mapToInt(CU::cacheId)
+            .boxed()
+            .collect(Collectors.toSet());
+
+        this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
+        this.typesCnt = mreg.longMetric(TYPES_CNT, TYPES_CNT_DESC);
+        this.mappingsCnt = mreg.longMetric(MAPPINGS_CNT, MAPPINGS_CNT_DESC);
+        this.lastEvtTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onEvents(Iterator<CdcEvent> events) {
+        try {
+            long msgsSnt = applier.apply(() -> F.iterator(
+                events,
+                F.identity(),
+                true,
+                evt -> !onlyPrimary || evt.primary(),
+                evt -> F.isEmpty(cachesIds) || cachesIds.contains(evt.cacheId()),
+                evt -> evt.version().otherClusterVersion() == null));
+
+            if (msgsSnt > 0) {
+                evtsCnt.add(msgsSnt);
+                lastEvtTs.value(System.currentTimeMillis());
+
+                if (log.isInfoEnabled())
+                    log.info("Events applied [evtsApplied=" + evtsCnt.value() + ']');
+            }
+
+            return true;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
+        cacheEvents.forEachRemaining(e -> {
+            // Just skip. Handle of cache events not supported.
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCacheDestroy(Iterator<Integer> caches) {
+        caches.forEachRemaining(e -> {
+            // Just skip. Handle of cache events not supported.
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMappings(Iterator<TypeMapping> mappings) {
+        mappings.forEachRemaining(mapping -> {
+            registerMapping(binaryContext(), log, mapping);
+
+            mappingsCnt.increment();
+        });
+
+        lastEvtTs.value(System.currentTimeMillis());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTypes(Iterator<BinaryType> types) {
+        types.forEachRemaining(t -> {
+            BinaryMetadata meta = ((BinaryTypeImpl)t).metadata();
+
+            registerBinaryMeta(binaryContext(), log, meta);
+
+            typesCnt.increment();
+        });
+
+        lastEvtTs.value(System.currentTimeMillis());
+    }
+
+    /**
+     * Register {@code meta}.
+     *
+     * @param ctx Binary context.
+     * @param log Logger.
+     * @param meta Binary metadata to register.
+     */
+    public static void registerBinaryMeta(BinaryContext ctx, IgniteLogger log, BinaryMetadata meta) {
+        ctx.updateMetadata(meta.typeId(), meta, false);
+
+        if (log.isInfoEnabled())
+            log.info("BinaryMeta [meta=" + meta + ']');
+    }
+
+    /**
+     * Register {@code mapping}.
+     *
+     * @param ctx Binary context.
+     * @param log Logger.
+     * @param mapping Type mapping to register.
+     */
+    public static void registerMapping(BinaryContext ctx, IgniteLogger log, TypeMapping mapping) {
+        assert mapping.platformType().ordinal() <= Byte.MAX_VALUE;
+
+        byte platformType = (byte)mapping.platformType().ordinal();
+
+        ctx.registerUserClassName(mapping.typeId(), mapping.typeName(), false, false, platformType);
+
+        if (log.isInfoEnabled())
+            log.info("Mapping [mapping=" + mapping + ']');
+    }
+
+    /** @return Binary context. */
+    protected abstract BinaryContext binaryContext();
+
+    /**
+     * Sets whether entries only from primary nodes should be handled.
+     *
+     * @param onlyPrimary Whether entries only from primary nodes should be handled.
+     * @return {@code this} for chaining.
+     */
+    public AbstractIgniteCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
+        this.onlyPrimary = onlyPrimary;
+
+        return this;
+    }
+
+    /**
+     * Sets cache names that participate in CDC.
+     *
+     * @param caches Cache names.
+     * @return {@code this} for chaining.
+     */
+    public AbstractIgniteCdcStreamer setCaches(Set<String> caches) {
+        this.caches = caches;
+
+        return this;
+    }
+
+    /**
+     * Sets maximum batch size that will be applied to destination cluster.
+     *
+     * @param maxBatchSize Maximum batch size.
+     * @return {@code this} for chaining.
+     */
+    public AbstractIgniteCdcStreamer setMaxBatchSize(int maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+
+        return this;
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
deleted file mode 100644 (file)
index 41fab27..0000000
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cdc;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BooleanSupplier;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.cache.CacheEntryVersion;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.apache.ignite.internal.binary.BinaryTypeImpl;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
-
-/**
- * Contains logic to process {@link CdcEvent} and apply them to the provided by {@link #ignite()} cluster.
- */
-public abstract class CdcEventsApplier {
-    /** Maximum batch size. */
-    protected int maxBatchSize;
-
-    /** Caches. */
-    private final Map<Integer, IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new HashMap<>();
-
-    /** Update batch. */
-    private final Map<KeyCacheObject, GridCacheDrInfo> updBatch = new HashMap<>();
-
-    /** Remove batch. */
-    private final Map<KeyCacheObject, GridCacheVersion> rmvBatch = new HashMap<>();
-
-    /** */
-    private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
-
-    /** */
-    private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
-
-    /**
-     * @param maxBatchSize Maximum batch size.
-     */
-    public CdcEventsApplier(int maxBatchSize) {
-        this.maxBatchSize = maxBatchSize;
-    }
-
-    /**
-     * @param evts Events to process.
-     * @return Number of applied events.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
-        IgniteInternalCache<BinaryObject, BinaryObject> currCache = null;
-
-        int evtsApplied = 0;
-
-        for (CdcEvent evt : evts) {
-            if (log().isDebugEnabled())
-                log().debug("Event received [evt=" + evt + ']');
-
-            IgniteInternalCache<BinaryObject, BinaryObject> cache = ignCaches.computeIfAbsent(evt.cacheId(), cacheId -> {
-                for (String cacheName : ignite().cacheNames()) {
-                    if (CU.cacheId(cacheName) == cacheId) {
-                        // IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
-                        ignite().cache(cacheName);
-
-                        IgniteInternalCache<Object, Object> cache0 = ignite().cachex(cacheName);
-
-                        assert cache0 != null;
-
-                        return cache0.keepBinary();
-                    }
-                }
-
-                throw new IllegalStateException("Cache with id not found [cacheId=" + cacheId + ']');
-            });
-
-            if (cache != currCache) {
-                evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);
-
-                currCache = cache;
-            }
-
-            CacheEntryVersion order = evt.version();
-
-            KeyCacheObject key;
-
-            if (evt.key() instanceof KeyCacheObject)
-                key = (KeyCacheObject)evt.key();
-            else
-                key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
-
-            if (evt.value() != null) {
-                evtsApplied += applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
-
-                CacheObject val;
-
-                if (evt.value() instanceof CacheObject)
-                    val = (CacheObject)evt.value();
-                else
-                    val = new CacheObjectImpl(evt.value(), null);
-
-                GridCacheVersion ver = new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId());
-
-                GridCacheDrInfo drVal = currCache.configuration().getExpiryPolicyFactory() != null ?
-                    new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE)
-                    : new GridCacheDrInfo(val, ver);
-
-                updBatch.put(key, drVal);
-            }
-            else {
-                evtsApplied += applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
-
-                rmvBatch.put(key,
-                    new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()));
-            }
-        }
-
-        if (currCache != null)
-            evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);
-
-        return evtsApplied;
-    }
-
-    /**
-     * Applies data from {@link #updBatch} or {@link #rmvBatch} to Ignite if required.
-     *
-     * @param cache Current cache.
-     * @param applyUpd Apply update batch flag supplier.
-     * @param applyRmv Apply remove batch flag supplier.
-     * @return Number of applied events.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private int applyIf(
-        IgniteInternalCache<BinaryObject, BinaryObject> cache,
-        BooleanSupplier applyUpd,
-        BooleanSupplier applyRmv
-    ) throws IgniteCheckedException {
-        int evtsApplied = 0;
-
-        if (applyUpd.getAsBoolean()) {
-            if (log().isDebugEnabled())
-                log().debug("Applying put batch [cache=" + cache.name() + ']');
-
-            cache.putAllConflict(updBatch);
-
-            evtsApplied += updBatch.size();
-
-            updBatch.clear();
-        }
-
-        if (applyRmv.getAsBoolean()) {
-            if (log().isDebugEnabled())
-                log().debug("Applying remove batch [cache=" + cache.name() + ']');
-
-            cache.removeAllConflict(rmvBatch);
-
-            evtsApplied += rmvBatch.size();
-
-            rmvBatch.clear();
-        }
-
-        return evtsApplied;
-    }
-
-    /** @return {@code True} if update batch should be applied. */
-    private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) {
-        return map.size() >= maxBatchSize || map.containsKey(key);
-    }
-
-    /**
-     * Register {@code meta} inside {@code ign} instance.
-     *
-     * @param ign Ignite instance.
-     * @param log Logger.
-     * @param meta Binary metadata to register.
-     */
-    public static void registerBinaryMeta(IgniteEx ign, IgniteLogger log, BinaryMetadata meta) {
-        ign.context().cacheObjects().addMeta(
-            meta.typeId(),
-            new BinaryTypeImpl(
-                ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext(),
-                meta
-            ),
-            false
-        );
-
-        if (log.isInfoEnabled())
-            log.info("BinaryMeta[meta=" + meta + ']');
-    }
-
-    /**
-     * Register {@code mapping} inside {@code ign} instance.
-     *
-     * @param ign Ignite instance.
-     * @param log Logger.
-     * @param mapping Type mapping to register.
-     */
-    public static void registerMapping(IgniteEx ign, IgniteLogger log, TypeMapping mapping) {
-        assert mapping.platformType().ordinal() <= Byte.MAX_VALUE;
-
-        try {
-            ign.context().marshallerContext().registerClassName(
-                (byte)mapping.platformType().ordinal(),
-                mapping.typeId(),
-                mapping.typeName(),
-                false
-            );
-
-            if (log.isInfoEnabled())
-                log.info("Mapping[mapping=" + mapping + ']');
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** @return Ignite instance. */
-    protected abstract IgniteEx ignite();
-
-    /** @return Logger. */
-    protected abstract IgniteLogger log();
-}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
new file mode 100644 (file)
index 0000000..bce50d3
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
+
+/**
+ * Contains logic to process {@link CdcEvent} and apply them to the destination cluster.
+ *
+ * @see IgniteInternalCache#putAllConflict(Map)
+ * @see IgniteInternalCache#removeAllConflict(Map)
+ */
+public class CdcEventsIgniteApplier extends AbstractCdcEventsApplier<KeyCacheObject, GridCacheDrInfo> {
+    /** Destination cluster. */
+    private final IgniteEx ignite;
+
+    /** Caches. */
+    private final IntMap<IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new IntHashMap<>();
+
+    /**
+     * @param ignite Destination cluster.
+     * @param maxBatchSize Maximum batch size.
+     * @param log Logger.
+     */
+    public CdcEventsIgniteApplier(IgniteEx ignite, int maxBatchSize, IgniteLogger log) {
+        super(maxBatchSize, log);
+
+        this.ignite = ignite;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void putAllConflict(int cacheId, Map<KeyCacheObject, GridCacheDrInfo> drMap) {
+        try {
+            cache(cacheId).putAllConflict(drMap);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void removeAllConflict(int cacheId, Map<KeyCacheObject, GridCacheVersion> drMap) {
+        try {
+            cache(cacheId).removeAllConflict(drMap);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected KeyCacheObject toKey(CdcEvent evt) {
+        Object key = evt.key();
+
+        if (key instanceof KeyCacheObject)
+            return (KeyCacheObject)key;
+        else
+            return new KeyCacheObjectImpl(key, null, evt.partition());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDrInfo toValue(int cacheId, Object val, GridCacheVersion ver) {
+        CacheObject cacheObj;
+
+        if (val instanceof CacheObject)
+            cacheObj = (CacheObject)val;
+        else
+            cacheObj = new CacheObjectImpl(val, null);
+
+        return cache(cacheId).configuration().getExpiryPolicyFactory() != null ?
+            new GridCacheDrExpirationInfo(cacheObj, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE) :
+            new GridCacheDrInfo(cacheObj, ver);
+    }
+
+    /** @return Cache. */
+    private IgniteInternalCache<BinaryObject, BinaryObject> cache(int cacheId) {
+        return ignCaches.computeIfAbsent(cacheId, id -> {
+            for (String cacheName : ignite.cacheNames()) {
+                if (CU.cacheId(cacheName) == id) {
+                    // IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
+                    ignite.cache(cacheName);
+
+                    return ignite.cachex(cacheName).keepBinary();
+                }
+            }
+
+            throw new IllegalStateException("Cache with id not found [cacheId=" + id + ']');
+        });
+    }
+}
index 9db1695c0a5ab15869fec4633ecd879bb44043a3..60e1dfa8046fdb85ea11169107b83a446b46f0d0 100644 (file)
 
 package org.apache.ignite.cdc;
 
 
 package org.apache.ignite.cdc;
 
-import java.util.Iterator;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.Ignition;
-import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
 import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
 import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.apache.ignite.internal.binary.BinaryTypeImpl;
+import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
-import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.lang.IgniteExperimental;
-import org.apache.ignite.resources.LoggerResource;
-
-import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
-import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
 
 /**
  * Change Data Consumer that streams all data changes to provided {@link #dest} Ignite cluster.
 
 /**
  * Change Data Consumer that streams all data changes to provided {@link #dest} Ignite cluster.
@@ -58,150 +44,30 @@ import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.
  * @see CacheVersionConflictResolverImpl
  */
 @IgniteExperimental
  * @see CacheVersionConflictResolverImpl
  */
 @IgniteExperimental
-public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
-    /** */
-    public static final String EVTS_CNT = "EventsCount";
-
-    /** */
-    public static final String TYPES_CNT = "TypesCount";
-
-    /** */
-    public static final String MAPPINGS_CNT = "MappingsCount";
-
-    /** */
-    public static final String EVTS_CNT_DESC = "Count of messages applied to destination cluster";
-
-    /** */
-    public static final String TYPES_CNT_DESC = "Count of received binary types events";
-
-    /** */
-    public static final String MAPPINGS_CNT_DESC = "Count of received mappings events";
-
-    /** */
-    public static final String LAST_EVT_TIME = "LastEventTime";
-
-    /** */
-    public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";
-
+public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer {
     /** Destination cluster client configuration. */
     private IgniteConfiguration destIgniteCfg;
 
     /** Destination cluster client configuration. */
     private IgniteConfiguration destIgniteCfg;
 
-    /** Handle only primary entry flag. */
-    private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
-
     /** Destination Ignite cluster client */
     private IgniteEx dest;
 
     /** Destination Ignite cluster client */
     private IgniteEx dest;
 
-    /** Cache names. */
-    private Set<String> caches;
-
-    /** Cache IDs. */
-    private Set<Integer> cachesIds;
-
-    /** Timestamp of last sent message. */
-    private AtomicLongMetric lastEvtTs;
-
-    /** Count of events applied to destination cluster. */
-    protected AtomicLongMetric evtsCnt;
-
-    /** Count of binary types applied to destination cluster. */
-    protected AtomicLongMetric typesCnt;
-
-    /** Count of mappings applied to destination cluster. */
-    protected AtomicLongMetric mappingsCnt;
-
-    /** Logger. */
-    @LoggerResource
-    private IgniteLogger log;
-
-    /** */
-    public IgniteToIgniteCdcStreamer() {
-        super(DFLT_MAX_BATCH_SIZE);
-    }
-
     /** {@inheritDoc} */
     @Override public void start(MetricRegistry mreg) {
     /** {@inheritDoc} */
     @Override public void start(MetricRegistry mreg) {
-        A.notNull(destIgniteCfg, "Destination ignite configuration");
-        A.notEmpty(caches, "caches");
+        super.start(mreg);
 
         if (log.isInfoEnabled())
             log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
 
 
         if (log.isInfoEnabled())
             log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
 
-        cachesIds = caches.stream()
-            .mapToInt(CU::cacheId)
-            .boxed()
-            .collect(Collectors.toSet());
+        A.notNull(destIgniteCfg, "Destination ignite configuration");
 
         dest = (IgniteEx)Ignition.start(destIgniteCfg);
 
 
         dest = (IgniteEx)Ignition.start(destIgniteCfg);
 
-        this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
-        this.typesCnt = mreg.longMetric(TYPES_CNT, TYPES_CNT_DESC);
-        this.mappingsCnt = mreg.longMetric(MAPPINGS_CNT, MAPPINGS_CNT_DESC);
-        this.lastEvtTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onEvents(Iterator<CdcEvent> evts) {
-        try {
-            long msgsSnt = apply(() -> F.iterator(
-                evts,
-                F.identity(),
-                true,
-                evt -> !onlyPrimary || evt.primary(),
-                evt -> F.isEmpty(cachesIds) || cachesIds.contains(evt.cacheId()),
-                evt -> evt.version().otherClusterVersion() == null));
-
-            if (msgsSnt > 0) {
-                evtsCnt.add(msgsSnt);
-                lastEvtTs.value(System.currentTimeMillis());
-
-                if (log.isInfoEnabled())
-                    log.info("Events applied [evtsApplied=" + evtsCnt.value() + ']');
-            }
-
-            return true;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onTypes(Iterator<BinaryType> types) {
-        types.forEachRemaining(t -> {
-            BinaryMetadata meta = ((BinaryTypeImpl)t).metadata();
-
-            registerBinaryMeta(dest, log, meta);
-
-            typesCnt.increment();
-        });
-
-        lastEvtTs.value(System.currentTimeMillis());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onMappings(Iterator<TypeMapping> mappings) {
-        mappings.forEachRemaining(m -> {
-            registerMapping(dest, log, m);
-
-            mappingsCnt.increment();
-        });
-
-        lastEvtTs.value(System.currentTimeMillis());
+        applier = new CdcEventsIgniteApplier(dest, maxBatchSize, log);
     }
 
     /** {@inheritDoc} */
     }
 
     /** {@inheritDoc} */
-    @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
-        cacheEvents.forEachRemaining(e -> {
-            // Just skip. Handle of cache events not supported.
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onCacheDestroy(Iterator<Integer> caches) {
-        caches.forEachRemaining(e -> {
-            // Just skip. Handle of cache events not supported.
-        });
+    @Override protected BinaryContext binaryContext() {
+        return ((CacheObjectBinaryProcessorImpl)dest.context().cacheObjects()).binaryContext();
     }
 
     /** {@inheritDoc} */
     }
 
     /** {@inheritDoc} */
@@ -209,16 +75,6 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
         dest.close();
     }
 
         dest.close();
     }
 
-    /** {@inheritDoc} */
-    @Override protected IgniteEx ignite() {
-        return dest;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteLogger log() {
-        return log;
-    }
-
     /**
      * Sets Ignite client node configuration that will connect to destination cluster.
      * @param destIgniteCfg Ignite client node configuration that will connect to destination cluster.
     /**
      * Sets Ignite client node configuration that will connect to destination cluster.
      * @param destIgniteCfg Ignite client node configuration that will connect to destination cluster.
@@ -229,40 +85,4 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
 
         return this;
     }
 
         return this;
     }
-
-    /**
-     * Sets whether entries only from primary nodes should be handled.
-     *
-     * @param onlyPrimary Whether entries only from primary nodes should be handled.
-     * @return {@code this} for chaining.
-     */
-    public IgniteToIgniteCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
-        this.onlyPrimary = onlyPrimary;
-
-        return this;
-    }
-
-    /**
-     * Sets cache names that participate in CDC.
-     *
-     * @param caches Cache names.
-     * @return {@code this} for chaining.
-     */
-    public IgniteToIgniteCdcStreamer setCaches(Set<String> caches) {
-        this.caches = caches;
-
-        return this;
-    }
-
-    /**
-     * Sets maximum batch size that will be applied to destination cluster.
-     *
-     * @param maxBatchSize Maximum batch size.
-     * @return {@code this} for chaining.
-     */
-    public IgniteToIgniteCdcStreamer setMaxBatchSize(int maxBatchSize) {
-        this.maxBatchSize = maxBatchSize;
-
-        return this;
-    }
 }
 }
index acb2aebe7cab22d6d9b547fce257d28ff65e1321..7b7f97ae7bae93e123f915e35f6df2d12f7161e6 100644 (file)
@@ -35,8 +35,9 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
 import org.apache.ignite.cdc.CdcEvent;
 import org.apache.ignite.cdc.CdcEvent;
-import org.apache.ignite.cdc.CdcEventsApplier;
+import org.apache.ignite.cdc.CdcEventsIgniteApplier;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
@@ -83,7 +84,7 @@ import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.META_UPDATE_M
  * @see CdcEvent
  * @see CacheEntryVersion
  */
  * @see CdcEvent
  * @see CacheEntryVersion
  */
-class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnable, AutoCloseable {
+class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
     /** Ignite instance. */
     private final IgniteEx ign;
 
     /** Ignite instance. */
     private final IgniteEx ign;
 
@@ -120,6 +121,9 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
     /** */
     private final AtomicLong rcvdEvts = new AtomicLong();
 
     /** */
     private final AtomicLong rcvdEvts = new AtomicLong();
 
+    /** */
+    private AbstractCdcEventsApplier applier;
+
     /**
      * @param ign Ignite instance.
      * @param log Logger.
     /**
      * @param ign Ignite instance.
      * @param log Logger.
@@ -146,8 +150,6 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
         KafkaToIgniteMetadataUpdater metaUpdr,
         AtomicBoolean stopped
     ) {
         KafkaToIgniteMetadataUpdater metaUpdr,
         AtomicBoolean stopped
     ) {
-        super(maxBatchSize);
-
         this.ign = ign;
         this.kafkaProps = kafkaProps;
         this.topic = topic;
         this.ign = ign;
         this.kafkaProps = kafkaProps;
         this.topic = topic;
@@ -158,6 +160,8 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
         this.metaUpdr = metaUpdr;
         this.stopped = stopped;
         this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
         this.metaUpdr = metaUpdr;
         this.stopped = stopped;
         this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
+
+        applier = new CdcEventsIgniteApplier(ign, maxBatchSize, log);
     }
 
     /** {@inheritDoc} */
     }
 
     /** {@inheritDoc} */
@@ -223,7 +227,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
             );
         }
 
             );
         }
 
-        apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata));
+        applier.apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata));
 
         cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout));
     }
 
         cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout));
     }
@@ -268,16 +272,6 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
         cnsmrs.forEach(KafkaConsumer::wakeup);
     }
 
         cnsmrs.forEach(KafkaConsumer::wakeup);
     }
 
-    /** {@inheritDoc} */
-    @Override protected IgniteEx ignite() {
-        return ign;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteLogger log() {
-        return log;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(KafkaToIgniteCdcStreamerApplier.class, this);
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(KafkaToIgniteCdcStreamerApplier.class, this);
index c14097b839b3db341ae1e571332cc7bf20c852dd..f15e061dafd5cb204b719d7f95e23cc2e8580681 100644 (file)
@@ -22,10 +22,11 @@ import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteLogger;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cdc.CdcEventsApplier;
 import org.apache.ignite.cdc.TypeMapping;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.cdc.TypeMapping;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMetadata;
 import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -34,6 +35,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.VoidDeserializer;
 
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.VoidDeserializer;
 
+import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.registerBinaryMeta;
+import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.registerMapping;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
@@ -87,6 +90,8 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
 
     /** Polls all available records from metadata topic and applies it to Ignite. */
     public synchronized void updateMetadata() {
 
     /** Polls all available records from metadata topic and applies it to Ignite. */
     public synchronized void updateMetadata() {
+        BinaryContext ctx = ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext();
+
         while (true) {
             ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
 
         while (true) {
             ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
 
@@ -100,9 +105,9 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
                 Object data = IgniteUtils.fromBytes(rec.value());
 
                 if (data instanceof BinaryMetadata)
                 Object data = IgniteUtils.fromBytes(rec.value());
 
                 if (data instanceof BinaryMetadata)
-                    CdcEventsApplier.registerBinaryMeta(ign, log, (BinaryMetadata)data);
+                    registerBinaryMeta(ctx, log, (BinaryMetadata)data);
                 else if (data instanceof TypeMapping)
                 else if (data instanceof TypeMapping)
-                    CdcEventsApplier.registerMapping(ign, log, (TypeMapping)data);
+                    registerMapping(ctx, log, (TypeMapping)data);
                 else
                     throw new IllegalArgumentException("Unknown meta type[type=" + data + ']');
             }
                 else
                     throw new IllegalArgumentException("Unknown meta type[type=" + data + ']');
             }
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
new file mode 100644 (file)
index 0000000..5484277
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.thin;
+
+import java.util.Map;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Contains logic to process {@link CdcEvent} and apply them to the destination cluster by thin client.
+ *
+ * @see TcpClientCache#putAllConflict(Map)
+ * @see TcpClientCache#removeAllConflict(Map)
+ */
+public class CdcEventsIgniteClientApplier extends AbstractCdcEventsApplier<Object, T2<Object, GridCacheVersion>> {
+    /** Client connected to the destination cluster. */
+    private final IgniteClient client;
+
+    /** Caches. */
+    private final IntMap<TcpClientCache<Object, Object>> ignCaches = new IntHashMap<>();
+
+    /**
+     * @param client Client connected to the destination cluster.
+     * @param maxBatchSize Maximum batch size.
+     * @param log Logger.
+     */
+    public CdcEventsIgniteClientApplier(IgniteClient client, int maxBatchSize, IgniteLogger log) {
+        super(maxBatchSize, log);
+
+        this.client = client;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Object toKey(CdcEvent evt) {
+        return evt.key();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected T2<Object, GridCacheVersion> toValue(int cacheId, Object val, GridCacheVersion ver) {
+        return new T2<>(val, ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void putAllConflict(int cacheId, Map<Object, T2<Object, GridCacheVersion>> drMap) {
+        cache(cacheId).putAllConflict(drMap);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void removeAllConflict(int cacheId, Map<Object, GridCacheVersion> drMap) {
+        cache(cacheId).removeAllConflict(drMap);
+    }
+
+    /** @return Cache. */
+    private TcpClientCache<Object, Object> cache(int cacheId) {
+        return ignCaches.computeIfAbsent(cacheId, id -> {
+            for (String cacheName : client.cacheNames()) {
+                if (CU.cacheId(cacheName) == id)
+                    return (TcpClientCache<Object, Object>)client.cache(cacheName).withKeepBinary();
+            }
+
+            throw new IllegalStateException("Cache with id not found [cacheId=" + id + ']');
+        });
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
new file mode 100644 (file)
index 0000000..e62feba
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.thin;
+
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.AbstractIgniteCdcStreamer;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.client.thin.ClientBinary;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.typedef.internal.A;
+
+/**
+ * Change Data Consumer that streams all data changes to destination cluster through Ignite thin client.
+ * <p/>
+ * Consumer will just fail in case of any error during write. Fail of consumer will lead to the fail of {@code ignite-cdc} application.
+ * It expected that {@code ignite-cdc} will be configured for automatic restarts with the OS tool to failover temporary errors
+ * such as destination cluster unavailability or other issues.
+ * <p/>
+ * If you have plans to apply written messages to the other Ignite cluster in active-active manner,
+ * e.g. concurrent updates of the same entry in other cluster is possible,
+ * please, be aware of {@link CacheVersionConflictResolverImpl} conflict resolved.
+ * Configuration of {@link CacheVersionConflictResolverImpl} can be found in {@link KafkaToIgniteCdcStreamer} documentation.
+ *
+ * @see IgniteClient
+ * @see CdcMain
+ * @see CacheVersionConflictResolverImpl
+ */
+public class IgniteToIgniteClientCdcStreamer extends AbstractIgniteCdcStreamer {
+    /** Ignite thin client configuration. */
+    private ClientConfiguration destClientCfg;
+
+    /** Ignite thin client. */
+    private IgniteClient dest;
+
+    /** {@inheritDoc} */
+    @Override public void start(MetricRegistry mreg) {
+        super.start(mreg);
+
+        if (log.isInfoEnabled())
+            log.info("Ignite To Ignite Client Streamer [cacheIds=" + cachesIds + ']');
+
+        A.notNull(destClientCfg, "Destination thin client configuration");
+
+        dest = Ignition.startClient(destClientCfg);
+
+        applier = new CdcEventsIgniteClientApplier(dest, maxBatchSize, log);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected BinaryContext binaryContext() {
+        return ((ClientBinary)dest.binary()).binaryContext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        dest.close();
+    }
+
+    /**
+     * Sets Ignite thin client configuration that will connect to destination cluster.
+     *
+     * @param destClientCfg Ignite thin client configuration that will connect to destination cluster.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToIgniteClientCdcStreamer setDestinationClientConfiguration(ClientConfiguration destClientCfg) {
+        this.destClientCfg = destClientCfg;
+
+        return this;
+    }
+}
index 29f42539ffd1070a9d8b008ae683c55439b38854..5a0a221f6c6dcf6fbf9a2352febb79ab2489fb94 100644 (file)
@@ -70,8 +70,8 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
             CdcConfiguration cdcCfg = new CdcConfiguration();
 
             cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer()
             CdcConfiguration cdcCfg = new CdcConfiguration();
 
             cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer()
-                .setMaxBatchSize(KEYS_CNT)
                 .setDestinationIgniteConfiguration(destCfg)
                 .setDestinationIgniteConfiguration(destCfg)
+                .setMaxBatchSize(KEYS_CNT)
                 .setCaches(Collections.singleton(cache)));
 
             cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
                 .setCaches(Collections.singleton(cache)));
 
             cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
index 43e7db62eac74c09d0ea4feca3a97f655618cc15..f182e77e643a881783f8f5c84423360fe26e72b4 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc;
 import org.apache.ignite.cdc.kafka.CdcKafkaReplicationAppsTest;
 import org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest;
 import org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest;
 import org.apache.ignite.cdc.kafka.CdcKafkaReplicationAppsTest;
 import org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest;
 import org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest;
+import org.apache.ignite.cdc.thin.CdcIgniteToIgniteClientReplicationTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -34,7 +35,8 @@ import org.junit.runners.Suite;
     KafkaToIgniteLoaderTest.class,
     CdcKafkaReplicationTest.class,
     CdcKafkaReplicationAppsTest.class,
     KafkaToIgniteLoaderTest.class,
     CdcKafkaReplicationTest.class,
     CdcKafkaReplicationAppsTest.class,
-    ConflictResolverRestartTest.class
+    ConflictResolverRestartTest.class,
+    CdcIgniteToIgniteClientReplicationTest.class
 })
 public class IgniteCdcTestSuite {
 }
 })
 public class IgniteCdcTestSuite {
 }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java
new file mode 100644 (file)
index 0000000..b810e18
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.thin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.ignite.cdc.AbstractReplicationTest;
+import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.cdc.IgniteToIgniteCdcStreamer;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * {@link IgniteToIgniteClientCdcStreamer} test.
+ */
+public class CdcIgniteToIgniteClientReplicationTest extends AbstractReplicationTest {
+    /** {@inheritDoc} */
+    @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < srcCluster.length; i++)
+            futs.add(igniteToIgniteClient(srcCluster[i].configuration(), destCluster, cache));
+
+        return futs;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < srcCluster.length; i++)
+            futs.add(igniteToIgniteClient(srcCluster[i].configuration(), destCluster, ACTIVE_ACTIVE_CACHE));
+
+        for (int i = 0; i < destCluster.length; i++)
+            futs.add(igniteToIgniteClient(destCluster[i].configuration(), srcCluster, ACTIVE_ACTIVE_CACHE));
+
+        return futs;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
+        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
+        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
+    }
+
+    /**
+     * @param srcCfg Ignite source node configuration.
+     * @param dest Destination cluster.
+     * @param cache Cache name to replicate.
+     * @return Future for Change Data Capture application.
+     */
+    private IgniteInternalFuture<?> igniteToIgniteClient(IgniteConfiguration srcCfg, IgniteEx[] dest, String cache) {
+        return runAsync(() -> {
+            ClientConfiguration clientCfg = new ClientConfiguration();
+
+            String[] addrs = new String[dest.length];
+
+            for (int i = 0; i < dest.length; i++) {
+                ClusterNode node = dest[i].localNode();
+
+                addrs[i] = F.first(node.addresses()) + ":" + node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT);
+            }
+
+            clientCfg.setAddresses(addrs);
+
+            CdcConfiguration cdcCfg = new CdcConfiguration();
+
+            cdcCfg.setConsumer(new IgniteToIgniteClientCdcStreamer()
+                .setDestinationClientConfiguration(clientCfg)
+                .setCaches(Collections.singleton(cache))
+                .setMaxBatchSize(KEYS_CNT));
+
+            cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
+
+            CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);
+
+            cdcs.add(cdc);
+
+            cdc.run();
+        });
+    }
+}