Add LogSorter to Compactor and ScanServer (#4239)
Adds LogSorter to ScanServer and Compactor so that all available
processes participate in recovery when a failure leaves a tablet
with write-ahead logs (walogs). Modifies LogSorter and
DistributedWorkQueue so that the Compactor can call the LogSorter
and have its tasks execute serially in the Compactor's thread.

Fixes #4232
dlmarion authored Feb 14, 2024
1 parent 101ae81 commit 5d12597
Showing 9 changed files with 371 additions and 41 deletions.
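
The serial-execution idea hinges on running work-queue tasks on an executor that executes in the submitting thread. Below is a minimal, runnable sketch of that behavior using Guava's direct executor, the same utility the LogSorter change in this commit imports; the demo class name is illustrative.

    import java.util.concurrent.ExecutorService;

    import com.google.common.util.concurrent.MoreExecutors;

    public class DirectExecutorDemo {
      public static void main(String[] args) {
        // A direct executor runs each task in the thread that submits it, so the
        // task below executes serially in main() rather than on a pool thread.
        ExecutorService direct = MoreExecutors.newDirectExecutorService();
        direct.execute(() -> System.out.println("ran in " + Thread.currentThread().getName()));
        direct.shutdown();
      }
    }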
@@ -520,6 +520,9 @@ public enum Property {
@Experimental
SSERV_THREADCHECK("sserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
"The time between adjustments of the thrift server thread pool.", "2.1.0"),
@Experimental
SSERV_WAL_SORT_MAX_CONCURRENT("sserver.wal.sort.concurrent.max", "2", PropertyType.COUNT,
"The maximum number of threads to use to sort logs during recovery.", "4.0.0"),
// properties that are specific to tablet server behavior
TSERV_PREFIX("tserver.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the tablet servers.", "1.3.5"),
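For reference, a configuration sketch for the new property; the value shown is the default, and (per the ScanServer change later in this commit) a value below 1 opts scan servers out of recovery-log sorting.

    # accumulo.properties sketch: allow up to two concurrent WAL sort tasks
    # per scan server (the default); set to 0 to disable sorting there.
    sserver.wal.sort.concurrent.max=2
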
@@ -519,4 +519,8 @@ public void kill(ServerType server, String hostname) throws IOException {
stop(server, hostname);
}

public List<Process> getTabletServers(String resourceGroup) {
return tabletServerProcesses.get(resourceGroup);
}

}
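
The new accessor exposes the managed tablet server processes for a resource group. A hedged sketch of how a test might consume the returned list; the helper class is illustrative, not part of the commit.

    import java.util.List;

    class TabletServerProbe {
      // Count live processes in the List<Process> returned by the new
      // getTabletServers(String resourceGroup) accessor.
      static long countLive(List<Process> tserverProcesses) {
        return tserverProcesses.stream().filter(Process::isAlive).count();
      }
    }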
@@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,20 +58,27 @@ public class DistributedWorkQueue {

private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueue.class);

private ThreadPoolExecutor threadPool;
private ZooReaderWriter zoo;
private String path;
private ServerContext context;
private long timerInitialDelay, timerPeriod;

private AtomicInteger numTask = new AtomicInteger(0);

private void lookForWork(final Processor processor, List<String> children) {
/**
* Finds a child in {@code children} that is not currently being processed and adds a Runnable to
* the {@code executor} that invokes the {@code processor}. The Runnable will recursively call
* {@code lookForWork} after it invokes the {@code processor} such that it will continue to look
* for children that need work until that condition is exhausted. This method will return early if
* the number of currently running tasks is larger than {@code maxThreads}.
*/
private void lookForWork(final Processor processor, final List<String> children,
final ExecutorService executor, final int maxThreads) {
if (children.isEmpty()) {
return;
}

if (numTask.get() >= threadPool.getCorePoolSize()) {
if (numTask.get() >= maxThreads) {
return;
}

@@ -102,7 +110,7 @@ private void lookForWork(final Processor processor, List<String> children) {
}

// Great... we got the lock, but maybe we're too busy
if (numTask.get() >= threadPool.getCorePoolSize()) {
if (numTask.get() >= maxThreads) {
zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
break;
}
@@ -143,7 +151,7 @@ public void run() {

try {
// its important that this is called after numTask is decremented
lookForWork(processor, zoo.getChildren(path));
lookForWork(processor, zoo.getChildren(path), executor, maxThreads);
} catch (KeeperException e) {
log.error("Failed to look for work", e);
} catch (InterruptedException e) {
@@ -153,7 +161,7 @@
};

numTask.incrementAndGet();
threadPool.execute(task);
executor.execute(task);

}
} catch (Exception t) {
Expand Down Expand Up @@ -186,40 +194,62 @@ public ServerContext getContext() {
return context;
}

public void startProcessing(final Processor processor, ThreadPoolExecutor executorService)
throws KeeperException, InterruptedException {
public long getCheckInterval() {
return this.timerPeriod;
}

threadPool = executorService;
/**
* Finds the children at the path passed in the constructor and calls {@code lookForWork} which
* will attempt to process all of the currently available work
*/
public void processExistingWork(final Processor processor, ExecutorService executor,
final int maxThreads, boolean setWatch) throws KeeperException, InterruptedException {

zoo.mkdirs(path);
zoo.mkdirs(path + "/" + LOCKS_NODE);

List<String> children = zoo.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case NodeChildrenChanged:
if (event.getPath().equals(path)) {
try {
lookForWork(processor, zoo.getChildren(path, this));
} catch (KeeperException e) {
log.error("Failed to look for work at path {}; {}", path, event, e);
} catch (InterruptedException e) {
log.info("Interrupted looking for work at path {}; {}", path, event, e);
List<String> children = null;
if (setWatch) {
children = zoo.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case NodeChildrenChanged:
if (event.getPath().equals(path)) {
try {
lookForWork(processor, zoo.getChildren(path, this), executor, maxThreads);
} catch (KeeperException e) {
log.error("Failed to look for work at path {}; {}", path, event, e);
} catch (InterruptedException e) {
log.info("Interrupted looking for work at path {}; {}", path, event, e);
}
} else {
log.info("Unexpected path for NodeChildrenChanged event watching path {}; {}", path,
event);
}
} else {
log.info("Unexpected path for NodeChildrenChanged event watching path {}; {}", path,
event);
}
break;
default:
log.info("Unexpected event watching path {}; {}", path, event);
break;
break;
default:
log.info("Unexpected event watching path {}; {}", path, event);
break;
}
}
}
});
});
} else {
children = zoo.getChildren(path);
}

lookForWork(processor, children, executor, maxThreads);

}

/**
* Calls {@code runOne} to attempt to process all currently available work, then adds a background
* thread that looks for work in the future.
*/
public void processExistingAndFuture(final Processor processor,
ThreadPoolExecutor executorService) throws KeeperException, InterruptedException {

lookForWork(processor, children);
processExistingWork(processor, executorService, executorService.getCorePoolSize(), true);

// Add a little jitter to avoid all the tservers slamming zookeeper at once
ThreadPools.watchCriticalScheduledTask(
@@ -228,7 +258,8 @@
public void run() {
log.debug("Looking for work in {}", path);
try {
lookForWork(processor, zoo.getChildren(path));
lookForWork(processor, zoo.getChildren(path), executorService,
executorService.getCorePoolSize());
} catch (KeeperException e) {
log.error("Failed to look for work", e);
} catch (InterruptedException e) {
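The reworked queue now has two entry points: processExistingWork drains whatever work is registered at call time (optionally without leaving a ZooKeeper watch), while processExistingAndFuture keeps the old watch-and-poll behavior. A hedged usage sketch follows, assuming the signatures in this diff and the queue's nested Processor interface; the helper class is illustrative.

    import java.util.concurrent.ThreadPoolExecutor;

    import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
    import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;

    import com.google.common.util.concurrent.MoreExecutors;

    class WorkQueueSketch {
      // One-shot and serial: drain existing work in the calling thread (no
      // watch is set), then report when the caller should check again.
      static long drainOnce(DistributedWorkQueue queue, Processor processor) throws Exception {
        queue.processExistingWork(processor, MoreExecutors.newDirectExecutorService(), 1, false);
        return System.currentTimeMillis() + queue.getCheckInterval();
      }

      // Long-lived: drain existing work on the pool, then keep watching.
      static void drainAndWatch(DistributedWorkQueue queue, Processor processor,
          ThreadPoolExecutor pool) throws Exception {
        queue.processExistingAndFuture(processor, pool);
      }
    }
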
server/compactor/pom.xml: 4 additions & 0 deletions
@@ -55,6 +55,10 @@
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-start</artifactId>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-tserver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
@@ -109,6 +109,7 @@
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.tserver.log.LogSorter;
import org.apache.hadoop.fs.Path;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
@@ -635,6 +636,8 @@ public void run() {

final AtomicReference<Throwable> err = new AtomicReference<>();
final AtomicLong timeSinceLastCompletion = new AtomicLong(0L);
final LogSorter logSorter = new LogSorter(getContext(), getConfiguration());
long nextSortLogsCheckTime = System.currentTimeMillis();

while (!shutdown) {

@@ -649,6 +652,14 @@
err.set(null);
JOB_HOLDER.reset();

if (System.currentTimeMillis() > nextSortLogsCheckTime) {
// Attempt to process all existing log sorting work serially in this thread.
// When no work remains, this call will return so that we can look for compaction
// work.
LOG.debug("Checking to see if any recovery logs need sorting");
nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
}

TExternalCompactionJob job;
try {
job = getNextJob(getNextId());
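The Compactor's loop gates the sort check on a timestamp so recovery-log checks run at most once per interval between compaction jobs. A runnable sketch of the pattern; the stub stands in for logSorter.sortLogsIfNeeded().

    import java.util.concurrent.TimeUnit;

    public class TimeGatedCheckDemo {
      // Stand-in for logSorter.sortLogsIfNeeded(): perform the check, then
      // return the earliest time in millis at which the next check may run.
      static long sortLogsIfNeeded() {
        System.out.println("checked for recovery logs needing sorting");
        return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5);
      }

      public static void main(String[] args) throws InterruptedException {
        long nextSortLogsCheckTime = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) { // stands in for while (!shutdown)
          if (System.currentTimeMillis() > nextSortLogsCheckTime) {
            nextSortLogsCheckTime = sortLogsIfNeeded();
          }
          Thread.sleep(1000); // stands in for doing compaction work
        }
      }
    }
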
@@ -103,6 +103,7 @@
import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.tserver.log.LogSorter;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.session.MultiScanSession;
import org.apache.accumulo.tserver.session.ScanSession;
@@ -390,6 +391,22 @@ public void run() {

ServiceLock lock = announceExistence();

int threadPoolSize = getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT);
if (threadPoolSize > 0) {
final LogSorter logSorter = new LogSorter(context, getConfiguration());
try {
// Attempt to process all existing log sorting work and start a background
// thread to look for log sorting work in the future
logSorter.startWatchingForRecoveryLogs(threadPoolSize);
} catch (Exception ex) {
log.error("Error starting LogSorter");
throw new RuntimeException(ex);
}
} else {
log.info(
"Log sorting for tablet recovery is disabled, SSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
}

try {
while (!serverStopRequested) {
UtilWaitThread.sleep(1000);
@@ -602,12 +602,22 @@ public void run() {
throw new RuntimeException(e);
}

try {
logSorter.startWatchingForRecoveryLogs();
} catch (Exception ex) {
log.error("Error setting watches for recoveries");
throw new RuntimeException(ex);
int threadPoolSize =
getContext().getConfiguration().getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
if (threadPoolSize > 0) {
try {
// Attempt to process all existing log sorting work and start a background
// thread to look for log sorting work in the future
logSorter.startWatchingForRecoveryLogs(threadPoolSize);
} catch (Exception ex) {
log.error("Error starting LogSorter");
throw new RuntimeException(ex);
}
} else {
log.info(
"Log sorting for tablet recovery is disabled, TSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
}

final AccumuloConfiguration aconf = getConfiguration();

final long onDemandUnloaderInterval =
@@ -62,6 +62,7 @@
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;

public class LogSorter {

@@ -290,12 +291,30 @@ void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, in
}
}

public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException {
int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
/**
* Sort any logs that need sorting in the current thread.
*
* @return The time in millis when the next check can be done.
*/
public long sortLogsIfNeeded() throws KeeperException, InterruptedException {
DistributedWorkQueue dwq = new DistributedWorkQueue(
context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context);
dwq.processExistingWork(new LogProcessor(), MoreExecutors.newDirectExecutorService(), 1, false);
return System.currentTimeMillis() + dwq.getCheckInterval();
}

/**
* Sort any logs that need sorting in a ThreadPool using
* {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT} threads. This method will start a background
* thread to look for log sorting work in the future that will be processed by the
* ThreadPoolExecutor
*/
public void startWatchingForRecoveryLogs(int threadPoolSize)
throws KeeperException, InterruptedException {
ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools()
.createFixedThreadPool(threadPoolSize, this.getClass().getName(), true);
new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf,
context).startProcessing(new LogProcessor(), threadPool);
context).processExistingAndFuture(new LogProcessor(), threadPool);
}

public List<RecoveryStatus> getLogSorts() {
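Taken together, LogSorter now exposes one entry point per process type. A hedged sketch of the two call styles, assuming the method signatures shown above; the helper class is illustrative.

    import org.apache.accumulo.tserver.log.LogSorter;

    class LogSorterCallSites {
      // Compactor-style: sort serially in the current thread between jobs,
      // and learn when the next check is due.
      static long compactorStyle(LogSorter sorter) throws Exception {
        return sorter.sortLogsIfNeeded();
      }

      // TabletServer/ScanServer-style: sort on a bounded pool and keep a
      // background watcher looking for future recovery logs.
      static void serverStyle(LogSorter sorter, int threadPoolSize) throws Exception {
        sorter.startWatchingForRecoveryLogs(threadPoolSize);
      }
    }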
