IGNITE-9739 don't write non-baseline nodes to wal TxRecord
authorSergey Kosarev <skosarev@gridgain.com>
Wed, 16 Jan 2019 15:43:38 +0000 (18:43 +0300)
committerAndrey Gura <agura@apache.org>
Wed, 16 Jan 2019 15:43:38 +0000 (18:43 +0300)
Signed-off-by: Andrey Gura <agura@apache.org>
modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java

index 59f773d..ac72afe 100644 (file)
@@ -99,8 +99,6 @@ public class ConsistentIdMapper {
         if (txNodes == null)
             return null;
 
-        Map<Object, Short> constIdMap = baselineTop.consistentIdMapping();
-
         Map<UUID, Short> m = discoveryMgr.consistentId(topVer);
 
         int bltNodes = m.size();
@@ -112,15 +110,19 @@ public class ConsistentIdMapper {
         for (Map.Entry<UUID, Collection<UUID>> e : txNodes.entrySet()) {
             UUID node = e.getKey();
 
+            if (!m.containsKey(node)) // not in blt
+                continue;
+
             Collection<UUID> backupNodes = e.getValue();
 
             Collection<Short> backups = new ArrayList<>(backupNodes.size());
 
             for (UUID backup : backupNodes) {
-                if (m.containsKey(backup))
+                if (m.containsKey(backup)) {
                     nodeCnt++;
 
-                backups.add(mapToCompactId(topVer, backup));
+                    backups.add(mapToCompactId(topVer, backup));
+                }
             }
 
             // Optimization for short store full nodes set.
index ce914e8..e55676c 100644 (file)
@@ -2476,13 +2476,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         if (cctx.wal() != null && logTxRecords) {
             TxRecord txRecord = newTxRecord(tx);
 
-            try {
-                return cctx.wal().log(txRecord);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to log TxRecord: " + txRecord, e);
+            if (txRecord != null) {
+                try {
+                    return cctx.wal().log(txRecord);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to log TxRecord: " + txRecord, e);
 
-                throw new IgniteException("Failed to log TxRecord: " + txRecord, e);
+                    throw new IgniteException("Failed to log TxRecord: " + txRecord, e);
+                }
             }
         }
 
@@ -2498,12 +2500,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private TxRecord newTxRecord(IgniteTxAdapter tx) {
         BaselineTopology baselineTop = cctx.kernalContext().state().clusterState().baselineTopology();
 
-        Map<Short, Collection<Short>> nodes = tx.consistentIdMapper.mapToCompactIds(tx.topVer, tx.txNodes, baselineTop);
+        if (baselineTop != null && baselineTop.consistentIds().contains(cctx.localNode().consistentId())) {
+            Map<Short, Collection<Short>> nodes = tx.consistentIdMapper.mapToCompactIds(tx.topVer, tx.txNodes, baselineTop);
 
-        if (tx.txState().mvccEnabled())
-            return new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes, tx.mvccSnapshot());
-        else
-            return new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes);
+            if (tx.txState().mvccEnabled())
+                return new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes, tx.mvccSnapshot());
+            else
+                return new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes);
+        }
+
+        return null;
     }
 
     /**
index 2727350..e2cef92 100644 (file)
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
@@ -47,17 +48,21 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
@@ -103,6 +108,8 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
         client = false;
 
         disableAutoActivation = false;
+
+        System.clearProperty(IGNITE_WAL_LOG_TX_RECORDS);
     }
 
     /** {@inheritDoc} */
@@ -834,6 +841,110 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotMapNonBaselineTxPrimaryNodes() throws Exception {
+        checkNotMapNonBaselineTxNodes(true, false);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotMapNonBaselineTxBackupNodes() throws Exception {
+        checkNotMapNonBaselineTxNodes(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotMapNonBaselineNearTxPrimaryNodes() throws Exception {
+        checkNotMapNonBaselineTxNodes(true, true);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotMapNonBaselineNearTxBackupNodes() throws Exception {
+        checkNotMapNonBaselineTxNodes(false, true);
+    }
+
+    /**
+     * @param primary Whether non-baseline node is primary.
+     * @param near Whether non-baseline nod is near node.
+     * @throws Exception If failed.
+     */
+    public void checkNotMapNonBaselineTxNodes(boolean primary, boolean near) throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true");
+
+        int bltNodesCnt = 3;
+
+        Ignite ig = startGrids(bltNodesCnt);
+
+        ig.cluster().active(true);
+
+        ig.createCache(new CacheConfiguration<>()
+            .setName(CACHE_NAME)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setBackups(2));
+
+        ig.createCache(
+            new CacheConfiguration<>()
+                .setName(CACHE_NAME + 1)
+                .setDataRegionName("memory")
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setBackups(2)
+        );
+
+        Ignite nonBltIgnite = startGrid(bltNodesCnt);
+
+        awaitPartitionMapExchange();
+
+        ClusterNode nonBltNode = nonBltIgnite.cluster().localNode();
+
+        Ignite nearIgnite = near ? nonBltIgnite : ig;
+
+        IgniteCache<Integer, Integer> persistentCache = nearIgnite.cache(CACHE_NAME);
+
+        IgniteCache<Integer, Integer> inMemoryCache = nearIgnite.cache(CACHE_NAME + 1);
+
+        assertEquals(0, nearIgnite.affinity(persistentCache.getName()).allPartitions(nonBltNode).length);
+
+        assertTrue(nearIgnite.affinity(inMemoryCache.getName()).allPartitions(nonBltNode).length > 0);
+
+        ClusterNode nearNode = nearIgnite.cluster().localNode();
+
+        try (Transaction tx = nearIgnite.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED)) {
+            for (int i = 0; ; i++) {
+                List<ClusterNode> nodes = new ArrayList<>(nearIgnite.affinity(inMemoryCache.getName())
+                    .mapKeyToPrimaryAndBackups(i));
+
+                ClusterNode primaryNode = nodes.get(0);
+
+                List<ClusterNode> backupNodes = nodes.subList(1, nodes.size());
+
+                if (nonBltNode.equals(primaryNode) == primary) {
+                    if (backupNodes.contains(nonBltNode) != primary) {
+                        inMemoryCache.put(i, i);
+
+                        // add some persistent data in the same transaction
+                        for (int j = 0; j < 100; j++)
+                            persistentCache.put(j, j);
+
+                        break;
+                    }
+                }
+            }
+            tx.commit();
+        }
+    }
+
+    /**
      * @throws Exception if failed.
      */
     @Test