diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java index a5cae1194cfd7..9003f5625d9dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -55,7 +56,7 @@ public class TxDeadlockDetection { public static final int DFLT_TX_DEADLOCK_DETECTION_TIMEOUT = 60000; /** Deadlock detection maximum iterations. */ - private static int deadLockTimeout = + private static final int DEAD_LOCK_TIMEOUT = getInteger(IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT, DFLT_TX_DEADLOCK_DETECTION_TIMEOUT); /** Sequence. */ @@ -80,7 +81,7 @@ public TxDeadlockDetection(GridCacheSharedContext cctx) { * * @param tx Target tx. * @param keys Keys. - * @return {@link TxDeadlock} if found, otherwise - {@code null}. + * @return {@link TxDeadlockFuture} future. */ TxDeadlockFuture detectDeadlock(IgniteInternalTx tx, Set keys) { GridCacheVersion txId = tx.nearXidVersion(); @@ -101,7 +102,7 @@ TxDeadlockFuture detectDeadlock(IgniteInternalTx tx, Set keys) { * @param wfg Wait-for-graph. * @param txId Tx ID - start vertex for cycle search in graph. */ - static List findCycle(Map> wfg, GridCacheVersion txId) { + static @Nullable List findCycle(Map> wfg, GridCacheVersion txId) { if (wfg == null || wfg.isEmpty()) return null; @@ -181,7 +182,7 @@ static class TxDeadlockFuture extends GridFutureAdapter { /** Pending keys. */ @GridToStringInclude - private Map> pendingKeys = new HashMap<>(); + private final Map> pendingKeys = new HashMap<>(); /** Nodes queue. */ @GridToStringInclude @@ -233,7 +234,7 @@ private TxDeadlockFuture(GridCacheSharedContext cctx, this.topVer = topVer; this.keys = keys; - if (deadLockTimeout > 0) { + if (DEAD_LOCK_TIMEOUT > 0) { timeoutObj = new DeadlockTimeoutObject(); cctx.time().addTimeoutObject(timeoutObj); @@ -322,7 +323,7 @@ private void detect(TxLocksResponse res) { * Maps tx keys on nodes. Key can be mapped on some node if this node is primary for given key or * node is near for transaction that holds or requests lock for key. * - * Key will not be be mapped to node if both key and node are already handled. + * Key will not be mapped to node if both key and node are already handled. * * @param txKeys Tx keys. * @param txLocks Tx locks. @@ -348,10 +349,7 @@ private void mapTxKeys(@Nullable Set txKeys, Map mappedKeys = pendingKeys.get(nodeId); - - if (mappedKeys == null) - pendingKeys.put(nodeId, mappedKeys = new HashSet<>()); + Set mappedKeys = pendingKeys.computeIfAbsent(nodeId, k -> new HashSet<>()); mappedKeys.add(txKey); } @@ -363,10 +361,7 @@ private void mapTxKeys(@Nullable Set txKeys, Map mappedKeys = pendingKeys.get(nearNodeId); - - if (mappedKeys == null) - pendingKeys.put(nearNodeId, mappedKeys = new HashSet<>()); + Set mappedKeys = pendingKeys.computeIfAbsent(nearNodeId, k -> new HashSet<>()); mappedKeys.add(txKey); } @@ -387,10 +382,7 @@ private void mapTxKeys(@Nullable Set txKeys, Map mappedKeys = pendingKeys.get(nodeId); - - if (mappedKeys == null) - pendingKeys.put(nodeId, mappedKeys = new HashSet<>()); + Set mappedKeys = pendingKeys.computeIfAbsent(nodeId, k -> new HashSet<>()); mappedKeys.add(txKey); } @@ -417,7 +409,7 @@ private UUID primary(IgniteTxKey txKey) { private void merge(TxLocksResponse res) { Map> txLocks = res.txLocks(); - if (txLocks == null || txLocks.isEmpty()) + if (F.isEmpty(txLocks)) return; for (Map.Entry> e : txLocks.entrySet()) { @@ -425,7 +417,7 @@ private void merge(TxLocksResponse res) { List lockList = e.getValue(); - if (lockList != null && !lockList.isEmpty()) { + if (!F.isEmpty(lockList)) { for (TxLock lock : lockList) { if (lock.owner() || lock.candiate()) { if (txs.get(lock.txId()) == null) @@ -435,18 +427,12 @@ private void merge(TxLocksResponse res) { if (lock.owner()) { GridCacheVersion txId = lock.txId(); - Set keys = txLockedKeys.get(txId); - - if (keys == null) - txLockedKeys.put(txId, keys = new HashSet<>()); + Set keys = txLockedKeys.computeIfAbsent(txId, k -> new HashSet<>()); keys.add(txKey); } else if (lock.candiate()) { - Set txs = txRequestedKeys.get(txKey); - - if (txs == null) - txRequestedKeys.put(txKey, txs = new HashSet<>()); + Set txs = txRequestedKeys.computeIfAbsent(txKey, k -> new HashSet<>()); txs.add(lock.txId()); } @@ -473,10 +459,7 @@ private void updateWaitForGraph(Map> txLocks) { txOwner = lock.txId(); if (keys.contains(e.getKey()) && !txId.equals(lock.txId())) { - Set waitingTxs = wfg.get(txId); - - if (waitingTxs == null) - wfg.put(txId, waitingTxs = new HashSet<>()); + Set waitingTxs = wfg.computeIfAbsent(txId, k -> new HashSet<>()); waitingTxs.add(lock.txId()); } @@ -487,10 +470,7 @@ private void updateWaitForGraph(Map> txLocks) { if (lock.candiate() || lock.owner()) { GridCacheVersion txId0 = lock.txId(); - Set waitForTxs = wfg.get(txId0); - - if (waitForTxs == null) - wfg.put(txId0, waitForTxs = new HashSet<>()); + Set waitForTxs = wfg.computeIfAbsent(txId0, k -> new HashSet<>()); waitForTxs.add(txOwner); } @@ -562,7 +542,7 @@ private class DeadlockTimeoutObject extends GridTimeoutObjectAdapter { * Default constructor. */ DeadlockTimeoutObject() { - super(deadLockTimeout); + super(DEAD_LOCK_TIMEOUT); } /** {@inheritDoc} */ @@ -571,7 +551,7 @@ private class DeadlockTimeoutObject extends GridTimeoutObjectAdapter { IgniteLogger log = cctx.kernalContext().log(this.getClass()); - U.warn(log, "Deadlock detection was timed out [timeout=" + deadLockTimeout + ", fut=" + this + ']'); + U.warn(log, "Deadlock detection was timed out [timeout=" + DEAD_LOCK_TIMEOUT + ", fut=" + this + ']'); onDone(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockCauseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockCauseTest.java index a68543abfb509..8d57871626af1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockCauseTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockCauseTest.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; @@ -165,6 +164,9 @@ private void checkCauseObject( final TransactionIsolation isolation, final boolean oneOp ) throws Exception { + if (nodes > 1) + awaitPartitionMapExchange(); + final Ignite ignite = grid(new Random().nextInt(nodes)); final IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME); @@ -183,7 +185,7 @@ private void checkCauseObject( final CyclicBarrier barrier = new CyclicBarrier(2); IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CAX() { - @Override public void applyx() throws IgniteCheckedException { + @Override public void applyx() { try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.PESSIMISTIC, isolation, timeout, keys.size())) { @@ -204,7 +206,9 @@ private void checkCauseObject( tx.commit(); } catch (Exception e) { - ex.compareAndSet(null, e); + // TransactionDeadlockException raised at least for one transaction involved in the deadlock + if (X.hasCause(e, TransactionDeadlockException.class)) + ex.compareAndSet(null, e); } } }, 2, "tx"); @@ -268,7 +272,7 @@ static class Account implements Serializable { /** * Change balance by specified amount. * - * @param amount Amount to add to balance (may be negative). + * @param amount Amount to add to balance (maybe negative). */ void update(double amount) { balance += amount; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java index 5898f0a370191..548b67b925d4d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -32,14 +33,13 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.junit.Test; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT; -import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -54,6 +54,18 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest { /** Cache. */ private static final String CACHE = "cache"; + /** Log listener. */ + private final ListeningTestLogger listeningLog = new ListeningTestLogger(log); + + /** Deadlock timeout, it`s unexpected during these tests. */ + private static final AtomicBoolean DEAD_LOCK_FLAG = new AtomicBoolean(); + + /** */ + private static final Consumer DEAD_LOCK_LSNR = s -> { + if (s.contains("Deadlock detection was timed out")) + DEAD_LOCK_FLAG.set(true); + }; + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -68,6 +80,11 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest { cfg.setCacheConfiguration(ccfg); + assertFalse(DEAD_LOCK_FLAG.get()); + + listeningLog.registerListener(DEAD_LOCK_LSNR); + cfg.setGridLogger(listeningLog); + return cfg; } @@ -82,20 +99,9 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest { @Override protected void afterTest() throws Exception { super.afterTest(); - stopAllGrids(); - } + assertFalse(DEAD_LOCK_FLAG.get()); - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - GridTestUtils.setFieldValue(TxDeadlockDetection.class, "deadLockTimeout", (int)(getTestTimeout() * 2)); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - GridTestUtils.setFieldValue(TxDeadlockDetection.class, "deadLockTimeout", - getInteger(IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT, 60000)); + stopAllGrids(); } /** {@inheritDoc} */