Skip to content

Commit

Permalink
Periodic Transactions garbage collection (fcrepo#1662)
Browse files Browse the repository at this point in the history
* Cleanup closed transactions on a regular basis, but only if they have expired. This will also expire and rollback stale transactions

* Add sleep
  • Loading branch information
bbpennel authored Apr 21, 2020
1 parent cd97a2e commit 37cc1b4
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

import static java.util.UUID.randomUUID;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.inject.Inject;

Expand All @@ -30,6 +31,7 @@
import org.fcrepo.kernel.api.exception.TransactionNotFoundException;
import org.fcrepo.kernel.api.observer.EventAccumulator;
import org.fcrepo.persistence.api.PersistentStorageSessionManager;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
Expand All @@ -40,7 +42,7 @@
@Component
public class TransactionManagerImpl implements TransactionManager {

private final HashMap<String, Transaction> transactions;
private final Map<String, Transaction> transactions;

@Inject
private ContainmentIndex containmentIndex;
Expand All @@ -52,10 +54,32 @@ public class TransactionManagerImpl implements TransactionManager {
private EventAccumulator eventAccumulator;

TransactionManagerImpl() {
transactions = new HashMap<>();
transactions = new ConcurrentHashMap<>();
}

// TODO Add a timer to periodically rollback and cleanup expired transaction?
/**
* Periodically scan for closed transactions for cleanup
*/
@Scheduled(fixedDelayString = "#{systemProperties['fcrepo.session.timeout'] ?: 180000}")
public void cleanupClosedTransactions() {
final var txIt = transactions.entrySet().iterator();
while (txIt.hasNext()) {
final var txEntry = txIt.next();
final var tx = txEntry.getValue();

// Cleanup if transaction is closed and past its expiration time
if (tx.isCommitted() || tx.isRolledBack()) {
if (tx.hasExpired()) {
txIt.remove();
}
} else if (tx.hasExpired()) {
// If the tx has expired but is not already closed, then rollback
// but don't immediately remove it from the list of transactions
// so that the rolled back status can be checked
tx.rollback();
}
}
}

@Override
public synchronized Transaction create() {
Expand All @@ -74,7 +98,6 @@ public Transaction get(final String transactionId) {
final Transaction transaction = transactions.get(transactionId);
if (transaction.hasExpired()) {
transaction.rollback();
transactions.remove(transactionId);
throw new TransactionClosedException("Transaction with transactionId: " + transactionId +
" expired at " + transaction.getExpires() + "!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.springframework.test.util.ReflectionTestUtils.setField;

import org.fcrepo.kernel.api.ContainmentIndex;
import org.fcrepo.kernel.api.TransactionManager;
import org.fcrepo.kernel.api.exception.TransactionClosedException;
import org.fcrepo.kernel.api.exception.TransactionNotFoundException;
import org.fcrepo.kernel.api.exception.TransactionRuntimeException;
import org.fcrepo.kernel.api.observer.EventAccumulator;
import org.fcrepo.persistence.api.PersistentStorageSession;
import org.fcrepo.persistence.api.PersistentStorageSessionManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -45,7 +49,7 @@ public class TransactionManagerImplTest {

private TransactionImpl testTx;

private TransactionManager testTxManager;
private TransactionManagerImpl testTxManager;

@Mock
private PersistentStorageSessionManager pssManager;
Expand All @@ -69,6 +73,11 @@ public void setUp() {
testTx = (TransactionImpl) testTxManager.create();
}

@After
public void cleanup() {
System.clearProperty(TransactionImpl.TIMEOUT_SYSTEM_PROPERTY);
}

@Test
public void testCreateTransaction() {
testTx = (TransactionImpl) testTxManager.create();
Expand All @@ -88,8 +97,110 @@ public void testGetTransactionWithInvalidID() {
}

@Test(expected = TransactionRuntimeException.class)
public void testGetExpiredTransaction() {
public void testGetExpiredTransaction() throws Exception {
testTx.expire();
testTxManager.get(testTx.getId());
try {
testTxManager.get(testTx.getId());
} finally {
// Make sure rollback is triggered
verify(psSession).rollback();
}
}

@Test
public void testCleanupClosedTransactions() {
System.setProperty(TransactionImpl.TIMEOUT_SYSTEM_PROPERTY, "10000");

final var commitTx = testTxManager.create();
commitTx.commit();
final var continuingTx = testTxManager.create();
final var rollbackTx = testTxManager.create();
rollbackTx.rollback();

// verify that transactions retrievable before cleanup
try {
testTxManager.get(commitTx.getId());
fail("Transaction must be committed");
} catch(final TransactionClosedException e) {
//expected
}
try {
testTxManager.get(rollbackTx.getId());
fail("Transaction must be rolled back");
} catch(final TransactionClosedException e) {
//expected
}

assertNotNull("Continuing transaction must be present",
testTxManager.get(continuingTx.getId()));

testTxManager.cleanupClosedTransactions();

// Verify that the closed transactions are stick around since they haven't expired yet
try {
testTxManager.get(commitTx.getId());
fail("Transaction must be present but committed");
} catch(final TransactionClosedException e) {
//expected
}
try {
testTxManager.get(rollbackTx.getId());
fail("Transaction must be present but rolled back");
} catch(final TransactionClosedException e) {
//expected
}

// Force expiration of the closed transactions, rather than waiting for it
commitTx.expire();
rollbackTx.expire();
testTxManager.cleanupClosedTransactions();

// verify that closed transactions cleanedup
try {
testTxManager.get(commitTx.getId());
fail("Committed transaction was not cleaned up");
} catch (final TransactionNotFoundException e) {
//expected
}
try {
testTxManager.get(rollbackTx.getId());
fail("Rolled back transaction was not cleaned up");
} catch (final TransactionNotFoundException e) {
//expected
}

assertNotNull("Continuing transaction must be present",
testTxManager.get(continuingTx.getId()));
}

// Check that the scheduled cleanup process rolls back expired transactions, but leaves
// them around until the next cleanup call so that they can be queried.
@Test
public void testCleanupExpiringTransaction() throws Exception {
System.setProperty(TransactionImpl.TIMEOUT_SYSTEM_PROPERTY, "0");

final var expiringTx = testTxManager.create();

Thread.sleep(100);

testTxManager.cleanupClosedTransactions();

try {
testTxManager.get(expiringTx.getId());
fail("Transaction must be expired");
} catch(final TransactionClosedException e) {
//expected
}

verify(psSession).rollback();

testTxManager.cleanupClosedTransactions();

try {
testTxManager.get(expiringTx.getId());
fail("Expired transaction was not cleaned up");
} catch (final TransactionNotFoundException e) {
//expected
}
}
}

0 comments on commit 37cc1b4

Please sign in to comment.