Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

modifies FATE to use a single thread to find work #4042

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -148,11 +150,6 @@ public long create() {
return txid;
}

/**
 * Reserves a transaction through the wrapped store and hands it back wrapped in an
 * {@code AgeOffFateTxStore}.
 */
@Override
public FateTxStore<T> reserve() {
  var reserved = store.reserve();
  return new AgeOffFateTxStore(reserved);
}

@Override
public FateTxStore<T> reserve(long tid) {
return new AgeOffFateTxStore(store.reserve(tid));
Expand Down Expand Up @@ -204,4 +201,9 @@ public ReadOnlyFateTxStore<T> read(long tid) {
public List<Long> list() {
  // Pure delegation: the listing is whatever the wrapped store reports.
  List<Long> txids = store.list();
  return txids;
}

/**
 * Passes the request straight through to the wrapped store, including the caller's
 * {@code keepWaiting} flag.
 */
@Override
public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
  Iterator<Long> runnableTxids = store.runnable(keepWaiting);
  return runnableTxids;
}
}
125 changes: 123 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.core.fate;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
Expand All @@ -32,6 +33,8 @@

import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -48,10 +51,13 @@
import org.apache.accumulo.core.util.ShutdownUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.thrift.TApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
* Fault tolerant executor
*/
Expand All @@ -68,25 +74,128 @@ public class Fate<T> {
private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);

private final AtomicBoolean keepRunning = new AtomicBoolean(true);
private final BlockingQueue<Long> workQueue;
private final SignalCount idleWorkerCount = new SignalCount();
private final Thread workFinder;

public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE
}

/**
 * A counter guarded by its own monitor. Every change to the count wakes all threads waiting on
 * this object so they can re-evaluate their condition.
 */
private class SignalCount {
  long count;

  /** Adds one to the count and wakes all waiters. */
  synchronized void increment() {
    count += 1;
    notifyAll();
  }

  /**
   * Subtracts one from the count and wakes all waiters.
   *
   * @throws IllegalStateException if the count is not greater than zero
   */
  synchronized void decrement() {
    Preconditions.checkState(count > 0);
    count -= 1;
    notifyAll();
  }

  /**
   * Blocks until the count is non-zero or {@code keepRunning} is false. The wait is timed so
   * that {@code keepRunning} is re-checked at least every 100 ms.
   *
   * @throws IllegalStateException if the waiting thread is interrupted; the interrupt flag is
   *         restored before throwing
   */
  synchronized void waitTillNonZero() {
    try {
      while (count == 0 && keepRunning.get()) {
        wait(100);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

}

/**
 * A single thread that finds transactions to work on and queues them up. Do not want each worker
 * thread going to the store and looking for work as it would place more load on the store.
 *
 * <p>
 * A failure while looking for work is logged, the queue is cleared, and the search is retried
 * after a short pause. A single failed lookup therefore does not leave the worker threads polling
 * an empty queue for as long as {@code keepRunning} stays true. The thread stops when
 * {@code keepRunning} becomes false or when it is interrupted.
 */
private class WorkFinder implements Runnable {

  @Override
  public void run() {

    while (keepRunning.get()) {

      // The try block is inside the loop so that an exception ends only the current pass, not
      // the thread.
      try {

        while (!workQueue.isEmpty() && keepRunning.get()) {
          // wait till there is at least one worker that is looking for work and the queue is
          // empty
          idleWorkerCount.waitTillNonZero();
        }

        var iter = store.runnable(keepRunning);

        while (iter.hasNext() && keepRunning.get()) {
          Long txid = iter.next();
          try {
            // offer with a timeout so keepRunning is re-checked while the queue is full
            while (keepRunning.get()) {
              if (workQueue.offer(txid, 100, MILLISECONDS)) {
                break;
              }
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
          }
        }
      } catch (Exception e) {
        if (keepRunning.get()) {
          log.warn("Failure while attempting to find work for fate", e);
        } else {
          log.debug("Failure while attempting to find work for fate", e);
        }

        workQueue.clear();

        if (!keepRunning.get()) {
          return;
        }

        try {
          // Pause before retrying so a store that keeps failing is not hit in a tight loop. If
          // this thread has already been interrupted the sleep throws immediately, which ends
          // the thread as an interrupt is a request to stop.
          MILLISECONDS.sleep(100);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }
}

private class TransactionRunner implements Runnable {

/**
 * Takes transaction ids off the work queue until one of them can be reserved or FATE stops
 * running. The calling worker is counted as idle in {@code idleWorkerCount} for the duration of
 * this call.
 *
 * @return the reserved transaction store, or an empty optional when {@code keepRunning} became
 *         false before anything could be reserved
 * @throws InterruptedException if interrupted while polling the work queue
 */
private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
  idleWorkerCount.increment();
  try {
    while (keepRunning.get()) {
      // timed poll so keepRunning is re-checked when the queue stays empty
      Long candidate = workQueue.poll(100, MILLISECONDS);

      if (candidate != null) {
        var reserved = store.tryReserve(candidate);
        if (reserved.isPresent()) {
          return reserved;
        }
        // could not reserve this id, go look at the next queued one
      }
    }

    return Optional.empty();
  } finally {
    idleWorkerCount.decrement();
  }
}

@Override
public void run() {
while (keepRunning.get()) {
long deferTime = 0;
FateTxStore<T> txStore = null;
try {
txStore = store.reserve();
var optionalopStore = reserveFateTx();
if (optionalopStore.isPresent()) {
txStore = optionalopStore.orElseThrow();
} else {
continue;
}
TStatus status = txStore.getStatus();
Repo<T> op = txStore.top();
if (status == FAILED_IN_PROGRESS) {
processFailed(txStore, op);
} else {
} else if (status == SUBMITTED || status == IN_PROGRESS) {
Repo<T> prevOp = null;
try {
deferTime = op.isReady(txStore.getID(), environment);
Expand Down Expand Up @@ -231,6 +340,9 @@ public Fate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStr
this.environment = environment;
final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
// TODO this queue does not resize when config changes
this.workQueue =
new ArrayBlockingQueue<>(conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) * 4);
this.fatePoolWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> {
Expand All @@ -257,6 +369,9 @@ public Fate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStr
}
}, 3, SECONDS));
this.executor = pool;

this.workFinder = Threads.createThread("Fate work finder", new WorkFinder());
this.workFinder.start();
}

// get a transaction id back to the requester before doing any work
Expand Down Expand Up @@ -399,6 +514,12 @@ public void shutdown() {
if (executor != null) {
executor.shutdown();
}
workFinder.interrupt();
try {
workFinder.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
17 changes: 5 additions & 12 deletions core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
*/
long create();

/**
* An interface that allows read/write access to the data related to a single fate operation.
*/
interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
@Override
Repo<T> top();
Expand Down Expand Up @@ -81,8 +84,8 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
* upon successful return the store now controls the referenced transaction id. caller should no
* longer interact with it.
*
* @param deferTime time in millis to keep this transaction out of the pool used in the
* {@link #reserve() reserve} method. must be non-negative.
* @param deferTime time in millis to keep this transaction from being returned by
* {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative.
*/
void unreserve(long deferTime);
}
Expand All @@ -104,14 +107,4 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
*/
FateTxStore<T> reserve(long tid);

/**
 * Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS.
 *
 * <p>
 * Reserving a transaction id ensures that nothing else in-process interacting via the same
 * instance will be operating on that transaction id.
 *
 * @return read/write access to a transaction that is safe to interact with; the transaction is
 *         chosen by the store.
 */
FateTxStore<T> reserve();

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.io.Serializable;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Read only access to a Transaction Store.
Expand Down Expand Up @@ -121,4 +123,11 @@ interface ReadOnlyFateTxStore<T> {
* @return all outstanding transactions, including those reserved by others.
*/
List<Long> list();

/**
 * Finds fate operations that can currently be worked on.
 *
 * <p>
 * NOTE(review): Fate's transaction runner also processes SUBMITTED transactions taken from this
 * iterator - confirm whether SUBMITTED should be listed among the returned states.
 *
 * @param keepWaiting flag consulted while blocking; once it is false this method stops waiting
 *        for something runnable
 * @return an iterator over fate op ids that are (IN_PROGRESS or FAILED_IN_PROGRESS) and
 *         unreserved. This method will block until it finds something that is runnable or until
 *         the keepWaiting parameter is false.
 */
Iterator<Long> runnable(AtomicBoolean keepWaiting);
}
Loading