diff --git a/fcrepo-kernel-impl/src/main/java/org/fcrepo/kernel/impl/TransactionManagerImpl.java b/fcrepo-kernel-impl/src/main/java/org/fcrepo/kernel/impl/TransactionManagerImpl.java index 8724f78ba7..53bca262ff 100644 --- a/fcrepo-kernel-impl/src/main/java/org/fcrepo/kernel/impl/TransactionManagerImpl.java +++ b/fcrepo-kernel-impl/src/main/java/org/fcrepo/kernel/impl/TransactionManagerImpl.java @@ -19,7 +19,8 @@ import static java.util.UUID.randomUUID; -import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.inject.Inject; @@ -30,6 +31,7 @@ import org.fcrepo.kernel.api.exception.TransactionNotFoundException; import org.fcrepo.kernel.api.observer.EventAccumulator; import org.fcrepo.persistence.api.PersistentStorageSessionManager; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** @@ -40,7 +42,7 @@ @Component public class TransactionManagerImpl implements TransactionManager { - private final HashMap transactions; + private final Map transactions; @Inject private ContainmentIndex containmentIndex; @@ -52,10 +54,32 @@ public class TransactionManagerImpl implements TransactionManager { private EventAccumulator eventAccumulator; TransactionManagerImpl() { - transactions = new HashMap<>(); + transactions = new ConcurrentHashMap<>(); } - // TODO Add a timer to periodically rollback and cleanup expired transaction? + /** + * Periodically scan for closed transactions for cleanup + */ + @Scheduled(fixedDelayString = "#{systemProperties['fcrepo.session.timeout'] ?: 180000}") + public void cleanupClosedTransactions() { + final var txIt = transactions.entrySet().iterator(); + while (txIt.hasNext()) { + final var txEntry = txIt.next(); + final var tx = txEntry.getValue(); + + // Cleanup if transaction is closed and past its expiration time + if (tx.isCommitted() || tx.isRolledBack()) { + if (tx.hasExpired()) { + txIt.remove(); + } + } else if (tx.hasExpired()) { + // If the tx has expired but is not already closed, then rollback + // but don't immediately remove it from the list of transactions + // so that the rolled back status can be checked + tx.rollback(); + } + } + } @Override public synchronized Transaction create() { @@ -74,7 +98,6 @@ public Transaction get(final String transactionId) { final Transaction transaction = transactions.get(transactionId); if (transaction.hasExpired()) { transaction.rollback(); - transactions.remove(transactionId); throw new TransactionClosedException("Transaction with transactionId: " + transactionId + " expired at " + transaction.getExpires() + "!"); } diff --git a/fcrepo-kernel-impl/src/test/java/org/fcrepo/kernel/impl/TransactionManagerImplTest.java b/fcrepo-kernel-impl/src/test/java/org/fcrepo/kernel/impl/TransactionManagerImplTest.java index 85e03c844f..7416ce4de7 100644 --- a/fcrepo-kernel-impl/src/test/java/org/fcrepo/kernel/impl/TransactionManagerImplTest.java +++ b/fcrepo-kernel-impl/src/test/java/org/fcrepo/kernel/impl/TransactionManagerImplTest.java @@ -19,16 +19,20 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.springframework.test.util.ReflectionTestUtils.setField; import org.fcrepo.kernel.api.ContainmentIndex; -import org.fcrepo.kernel.api.TransactionManager; +import org.fcrepo.kernel.api.exception.TransactionClosedException; +import org.fcrepo.kernel.api.exception.TransactionNotFoundException; import org.fcrepo.kernel.api.exception.TransactionRuntimeException; import org.fcrepo.kernel.api.observer.EventAccumulator; import org.fcrepo.persistence.api.PersistentStorageSession; import org.fcrepo.persistence.api.PersistentStorageSessionManager; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -45,7 +49,7 @@ public class TransactionManagerImplTest { private TransactionImpl testTx; - private TransactionManager testTxManager; + private TransactionManagerImpl testTxManager; @Mock private PersistentStorageSessionManager pssManager; @@ -69,6 +73,11 @@ public void setUp() { testTx = (TransactionImpl) testTxManager.create(); } + @After + public void cleanup() { + System.clearProperty(TransactionImpl.TIMEOUT_SYSTEM_PROPERTY); + } + @Test public void testCreateTransaction() { testTx = (TransactionImpl) testTxManager.create(); @@ -88,8 +97,110 @@ public void testGetTransactionWithInvalidID() { } @Test(expected = TransactionRuntimeException.class) - public void testGetExpiredTransaction() { + public void testGetExpiredTransaction() throws Exception { testTx.expire(); - testTxManager.get(testTx.getId()); + try { + testTxManager.get(testTx.getId()); + } finally { + // Make sure rollback is triggered + verify(psSession).rollback(); + } + } + + @Test + public void testCleanupClosedTransactions() { + System.setProperty(TransactionImpl.TIMEOUT_SYSTEM_PROPERTY, "10000"); + + final var commitTx = testTxManager.create(); + commitTx.commit(); + final var continuingTx = testTxManager.create(); + final var rollbackTx = testTxManager.create(); + rollbackTx.rollback(); + + // verify that transactions retrievable before cleanup + try { + testTxManager.get(commitTx.getId()); + fail("Transaction must be committed"); + } catch(final TransactionClosedException e) { + //expected + } + try { + testTxManager.get(rollbackTx.getId()); + fail("Transaction must be rolled back"); + } catch(final TransactionClosedException e) { + //expected + } + + assertNotNull("Continuing transaction must be present", + testTxManager.get(continuingTx.getId())); + + testTxManager.cleanupClosedTransactions(); + + // Verify that the closed transactions are stick around since they haven't expired yet + try { + testTxManager.get(commitTx.getId()); + fail("Transaction must be present but committed"); + } catch(final TransactionClosedException e) { + //expected + } + try { + testTxManager.get(rollbackTx.getId()); + fail("Transaction must be present but rolled back"); + } catch(final TransactionClosedException e) { + //expected + } + + // Force expiration of the closed transactions, rather than waiting for it + commitTx.expire(); + rollbackTx.expire(); + testTxManager.cleanupClosedTransactions(); + + // verify that closed transactions cleanedup + try { + testTxManager.get(commitTx.getId()); + fail("Committed transaction was not cleaned up"); + } catch (final TransactionNotFoundException e) { + //expected + } + try { + testTxManager.get(rollbackTx.getId()); + fail("Rolled back transaction was not cleaned up"); + } catch (final TransactionNotFoundException e) { + //expected + } + + assertNotNull("Continuing transaction must be present", + testTxManager.get(continuingTx.getId())); + } + + // Check that the scheduled cleanup process rolls back expired transactions, but leaves + // them around until the next cleanup call so that they can be queried. + @Test + public void testCleanupExpiringTransaction() throws Exception { + System.setProperty(TransactionImpl.TIMEOUT_SYSTEM_PROPERTY, "0"); + + final var expiringTx = testTxManager.create(); + + Thread.sleep(100); + + testTxManager.cleanupClosedTransactions(); + + try { + testTxManager.get(expiringTx.getId()); + fail("Transaction must be expired"); + } catch(final TransactionClosedException e) { + //expected + } + + verify(psSession).rollback(); + + testTxManager.cleanupClosedTransactions(); + + try { + testTxManager.get(expiringTx.getId()); + fail("Expired transaction was not cleaned up"); + } catch (final TransactionNotFoundException e) { + //expected + } } }