IGNITE-10434: MVCC: Fixed races on asynchronous TX rollback. This closes #5569.
authorIgor Seliverstov <gvvinblade@gmail.com>
Fri, 28 Dec 2018 11:34:05 +0000 (14:34 +0300)
committerIgor Seliverstov <gvvinblade@gmail.com>
Fri, 28 Dec 2018 11:34:05 +0000 (14:34 +0300)
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxQueryEnlistResultHandler.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java

index a915478..c4255fb 100644 (file)
@@ -121,9 +121,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     @GridToStringInclude
     protected IgniteTxRemoteState txState;
 
-    /** {@code True} if tx should skip adding itself to completed version map on finish. */
-    private boolean skipCompletedVers;
-
     /** Transaction label. */
     @GridToStringInclude
     @Nullable private String txLbl;
@@ -918,7 +915,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
             // Note that we don't evict near entries here -
             // they will be deleted by their corresponding transactions.
             if (state(ROLLING_BACK) || state() == UNKNOWN) {
-                cctx.tm().rollbackTx(this, false, skipCompletedVers);
+                cctx.tm().rollbackTx(this, false, skipCompletedVersions());
 
                 TxCounters counters = txCounters(false);
 
@@ -964,20 +961,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /**
-     * @return {@code True} if tx should skip adding itself to completed version map on finish.
-     */
-    public boolean skipCompletedVersions() {
-        return skipCompletedVers;
-    }
-
-    /**
-     * @param skipCompletedVers {@code True} if tx should skip adding itself to completed version map on finish.
-     */
-    public void skipCompletedVersions(boolean skipCompletedVers) {
-        this.skipCompletedVers = skipCompletedVers;
-    }
-
-    /**
      * Adds explicit version if there is one.
      *
      * @param e Transaction entry.
index 2864712..936758d 100644 (file)
@@ -898,6 +898,8 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
             m.put(node.id(), mapping = new GridDistributedTxMapping(node));
 
         mapping.markQueryUpdate();
+
+        checkCompleted();
     }
 
     /** */
index 00289ca..72f0173 100644 (file)
@@ -124,7 +124,7 @@ public final class NearTxQueryEnlistResultHandler implements CI1<IgniteInternalF
         GridNearTxQueryEnlistResponse res = createResponse(fut);
 
         if (res.removeMapping()) {
-            tx.forceSkipCompletedVersions();
+            tx.skipCompletedVersions(true);
 
             tx.rollbackDhtLocalAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
                 @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut0) {
index e93834b..b52d440 100644 (file)
@@ -275,6 +275,8 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
 
         if (node.isLocal())
             tx.colocatedLocallyMapped(true);
+
+        checkCompleted();
     }
 
     /**
@@ -395,15 +397,15 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
             tx.setRollbackOnly();
 
         synchronized (this) {
-            boolean done = super.onDone(res, err, cancelled);
-
-            assert done;
-
             GridDhtTxAbstractEnlistFuture localFuture0 = localEnlistFuture;
 
             if (localFuture0 != null && (err != null || cancelled))
                 localFuture0.onDone(cancelled ? new IgniteFutureCancelledCheckedException("Future was cancelled: " + localFuture0) : err);
 
+            boolean done = super.onDone(res, err, cancelled);
+
+            assert done;
+
             // Clean up.
             cctx.mvcc().removeVersionedFuture(this);
 
index 245ef39..b0a83dc 100644 (file)
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTxQueryEnlistResultHandler.createResponse;
 
 /**
@@ -243,7 +244,7 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
         IgniteInternalFuture<?> txSync = cctx.tm().awaitFinishAckAsync(nodeId, tx.threadId());
 
         if (txSync == null || txSync.isDone())
-            cctx.io().send(nodeId, req, cctx.ioPolicy());
+            cctx.io().send(nodeId, req, QUERY_POOL); // Process query requests in query pool.
         else
             txSync.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> f) {
index 7588fb0..bb27ff0 100644 (file)
@@ -38,6 +38,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Contract;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -662,5 +663,5 @@ public interface IgniteInternalTx {
      * @return Transaction counters.
      * @param createIfAbsent {@code True} if non-null instance is needed.
      */
-    @Nullable public TxCounters txCounters(boolean createIfAbsent);
+    @Nullable @Contract("true -> !null;") public TxCounters txCounters(boolean createIfAbsent);
 }
index 8be90dd..b392feb 100644 (file)
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -39,6 +37,8 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -278,6 +278,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     @GridToStringInclude
     protected volatile MvccSnapshot mvccSnapshot;
 
+    /** {@code True} if tx should skip adding itself to completed version map on finish. */
+    private boolean skipCompletedVers;
+
     /** Rollback finish future. */
     @GridToStringExclude
     private volatile IgniteInternalFuture rollbackFut;
@@ -428,6 +431,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /**
+     * @return {@code True} if tx should skip adding itself to completed version map on finish.
+     */
+    public boolean skipCompletedVersions() {
+        return skipCompletedVers;
+    }
+
+    /**
+     * @param skipCompletedVers {@code True} if tx should skip adding itself to completed version map on finish.
+     */
+    public void skipCompletedVersions(boolean skipCompletedVers) {
+        this.skipCompletedVers = skipCompletedVers;
+    }
+
+    /**
      * @return Shared cache context.
      */
     public GridCacheSharedContext<?, ?> context() {
@@ -1988,7 +2005,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     public abstract void addActiveCache(GridCacheContext cacheCtx, boolean recovery) throws IgniteCheckedException;
 
     /** {@inheritDoc} */
-    @Nullable @Override public TxCounters txCounters(boolean createIfAbsent) {
+    @Override public TxCounters txCounters(boolean createIfAbsent) {
         if (createIfAbsent && txCounters == null)
             TX_COUNTERS_UPD.compareAndSet(this, null, new TxCounters());
 
index 319073b..a9e6e47 100644 (file)
@@ -1179,9 +1179,6 @@ public class IgniteTxHandler {
             nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
             dhtTx = startRemoteTx(nodeId, req, res);
 
-            if (dhtTx != null && req.updateCounters() != null) // Remember update counters on prepare state.
-                dhtTx.txCounters(true).updateCounters(req.updateCounters());
-
             // Set evicted keys from near transaction.
             if (nearTx != null)
                 res.nearEvicted(nearTx.evicted());
@@ -1335,23 +1332,28 @@ public class IgniteTxHandler {
             return;
         }
 
-        final GridDhtTxRemote dhtTx = ctx.tm().tx(req.version());
+        // Always add version to rollback history to prevent races with rollbacks.
+        if (!req.commit())
+            ctx.tm().addRolledbackTx(null, req.version());
+
+        GridDhtTxRemote dhtTx = ctx.tm().tx(req.version());
         GridNearTxRemote nearTx = ctx.tm().nearTx(req.version());
 
-        final GridCacheVersion nearTxId =
-            (dhtTx != null ? dhtTx.nearXidVersion() : (nearTx != null ? nearTx.nearXidVersion() : null));
+        IgniteInternalTx anyTx = U.<IgniteInternalTx>firstNotNull(dhtTx, nearTx);
 
-        if (txFinishMsgLog.isDebugEnabled()) {
-            txFinishMsgLog.debug("Received dht finish request [txId=" + nearTxId +
-                ", dhtTxId=" + req.version() +
+        final GridCacheVersion nearTxId = anyTx != null ? anyTx.nearXidVersion() : null;
+
+        if (txFinishMsgLog.isDebugEnabled())
+            txFinishMsgLog.debug("Received dht finish request [txId=" + nearTxId + ", dhtTxId=" + req.version() +
                 ", node=" + nodeId + ']');
-        }
 
-        // Safety - local transaction will finish explicitly.
-        if (nearTx != null && nearTx.local())
-            nearTx = null;
+        if (anyTx == null && req.commit())
+            ctx.tm().addCommittedTx(null, req.version(), null);
 
-        finish(nodeId, dhtTx, req);
+        if (dhtTx != null)
+            finish(nodeId, dhtTx, req);
+        else
+            applyPartitionsUpdatesCounters(req.updateCounters());
 
         if (nearTx != null)
             finish(nodeId, nearTx, req);
@@ -1404,31 +1406,7 @@ public class IgniteTxHandler {
         IgniteTxRemoteEx tx,
         GridDhtTxFinishRequest req
     ) {
-        // We don't allow explicit locks for transactions and
-        // therefore immediately return if transaction is null.
-        // However, we may decide to relax this restriction in
-        // future.
-        if (tx == null) {
-            if (req.commit())
-                // Must be some long time duplicate, but we add it anyway.
-                ctx.tm().addCommittedTx(tx, req.version(), null);
-            else
-                ctx.tm().addRolledbackTx(tx, req.version());
-
-            if (log.isDebugEnabled())
-                log.debug("Received finish request for non-existing transaction (added to completed set) " +
-                    "[senderNodeId=" + nodeId + ", res=" + req + ']');
-
-            return;
-        }
-        else {
-            if (req.updateCounters() != null)
-                tx.txCounters(true).updateCounters(req.updateCounters());
-
-            if (log.isDebugEnabled())
-                log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req +
-                    ", tx=" + tx + ']');
-        }
+        assert tx != null;
 
         req.txState(tx.txState());
 
@@ -1448,6 +1426,9 @@ public class IgniteTxHandler {
                 tx.commitRemoteTx();
             }
             else {
+                if (tx.dht() && req.updateCounters() != null)
+                    tx.txCounters(true).updateCounters(req.updateCounters());
+
                 tx.doneRemote(req.baseVersion(), null, null, null);
                 tx.mvccSnapshot(req.mvccSnapshot());
                 tx.rollbackRemoteTx();
@@ -1715,6 +1696,8 @@ public class IgniteTxHandler {
                     if (log.isDebugEnabled())
                         log.debug("Attempt to start a completed transaction (will ignore): " + tx);
 
+                    applyPartitionsUpdatesCounters(req.updateCounters());
+
                     return null;
                 }
 
@@ -1725,6 +1708,8 @@ public class IgniteTxHandler {
 
                     ctx.tm().uncommitTx(tx);
 
+                    applyPartitionsUpdatesCounters(req.updateCounters());
+
                     return null;
                 }
             }
@@ -1733,6 +1718,9 @@ public class IgniteTxHandler {
                 tx.transactionNodes(req.transactionNodes());
             }
 
+            if (req.updateCounters() != null)
+                tx.txCounters(true).updateCounters(req.updateCounters());
+
             if (!tx.isSystemInvalidate()) {
                 int idx = 0;
 
index c829d45..0d78017 100644 (file)
@@ -162,9 +162,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     /** */
     private volatile boolean qryEnlisted;
 
-    /** Whether to skip update of completed versions map during rollback caused by empty update set in MVCC TX. */
-    private boolean forceSkipCompletedVers;
-
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -1112,7 +1109,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         }
 
         if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
-            cctx.tm().rollbackTx(this, clearThreadMap, forceSkipCompletedVers);
+            cctx.tm().rollbackTx(this, clearThreadMap, skipCompletedVersions());
 
             cctx.mvccCaching().onTxFinished(this, false);
 
@@ -1133,14 +1130,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
-     * Forces transaction to skip update of completed versions map during rollback caused by empty update set
-     * in MVCC TX.
-     */
-    public void forceSkipCompletedVersions() {
-        forceSkipCompletedVers = true;
-    }
-
-    /**
      * @param ctx Cache context.
      * @param key Key.
      * @param expiryPlc Expiry policy.
index c480c2d..bdc5a37 100644 (file)
@@ -650,9 +650,6 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest {
      */
     @Test
     public void testMixedAsyncRollbackTypes() throws Exception {
-        if (MvccFeatureChecker.forcedMvcc())
-            fail("https://issues.apache.org/jira/browse/IGNITE-10434");
-
         final Ignite client = startClient();
 
         final AtomicBoolean stop = new AtomicBoolean();