Skip to content

Commit

Permalink
Fate reservations moved out of memory
Browse files Browse the repository at this point in the history
- Reservations for MetaFateStore were moved out of memory into ZooKeeper
- Reservations for UserFateStore were moved out of memory into the Accumulo Fate table
- Added test MultipleStoresIT
This commit is one part needed for having Fate be distributed
  • Loading branch information
kevinrr888 committed May 3, 2024
1 parent 4081f46 commit 4a062af
Show file tree
Hide file tree
Showing 19 changed files with 820 additions and 190 deletions.
219 changes: 105 additions & 114 deletions core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,24 @@
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.time.NanoTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -68,17 +70,16 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) {
}
};

protected final Set<FateId> reserved;
protected final Map<FateId,NanoTime> deferred;
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
private final FateIdGenerator fateIdGenerator;

// This is incremented each time a transaction was unreserved that was non new
protected final SignalCount unreservedNonNewCount = new SignalCount();

// This is incremented each time a transaction is unreserved that was runnable
protected final SignalCount unreservedRunnableCount = new SignalCount();
private final SignalCount unreservedRunnableCount = new SignalCount();

// Keeps track of the number of concurrent callers to waitForStatusChange()
private final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger(0);

public AbstractFateStore() {
this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
Expand All @@ -87,8 +88,7 @@ public AbstractFateStore() {
public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) {
this.maxDeferred = maxDeferred;
this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
this.reserved = new HashSet<>();
this.deferred = new HashMap<>();
this.deferred = Collections.synchronizedMap(new HashMap<>());
}

public static byte[] serialize(Object o) {
Expand All @@ -115,38 +115,26 @@ public static Object deserialize(byte[] ser) {
}
}

/**
* Attempt to reserve the fate transaction.
*
* @param fateId The FateId
* @return An Optional containing the FateTxStore if the transaction was successfully reserved, or
* an empty Optional if the transaction was already reserved.
*/
@Override
public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
synchronized (this) {
if (!reserved.contains(fateId)) {
return Optional.of(reserve(fateId));
}
return Optional.empty();
}
}

@Override
public FateTxStore<T> reserve(FateId fateId) {
synchronized (AbstractFateStore.this) {
while (reserved.contains(fateId)) {
try {
AbstractFateStore.this.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
Preconditions.checkState(!_getStatus(fateId).equals(TStatus.UNKNOWN),
"Attempted to reserve a tx that does not exist: " + fateId);
var retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25))
.incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(30)).backOffFactor(1.5)
.logInterval(Duration.ofMinutes(3)).createRetry();
Optional<FateTxStore<T>> reserveAttempt = tryReserve(fateId);
while (reserveAttempt.isEmpty()) {
try {
retry.waitForNextAttempt(log, "Attempting to reserve " + fateId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalArgumentException(e);
}

reserved.add(fateId);
return newFateTxStore(fateId, true);
reserveAttempt = tryReserve(fateId);
}
retry.logCompletion(log, "Attempting to reserve " + fateId);

return reserveAttempt.orElseThrow();
}

@Override
Expand All @@ -161,18 +149,16 @@ public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) {
try (Stream<FateIdStatus> transactions = getTransactions()) {
transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus()))
.map(FateIdStatus::getFateId).filter(fateId -> {
synchronized (AbstractFateStore.this) {
var deferredTime = deferred.get(fateId);
if (deferredTime != null) {
if (deferredTime.elapsed().isNegative()) {
// negative elapsed time indicates the deferral time is in the future
return false;
} else {
deferred.remove(fateId);
}
var deferredTime = deferred.get(fateId);
if (deferredTime != null) {
if (deferredTime.elapsed().isNegative()) {
// negative elapsed time indicates the deferral time is in the future
return false;
} else {
deferred.remove(fateId);
}
return !reserved.contains(fateId);
}
return !isReserved(fateId);
}).forEach(fateId -> {
seen.incrementAndGet();
idConsumer.accept(fateId);
Expand Down Expand Up @@ -218,7 +204,7 @@ public Stream<FateIdStatus> list() {

@Override
public ReadOnlyFateTxStore<T> read(FateId fateId) {
return newFateTxStore(fateId, false);
return newUnreservedFateTxStore(fateId);
}

protected boolean isRunnable(TStatus status) {
Expand Down Expand Up @@ -249,13 +235,11 @@ public int getDeferredCount() {
// This method is primarily used right now for unit testing but
// if this synchronization becomes an issue we could add an atomic
// counter instead to track it separately so we don't need to lock
synchronized (AbstractFateStore.this) {
return deferred.size();
}
return deferred.size();
}

private Optional<FateId> create(FateKey fateKey) {
FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
FateId fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);

try {
create(fateId, fateKey);
Expand Down Expand Up @@ -286,7 +270,13 @@ private Optional<FateId> create(FateKey fateKey) {

@Override
public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
// TODO 4131 not confident about this new implementation of createAndReserve.
// Previously, you could reserve before creation, but with the new impl of reservations
// being stored in ZK (MetaFateStore) and the Accumulo Fate table (UserFateStore), creation
// is needed before reservation.
// TODO 4131 the comments in this method also need to be updated.
// Will wait until after review for this method
FateId fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
final Optional<FateTxStore<T>> txStore;

// First make sure we can reserve in memory the fateId, if not
Expand All @@ -296,34 +286,22 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
// This will create the FateTxStore before creation but this object
// is not exposed until after creation is finished so there should not
// be any errors.
final Optional<FateTxStore<T>> reservedTxStore;
synchronized (this) {
reservedTxStore = tryReserve(fateId);
}

// If present we were able to reserve so try and create
if (reservedTxStore.isPresent()) {
if (!isReserved(fateId)) {
try {
var fateIdFromCreate = create(fateKey);
if (fateIdFromCreate.isPresent()) {
Preconditions.checkState(fateId.equals(fateIdFromCreate.orElseThrow()),
"Transaction creation returned unexpected %s, expected %s", fateIdFromCreate, fateId);
txStore = reservedTxStore;
txStore = tryReserve(fateId);
} else {
// We already exist in a non-new state then un-reserve and an empty
// Optional will be returned. This is expected to happen when the
// system is busy and operations are not running, and we keep seeding them
synchronized (this) {
reserved.remove(fateId);
}
txStore = Optional.empty();
}
} catch (Exception e) {
// Clean up the reservation if the creation failed
// And then throw error
synchronized (this) {
reserved.remove(fateId);
}
if (e instanceof IllegalStateException) {
throw e;
} else {
Expand All @@ -349,91 +327,104 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {

protected abstract Optional<FateKey> getKey(FateId fateId);

protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean isReserved);
protected abstract FateTxStore<T> newUnreservedFateTxStore(FateId fateId);

protected abstract boolean isReserved(FateId fateId);

protected abstract FateInstanceType getInstanceType();
// TODO 4131 is public fine for this? Public for tests
public abstract List<FateId> getReservedTxns();

protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
protected final FateId fateId;
protected final boolean isReserved;
protected boolean deleted;

protected TStatus observedStatus = null;

protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) {
protected AbstractFateTxStoreImpl(FateId fateId) {
this.fateId = fateId;
this.isReserved = isReserved;
this.deleted = false;
}

protected abstract boolean isReserved();

@Override
public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
Preconditions.checkState(!isReserved,
"Attempted to wait for status change while reserved " + fateId);
while (true) {
Preconditions.checkState(!isReserved(),
"Attempted to wait for status change while reserved: " + fateId);
verifyReserved(false);

int currNumCallers = concurrentStatusChangeCallers.incrementAndGet();
// TODO 4131
// TODO make the max time a function of the number of concurrent callers, as the number of
// concurrent callers increases then increase the max wait time
// TODO could support signaling within this instance for known events
// TODO made the maxWait low so this would be responsive... that may put a lot of load in the
// case there are lots of things waiting...
// Made maxWait = num of curr callers
var retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25))
.incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(currNumCallers))
.backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry();

long countBefore = unreservedNonNewCount.getCount();
while (true) {

TStatus status = _getStatus(fateId);
if (expected.contains(status)) {
retry.logCompletion(log, "Waiting on status change for " + fateId + " expected:"
+ expected + " status:" + status);
concurrentStatusChangeCallers.decrementAndGet();
return status;
}

unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true);
try {
retry.waitForNextAttempt(log, "Waiting on status change for " + fateId + " expected:"
+ expected + " status:" + status);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
concurrentStatusChangeCallers.decrementAndGet();
throw new IllegalStateException(e);
}
}
}

@Override
public void unreserve(long deferTime, TimeUnit timeUnit) {
Duration deferDuration = Duration.of(deferTime, timeUnit.toChronoUnit());
Preconditions.checkState(isReserved(),
"Attempted to unreserve a transaction that was not reserved: " + fateId);

Duration deferDuration = Duration.of(deferTime, timeUnit.toChronoUnit());
if (deferDuration.isNegative()) {
throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
}

synchronized (AbstractFateStore.this) {
if (!reserved.remove(fateId)) {
throw new IllegalStateException("Tried to unreserve id that was not reserved " + fateId);
}

// notify any threads waiting to reserve
AbstractFateStore.this.notifyAll();

// If deferred map has overflowed then skip adding to the deferred map
// and clear the map and set the flag. This will cause the next execution
// of runnable to process all the transactions and to not defer as we
// have a large backlog and want to make progress
if (deferDuration.compareTo(Duration.ZERO) > 0 && !deferredOverflow.get()) {
if (deferred.size() >= maxDeferred) {
log.info(
"Deferred map overflowed with size {}, clearing and setting deferredOverflow to true",
deferred.size());
deferredOverflow.set(true);
deferred.clear();
} else {
deferred.put(fateId, NanoTime.nowPlus(deferDuration));
}
// If deferred map has overflowed then skip adding to the deferred map
// and clear the map and set the flag. This will cause the next execution
// of runnable to process all the transactions and to not defer as we
// have a large backlog and want to make progress
if (deferDuration.compareTo(Duration.ZERO) > 0 && !isDeferredOverflow()) {
if (deferred.size() >= maxDeferred) {
log.info(
"Deferred map overflowed with size {}, clearing and setting deferredOverflow to true",
deferred.size());
deferredOverflow.set(true);
deferred.clear();
} else {
deferred.put(fateId, NanoTime.nowPlus(deferDuration));
}
}

unreserve();

if (observedStatus != null && isRunnable(observedStatus)) {
unreservedRunnableCount.increment();
}

if (observedStatus != TStatus.NEW) {
unreservedNonNewCount.increment();
}
}

protected void verifyReserved(boolean isWrite) {
if (!isReserved && isWrite) {
throw new IllegalStateException("Attempted write on unreserved FATE transaction.");
}
protected abstract void unreserve();

if (isReserved) {
synchronized (AbstractFateStore.this) {
if (!reserved.contains(fateId)) {
throw new IllegalStateException("Tried to operate on unreserved transaction " + fateId);
}
}
protected void verifyReserved(boolean isWrite) {
if (!isReserved() && isWrite) {
throw new IllegalStateException(
"Attempted write on unreserved FATE transaction: " + fateId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
* Attempt to reserve the fate transaction.
*
* @param fateId The FateId
* @return true if reserved by this call, false if already reserved
* @return An Optional containing the {@link FateTxStore} if the transaction was successfully
* reserved, or an empty Optional if the transaction was not able to be reserved.
*/
Optional<FateTxStore<T>> tryReserve(FateId fateId);

Expand Down
Loading

0 comments on commit 4a062af

Please sign in to comment.