Fate reservations moved out of memory #4524

Merged
merged 24 commits into from
Sep 19, 2024
4a062af
Fate reservations moved out of memory
kevinrr888 May 3, 2024
e6671fb
Changes:
kevinrr888 Jun 3, 2024
8d72860
Merge branch 'elasticity' into elasticity-feature-4131
kevinrr888 Jun 3, 2024
47d16b0
Changes:
kevinrr888 Jun 3, 2024
0809fd3
Changed a method call which is available in hadoop 3.3.6 but not in the
kevinrr888 Jun 3, 2024
ee6bb88
formatting
kevinrr888 Jun 3, 2024
a92095d
Changed a method call which is available in hadoop 3.3.6 but not in the
kevinrr888 Jun 3, 2024
be1ac10
Mockito -> EasyMock in MultipleStoresIT
kevinrr888 Jun 4, 2024
bc388d5
Merge branch 'elasticity' into elasticity-feature-4131
kevinrr888 Jun 25, 2024
51a7093
Addressed review comments:
kevinrr888 Jul 2, 2024
d704555
Merge branch 'elasticity' into elasticity-feature-4131
kevinrr888 Jul 2, 2024
176b9c3
Build fix
kevinrr888 Jul 2, 2024
6760035
Changes:
kevinrr888 Jul 23, 2024
0447ede
Merge branch 'elasticity' into elasticity-feature-4131
kevinrr888 Jul 24, 2024
d352678
Fixed UserFateStore.createAndReserve()
kevinrr888 Jul 24, 2024
bd1f174
Bug fixes and code quality improvements:
kevinrr888 Aug 1, 2024
d62c17e
Merge branch 'elasticity' into elasticity-feature-4131
kevinrr888 Aug 1, 2024
367bc7c
Added check to FateStoreIT.testAbsent()
kevinrr888 Aug 2, 2024
7df2f82
Trivial change to RowFateStatusFilter
kevinrr888 Aug 2, 2024
71227de
Verify err msg contains fate id in MultipleStoresIT
kevinrr888 Aug 6, 2024
9d46daa
Removed TODOs for this PR: 'TODO 4131'
kevinrr888 Aug 7, 2024
55f02d1
Merge branch 'elasticity' into elasticity-feature-4131
kevinrr888 Aug 21, 2024
3557252
removed synchronization block:
kevinrr888 Aug 21, 2024
fa73611
Merge branch 'main' into elasticity-feature-4131
kevinrr888 Sep 19, 2024
351 changes: 145 additions & 206 deletions core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java

Large diffs are not rendered by default.

65 changes: 54 additions & 11 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -37,6 +37,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -73,7 +75,8 @@ public class Fate<T> {
private final FateStore<T> store;
private final T environment;
private final ScheduledThreadPoolExecutor fatePoolWatcher;
- private final ExecutorService executor;
+ private final ExecutorService transactionExecutor;
+ private final ExecutorService deadResCleanerExecutor;

private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);

@@ -323,13 +326,25 @@ protected Repo<T> executeCall(FateId fateId, Repo<T> op) throws Exception {
return next;
}

/**
* A thread that finds reservations held by dead processes and unreserves them
*/
private class DeadReservationCleaner implements Runnable {
@Override
public void run() {
if (keepRunning.get()) {
store.deleteDeadReservations();
Contributor

I have not looked into it, but I need to understand what happens if this thread dies or throws an exception. Accumulo server processes can designate some threads as critical, which essentially causes the manager process to die if such a thread fails. We may want to do that for this thread, if it's not already done.

Member Author

Before this change, the Manager continued to run normally after the DeadReservationCleaner died. I changed it to be a critical thread. When looking at this, I noticed that none of the other Fate threads are critical. It seems that maybe they should be? I can create a follow-on issue if so. If the WorkFinder or any of the actual workers (TransactionRunner) die, the Manager will continue to run normally.

Contributor

It would be good to open a follow-on issue about making all of the fate threads critical.

}
}
}

/**
* Creates a Fault-tolerant executor.
*
* @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging
*/
- public Fate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStrFunc,
- AccumuloConfiguration conf) {
+ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
+ Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
this.store = FateLogger.wrap(store, toLogStrFunc);
this.environment = environment;
final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
@@ -360,7 +375,19 @@ public Fate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStr
}
}
}, 3, SECONDS));
- this.executor = pool;
+ this.transactionExecutor = pool;

ScheduledExecutorService deadResCleanerExecutor = null;
if (runDeadResCleaner) {
// Create a dead reservation cleaner for this store that will periodically clean up
// reservations held by dead processes, if they exist.
deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
store.type() + "-dead-reservation-cleaner-pool");
ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor
.scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30, SECONDS);
ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
}
this.deadResCleanerExecutor = deadResCleanerExecutor;

this.workFinder = Threads.createThread("Fate work finder", new WorkFinder());
this.workFinder.start();
@@ -530,21 +557,32 @@ public Stream<FateKey> list(FateKey.FateKeyType type) {
public void shutdown(long timeout, TimeUnit timeUnit) {
if (keepRunning.compareAndSet(true, false)) {
fatePoolWatcher.shutdown();
- executor.shutdown();
+ transactionExecutor.shutdown();
workFinder.interrupt();
if (deadResCleanerExecutor != null) {
deadResCleanerExecutor.shutdown();
}
}

if (timeout > 0) {
long start = System.nanoTime();

while ((System.nanoTime() - start) < timeUnit.toNanos(timeout)
- && (workFinder.isAlive() || !executor.isTerminated())) {
+ && (workFinder.isAlive() || !transactionExecutor.isTerminated()
+ || (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()))) {
try {
- if (!executor.awaitTermination(1, SECONDS)) {
+ if (!transactionExecutor.awaitTermination(1, SECONDS)) {
log.debug("Fate {} is waiting for worker threads to terminate", store.type());
continue;
}

if (deadResCleanerExecutor != null
&& !deadResCleanerExecutor.awaitTermination(1, SECONDS)) {
log.debug("Fate {} is waiting for dead reservation cleaner thread to terminate",
store.type());
continue;
}

workFinder.join(1_000);
if (workFinder.isAlive()) {
log.debug("Fate {} is waiting for work finder thread to terminate", store.type());
@@ -555,15 +593,20 @@ public void shutdown(long timeout, TimeUnit timeUnit) {
}
}

- if (workFinder.isAlive() || !executor.isTerminated()) {
+ if (workFinder.isAlive() || !transactionExecutor.isTerminated()
+ || (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated())) {
  log.warn(
- "Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} executor:{}",
+ "Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} transactionExecutor:{} deadResCleanerExecutor:{}",
  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), store.type(),
- workFinder.isAlive(), !executor.isTerminated());
+ workFinder.isAlive(), !transactionExecutor.isTerminated(),
+ (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()));
}
}

// interrupt the background threads
- executor.shutdownNow();
+ transactionExecutor.shutdownNow();
+ if (deadResCleanerExecutor != null) {
+ deadResCleanerExecutor.shutdownNow();
+ }
}
}
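
The review exchange about the DeadReservationCleaner turns on a piece of plain JDK behavior: when a task scheduled with `scheduleWithFixedDelay` throws, the executor suppresses all later runs without raising anything, and the failure is only visible through the returned `ScheduledFuture`. The sketch below illustrates that behavior with JDK classes only. It does not reproduce Accumulo's `ThreadPools.watchCriticalScheduledTask`; the class name `ScheduledTaskWatchSketch`, the `Outcome` holder, and the simulated failure are all hypothetical names introduced for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not part of the PR or of Accumulo.
final class ScheduledTaskWatchSketch {

  /** How many times the periodic task ran, and the exception that stopped it (if any). */
  static final class Outcome {
    final int runs;
    final Throwable failure;

    Outcome(int runs, Throwable failure) {
      this.runs = runs;
      this.failure = failure;
    }
  }

  static Outcome watchFailingTask() {
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger runs = new AtomicInteger();
    Throwable failure = null;
    try {
      // Stand-in for the cleaner: succeeds once, then throws on its second run.
      ScheduledFuture<?> future = pool.scheduleWithFixedDelay(() -> {
        if (runs.incrementAndGet() == 2) {
          throw new IllegalStateException("simulated cleaner failure");
        }
      }, 0, 10, TimeUnit.MILLISECONDS);

      try {
        // A periodic task's future never completes normally, so get() returns
        // only by throwing: here, once the task itself has thrown.
        future.get(5, TimeUnit.SECONDS);
      } catch (ExecutionException e) {
        failure = e.getCause();
      } catch (TimeoutException e) {
        // Task was still running fine after the wait; leave failure as null.
      }

      // Give the executor time to run the task again, which it will not do.
      Thread.sleep(100);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdownNow();
    }
    return new Outcome(runs.get(), failure);
  }

  public static void main(String[] args) {
    Outcome outcome = watchFailingTask();
    System.out.println("runs=" + outcome.runs + " failure=" + outcome.failure);
  }
}
```

Without something inspecting the future, the process would keep running with no cleaner, which matches the behavior described in the review thread before the task was registered as critical.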
125 changes: 124 additions & 1 deletion core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -18,9 +18,20 @@
*/
package org.apache.accumulo.core.fate;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import org.apache.accumulo.core.fate.user.FateMutatorImpl;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.hadoop.io.DataInputBuffer;

/**
* Transaction Store: a place to save transactions
@@ -107,11 +118,123 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
void unreserve(Duration deferTime);
}

/**
* The value stored to indicate a FATE transaction ID ({@link FateId}) has been reserved
*/
class FateReservation {

// The LockID (provided by the Manager running the FATE which uses this store) which is used for
// identifying dead Managers, so their reservations can be deleted and picked up again since
// they can no longer be worked on.
private final ZooUtil.LockID lockID;
// The UUID generated on a reservation attempt (tryReserve()) used to uniquely identify that
// attempt. This is useful for the edge case where the reservation is sent to the server
// (Tablet Server for UserFateStore and the ZooKeeper Server for MetaFateStore), but the server
// dies before the store receives the response. It allows us to determine if the reservation
// was successful and was written by this reservation attempt (could have been successfully
// reserved by another attempt or not reserved at all, in which case, we wouldn't want to
// expose a FateTxStore).
private final UUID reservationUUID;
private final byte[] serialized;

private FateReservation(ZooUtil.LockID lockID, UUID reservationUUID) {
this.lockID = Objects.requireNonNull(lockID);
this.reservationUUID = Objects.requireNonNull(reservationUUID);
this.serialized = serialize(lockID, reservationUUID);
}

public static FateReservation from(ZooUtil.LockID lockID, UUID reservationUUID) {
return new FateReservation(lockID, reservationUUID);
}

/**
* @param serializedFateRes the value present in the table for the reservation column
* @return true if the array represents a valid serialized FateReservation object, false if it
* represents an unreserved value, error otherwise
*/
public static boolean isFateReservation(byte[] serializedFateRes) {
if (Arrays.equals(serializedFateRes, FateMutatorImpl.NOT_RESERVED)) {
return false;
}
deserialize(serializedFateRes);
return true;
}

public ZooUtil.LockID getLockID() {
return lockID;
}

public UUID getReservationUUID() {
return reservationUUID;
}

public byte[] getSerialized() {
return serialized;
}

private static byte[] serialize(ZooUtil.LockID lockID, UUID reservationUUID) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos)) {
dos.writeUTF(lockID.serialize("/"));
dos.writeUTF(reservationUUID.toString());
dos.close();
return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static FateReservation deserialize(byte[] serialized) {
try (DataInputBuffer buffer = new DataInputBuffer()) {
buffer.reset(serialized, serialized.length);
ZooUtil.LockID lockID = new ZooUtil.LockID("", buffer.readUTF());
UUID reservationUUID = UUID.fromString(buffer.readUTF());
return new FateReservation(lockID, reservationUUID);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static boolean locksAreEqual(ZooUtil.LockID lockID1, ZooUtil.LockID lockID2) {
return lockID1.serialize("/").equals(lockID2.serialize("/"));
}

@Override
public String toString() {
return lockID.serialize("/") + ":" + reservationUUID;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof FateReservation) {
FateReservation other = (FateReservation) obj;
return Arrays.equals(this.getSerialized(), other.getSerialized());
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(lockID, reservationUUID);
}
}

/**
* Deletes the current reservations which were reserved by a now dead Manager. These reservations
* can no longer be worked on so their reservation should be deleted, so they can be picked up and
* worked on again.
*/
void deleteDeadReservations();

/**
* Attempt to reserve the fate transaction.
*
* @param fateId The FateId
- * @return true if reserved by this call, false if already reserved
+ * @return An Optional containing the {@link FateTxStore} if the transaction was successfully
+ * reserved, or an empty Optional if the transaction was not able to be reserved.
*/
Optional<FateTxStore<T>> tryReserve(FateId fateId);
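
The reservation scheme in this diff (a lock ID plus a per-attempt UUID stored as the reservation value, an `Optional`-returning `tryReserve`, and a periodic sweep of reservations held by dead processes) can be sketched with JDK classes only. This is a simplified in-memory model, not the PR's implementation: the class `ReservationSketch`, the plain `String` lock IDs, the `Map` standing in for the table column or ZooKeeper node, and the `liveLockIds` parameter are all assumptions made for illustration. Only the `writeUTF`/`readUTF` encoding of the two fields follows the `serialize` method shown above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model; not part of the PR or of Accumulo.
final class ReservationSketch {

  // Stand-in for the persisted reservation value, keyed by a fate id string.
  private final Map<String, byte[]> reservations = new ConcurrentHashMap<>();

  /** Encodes the lock id and attempt UUID as two length-prefixed UTF strings. */
  static byte[] serialize(String lockId, UUID reservationUuid) {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos)) {
      dos.writeUTF(lockId);
      dos.writeUTF(reservationUuid.toString());
      dos.flush();
      return baos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns {lockId, uuidString} read back in the order they were written. */
  static String[] deserialize(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      return new String[] {in.readUTF(), in.readUTF()};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reserves only if no reservation exists; the UUID identifies this attempt. */
  Optional<UUID> tryReserve(String fateId, String lockId) {
    UUID attempt = UUID.randomUUID();
    byte[] previous = reservations.putIfAbsent(fateId, serialize(lockId, attempt));
    return previous == null ? Optional.of(attempt) : Optional.empty();
  }

  /** Removes reservations whose holder is not in the live set; returns the count removed. */
  int deleteDeadReservations(Set<String> liveLockIds) {
    int removed = 0;
    for (Map.Entry<String, byte[]> entry : reservations.entrySet()) {
      String holder = deserialize(entry.getValue())[0];
      // remove(key, value) only succeeds if the entry was not replaced in the meantime.
      if (!liveLockIds.contains(holder) && reservations.remove(entry.getKey(), entry.getValue())) {
        removed++;
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    ReservationSketch store = new ReservationSketch();
    System.out.println("first attempt reserved: " + store.tryReserve("fate-1", "manager-A").isPresent());
    System.out.println("second attempt reserved: " + store.tryReserve("fate-1", "manager-B").isPresent());
  }
}
```

In this model a second caller gets an empty `Optional` until the sweep is told that the first holder is no longer alive, after which the transaction can be reserved again.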
