Skip to content

Commit

Permalink
Changes:
Browse files Browse the repository at this point in the history
- Combined the LockID and reservation attempt UUID into one new object: FateReservation
- Now cover edge case with tryReserve() where a write may make it to the server, but the server dies before a response is received
- New FATE Thread: DeadReservationCleaner which deletes reservations held by Managers that have since died
- Created new classes: ColumnValueMappingIterator and ReservationMappingIterator. ColumnValueMappingIterator abstracts out the common functionality of ReservationMappingIterator and StatusMappingIterator. ReservationMappingIterator is an iterator used for determining if the reservation column for a FateId has a FateReservation set or not
- Expanded/improved/simplified MultipleStoresIT tests
  • Loading branch information
kevinrr888 committed Jun 3, 2024
1 parent 4a062af commit e6671fb
Show file tree
Hide file tree
Showing 31 changed files with 1,187 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -44,6 +43,9 @@
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.time.NanoTime;
Expand All @@ -54,6 +56,10 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

// TODO 4131 should probably add support to AbstractFateStore, MetaFateStore,
// and UserFateStore to accept null lockID and zooCache (maybe make these fields
// Optional<>). This could replace the current createDummyLockID(). This support
// is needed since MFS and UFS aren't always created in the context of a Manager.
public abstract class AbstractFateStore<T> implements FateStore<T> {

private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class);
Expand All @@ -70,6 +76,9 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) {
}
};

// The ZooKeeper lock for the process that's running this store instance
protected final ZooUtil.LockID lockID;
protected final ZooCache zooCache;
protected final Map<FateId,NanoTime> deferred;
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
Expand All @@ -82,13 +91,20 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) {
private final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger(0);

public AbstractFateStore() {
this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
this(null, null, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
}

public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) {
public AbstractFateStore(ZooUtil.LockID lockID, ZooCache zooCache) {
this(lockID, zooCache, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
}

public AbstractFateStore(ZooUtil.LockID lockID, ZooCache zooCache, int maxDeferred,
FateIdGenerator fateIdGenerator) {
this.maxDeferred = maxDeferred;
this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
this.deferred = Collections.synchronizedMap(new HashMap<>());
this.lockID = lockID;
this.zooCache = zooCache;
}

public static byte[] serialize(Object o) {
Expand Down Expand Up @@ -317,6 +333,11 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
return txStore;
}

@Override
public boolean isDeadReservation(FateReservation reservation) {
return !ServiceLock.isLockHeld(zooCache, reservation.getLockID());
}

protected abstract void create(FateId fateId, FateKey fateKey);

protected abstract Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId);
Expand All @@ -329,23 +350,28 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {

protected abstract FateTxStore<T> newUnreservedFateTxStore(FateId fateId);

protected abstract boolean isReserved(FateId fateId);

// TODO 4131 is public fine for this? Public for tests
public abstract List<FateId> getReservedTxns();

protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
protected final FateId fateId;
protected boolean deleted;
protected FateReservation reservation;

protected TStatus observedStatus = null;

protected AbstractFateTxStoreImpl(FateId fateId) {
this.fateId = fateId;
this.deleted = false;
this.reservation = null;
}

protected abstract boolean isReserved();
protected AbstractFateTxStoreImpl(FateId fateId, FateReservation reservation) {
this.fateId = fateId;
this.deleted = false;
this.reservation = Objects.requireNonNull(reservation);
}

protected boolean isReserved() {
return this.reservation != null;
}

@Override
public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
Expand Down Expand Up @@ -482,4 +508,14 @@ protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
throw new IllegalStateException("Bad node data " + txInfo);
}
}

/**
* TODO 4131 this is a temporary method used to create a dummy lock when using a FateStore outside
* of the context of a Manager (one example is testing).
*
* @return a dummy {@link ZooUtil.LockID}
*/
public static ZooUtil.LockID createDummyLockID() {
return new ZooUtil.LockID("/path", "node", 123);
}
}
86 changes: 81 additions & 5 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ public class Fate<T> {
private final ExecutorService executor;

private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
private static boolean userDeadReservationCleanerRunning = false;
private static boolean metaDeadReservationCleanerRunning = false;

private final AtomicBoolean keepRunning = new AtomicBoolean(true);
private final TransferQueue<FateId> workQueue;
private final Thread workFinder;
// Will be null if this Fate instance is not running a DeadReservationCleaner
private Thread deadReservationCleaner;

public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
Expand Down Expand Up @@ -289,6 +293,26 @@ private void undo(FateId fateId, Repo<T> op) {

}

/**
* A thread that finds reservations held by dead processes and unreserves them. Only one thread
* runs per store type across all Fate instances (one to clean up dead reservations for
* {@link org.apache.accumulo.core.fate.user.UserFateStore UserFateStore} and one to clean up dead
* reservations for {@link MetaFateStore}).
*/
private class DeadReservationCleaner implements Runnable {
// TODO 4131 periodic check runs every 30 seconds
// Should this be longer? Shorter? A configurable Property? A function of something?
private static final long INTERVAL_MILLIS = 30_000;

@Override
public void run() {
while (keepRunning.get()) {
store.deleteDeadReservations();
UtilWaitThread.sleep(INTERVAL_MILLIS);
}
}
}

/**
* Creates a Fault-tolerant executor.
*
Expand Down Expand Up @@ -332,6 +356,33 @@ public Fate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStr
this.workFinder.start();
}

/**
* Starts a thread that periodically cleans up "dead reservations" (see
* {@link FateStore#deleteDeadReservations()}). Only one thread is started per store type
* ({@link FateInstanceType}) for subsequent calls to this method.
*/
public void startDeadReservationCleaner() {
// TODO 4131 this is not ideal starting this thread in its own start method, but the other
// threads in the constructor. However, starting this thread in the constructor causes
// a Maven build failure, and do not want to move starting the other threads into a
// method in this PR... should be done standalone (see issue#4609).

if ((store.type().equals(FateInstanceType.USER) && !userDeadReservationCleanerRunning)
|| store.type().equals(FateInstanceType.META) && !metaDeadReservationCleanerRunning) {
if (store.type().equals(FateInstanceType.USER)) {
this.deadReservationCleaner =
Threads.createThread("USER dead reservation cleaner", new DeadReservationCleaner());
userDeadReservationCleanerRunning = true;
} else if (store.type().equals(FateInstanceType.META)) {
this.deadReservationCleaner =
Threads.createThread("META dead reservation cleaner", new DeadReservationCleaner());
metaDeadReservationCleanerRunning = true;
}
this.deadReservationCleaner.start();
}

}

// get a transaction id back to the requester before doing any work
public FateId startTransaction() {
return store.create();
Expand Down Expand Up @@ -498,13 +549,17 @@ public void shutdown(long timeout, TimeUnit timeUnit) {
fatePoolWatcher.shutdown();
executor.shutdown();
workFinder.interrupt();
if (deadReservationCleaner != null) {
deadReservationCleaner.interrupt();
}
}

if (timeout > 0) {
long start = System.nanoTime();

while ((System.nanoTime() - start) < timeUnit.toNanos(timeout)
&& (workFinder.isAlive() || !executor.isTerminated())) {
while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) && (workFinder.isAlive()
|| (deadReservationCleaner != null && deadReservationCleaner.isAlive())
|| !executor.isTerminated())) {
try {
if (!executor.awaitTermination(1, SECONDS)) {
log.debug("Fate {} is waiting for worker threads to terminate", store.type());
Expand All @@ -516,19 +571,40 @@ public void shutdown(long timeout, TimeUnit timeUnit) {
log.debug("Fate {} is waiting for work finder thread to terminate", store.type());
workFinder.interrupt();
}

if (deadReservationCleaner != null) {
deadReservationCleaner.join(1_000);
}
if (deadReservationCleaner != null && deadReservationCleaner.isAlive()) {
log.debug("Fate {} is waiting for dead reservation cleaner thread to terminate",
store.type());
deadReservationCleaner.interrupt();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

if (workFinder.isAlive() || !executor.isTerminated()) {
if (workFinder.isAlive()
|| (deadReservationCleaner != null && deadReservationCleaner.isAlive())
|| !executor.isTerminated()) {
log.warn(
"Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} executor:{}",
"Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} deadReservationCleaner:{} executor:{}",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), store.type(),
workFinder.isAlive(), !executor.isTerminated());
workFinder.isAlive(),
(deadReservationCleaner != null && deadReservationCleaner.isAlive()),
!executor.isTerminated());
}
}

// Update that USER/META dead reservation cleaner is no longer running
if (deadReservationCleaner != null && !deadReservationCleaner.isAlive()) {
if (store.type().equals(FateInstanceType.USER)) {
userDeadReservationCleanerRunning = false;
} else if (store.type().equals(FateInstanceType.META)) {
metaDeadReservationCleanerRunning = false;
}
}
// interrupt the background threads
executor.shutdownNow();
}
Expand Down
Loading

0 comments on commit e6671fb

Please sign in to comment.