Skip to content

Commit

Permalink
TEZ-4397: Open Tez Input splits asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
shameersss1 committed Feb 13, 2023
1 parent e236f51 commit 01a5a1b
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
Expand Down Expand Up @@ -129,14 +138,69 @@ public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
int idx = 0;
long progress;
RecordReader<K, V> curReader;

// Index of the next wrapped split whose reader still has to be opened asynchronously.
private final AtomicInteger initIndex;
// Number of readers submitted for opening per batch; capped by the number of wrapped splits.
private final int numReaders;
// Thread pool that opens readers in the background; stays null when async opening is disabled.
private ExecutorService initReaderExecService;
// Pending/finished reader-open tasks, consumed from the head by initNextRecordReader().
private BlockingDeque<Future<RecordReader<K, V>>> initedReaders;
// Set once any reader fails to open so that remaining init tasks can stop early.
private AtomicBoolean failureOccurred = new AtomicBoolean(false);

/**
 * Builds a record reader that iterates over every split wrapped by the given grouped split
 * and immediately positions itself on the first wrapped split.
 *
 * <p>When more than one reader may be opened ahead of time (see
 * {@link TezSplitGrouper#TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS}), a daemon thread pool is
 * created so that readers can be opened asynchronously; otherwise readers are opened inline.
 *
 * @param split    the grouped split whose wrapped splits are read one after another
 * @param job      job configuration handed to the wrapped input format
 * @param reporter reporter handed to the wrapped input format
 * @throws IOException if the first record reader cannot be initialized
 */
public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
    Reporter reporter) throws IOException {
  this.groupedSplit = split;
  this.job = job;
  this.reporter = reporter;
  this.initIndex = new AtomicInteger(0);

  final int poolSize = conf.getInt(
      TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS,
      TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT);

  // Never plan to pre-open more readers than there are wrapped splits.
  final int wrappedSplitCount = groupedSplit.wrappedSplits.size();
  final int configuredReaders = conf.getInt(
      TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS,
      TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT);
  this.numReaders = Math.min(wrappedSplitCount, configuredReaders);

  // Asynchronous split opening only pays off with at least two readers; with one reader
  // (or none) the executor and the queue stay null and readers are opened inline.
  final boolean openAsynchronously = numReaders > 1;
  if (openAsynchronously) {
    this.initReaderExecService = Executors.newFixedThreadPool(
        poolSize,
        new ThreadFactoryBuilder()
            .setDaemon(true)
            .setPriority(Thread.MAX_PRIORITY)
            .setNameFormat("TEZ-Split-Init-Thread-%d")
            .build());
    this.initedReaders = new LinkedBlockingDeque<>();
  }

  initNextRecordReader();
}

/**
 * Submits up to {@code numReaders} background tasks, each opening the record reader of the
 * next not-yet-claimed wrapped split, and appends their futures to {@code initedReaders}.
 *
 * <p>Does nothing when asynchronous opening is disabled (no executor) and stops submitting as
 * soon as every wrapped split has been claimed.
 *
 * <p>The split index is claimed on the submitting thread rather than inside the task. This
 * keeps the order of futures in the queue identical to the order of the wrapped splits and
 * guarantees that no future ever completes with {@code null} merely because another task
 * won the race for the last index; the consumer treats a {@code null} reader as end of input,
 * so such a future could silently drop the remaining splits.
 */
private void preInitReaders() {
  if (initReaderExecService == null) {
    return;
  }
  final int totalSplits = groupedSplit.wrappedSplits.size();
  for (int i = 0; i < numReaders; i++) {
    // Claim the index before submitting so queue position i always maps to split i.
    final int index = initIndex.getAndIncrement();
    if (index >= totalSplits) {
      // Every wrapped split already has an init task; nothing left to schedule.
      return;
    }
    initedReaders.offer(this.initReaderExecService.submit(() -> {
      if (failureOccurred.get()) {
        // Fail instead of returning null: a null reader would be mistaken for end of input
        // if this future is consumed before the original failure is observed.
        throw new IllegalStateException(
            "Skipping initialization of reader number " + index
                + " because an earlier reader initialization failed");
      }
      try {
        InputSplit s = groupedSplit.wrappedSplits.get(index);
        RecordReader<K, V> reader = wrappedInputFormat.getRecordReader(s, job, reporter);
        LOG.debug("Init Thread processed reader number {} initialization", index);
        return reader;
      } catch (Exception e) {
        failureOccurred.set(true);
        if (e instanceof InterruptedException) {
          // Preserve the interrupt status for the pool thread.
          Thread.currentThread().interrupt();
        }
        // Stop the remaining in-flight initializations; the consumer surfaces the error.
        cancelsFutures();
        throw new RuntimeException(e);
      }
    }));
  }
}

/**
 * Returns the record reader of the wrapped split that is currently being consumed.
 *
 * @return the current reader, or {@code null} when no reader is open
 */
public RecordReader<K, V> getCurReader() {
  return this.curReader;
}

@Override
public boolean next(K key, V value) throws IOException {
Expand Down Expand Up @@ -171,7 +235,7 @@ public void close() throws IOException {
curReader = null;
}
}

protected boolean initNextRecordReader() throws IOException {
if (curReader != null) {
curReader.close();
Expand All @@ -183,23 +247,45 @@ protected boolean initNextRecordReader() throws IOException {

// if all chunks have been processed, nothing more to do.
if (idx == groupedSplit.wrappedSplits.size()) {
if (initReaderExecService != null) {
LOG.info("Shutting down the init record reader threadpool");
initReaderExecService.shutdownNow();
}
return false;
}

if (LOG.isDebugEnabled()) {
LOG.debug("Init record reader for index " + idx + " of " +
LOG.debug("Init record reader for index " + idx + " of " +
groupedSplit.wrappedSplits.size());
}

// get a record reader for the idx-th chunk
try {
curReader = wrappedInputFormat.getRecordReader(
groupedSplit.wrappedSplits.get(idx), job, reporter);
// get the cur reader directly when async split opening is disabled
if (initReaderExecService == null) {
curReader = wrappedInputFormat.getRecordReader(groupedSplit.wrappedSplits.get(idx), job, reporter);
} else {
preInitReaders();
curReader = initedReaders.take().get();
}
} catch (Exception e) {
throw new RuntimeException (e);
failureOccurred.set(true);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (initedReaders != null) {
cancelsFutures();
}
throw new RuntimeException(e);
}
idx++;
return true;
return curReader != null;
}

/**
 * Requests cancellation, interrupting the running thread if necessary, of every reader
 * initialization task currently held in {@code initedReaders}.
 */
private void cancelsFutures() {
  initedReaders.forEach(pendingInit -> pendingInit.cancel(true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceStability;
import org.apache.tez.common.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.util.RackResolver;
Expand Down Expand Up @@ -102,6 +103,20 @@ public abstract class TezSplitGrouper {
public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only";
public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;

/**
 * Number of threads in the pool that asynchronously opens the record readers of a grouped
 * split. The pool is only created by TezGroupedSplitsRecordReader when more than one record
 * reader is initialized ahead of time
 * (see {@link #TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS}).
 */
public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init-threads";
/** Default size of the split-init thread pool. */
public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4;

/**
 * Number of record readers to asynchronously and proactively initialize. The effective value
 * is capped by the number of splits wrapped in the grouped split, and asynchronous opening is
 * used only when the effective value is greater than 1.
 * In order for upstream apps to use this feature, the objects created in the
 * upstream apps as part of the TezGroupedSplitsRecordReader call should be thread safe.
 */
@InterfaceStability.Unstable
public static final String TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS = "tez.grouping.split.init.num-recordreaders";
/** Default of 1 keeps asynchronous split opening disabled unless explicitly configured. */
public static final int TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT = 1;

static class LocationHolder {
List<SplitContainer> splits;
Expand Down

0 comments on commit 01a5a1b

Please sign in to comment.