diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java index 61ba560300..e1e297a040 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java @@ -20,7 +20,16 @@ import java.io.IOException; import java.util.Objects; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -129,14 +138,69 @@ public class TezGroupedSplitsRecordReader implements RecordReader { int idx = 0; long progress; RecordReader curReader; - + private final AtomicInteger initIndex; + private final int numReaders; + private ExecutorService initReaderExecService; + private BlockingDeque>> initedReaders; + private AtomicBoolean failureOccurred = new AtomicBoolean(false); + public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job, Reporter reporter) throws IOException { this.groupedSplit = split; this.job = job; this.reporter = reporter; + this.initIndex = new AtomicInteger(0); + int numThreads = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS, + TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT); + this.numReaders = Math.min(groupedSplit.wrappedSplits.size(), + conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS, + 
TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT)); + // skip multi-threaded split opening unless more than one reader is to be pre-initialized (numReaders > 1) + if (numReaders > 1) { + this.initReaderExecService = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MAX_PRIORITY) + .setNameFormat("TEZ-Split-Init-Thread-%d") + .build()); + this.initedReaders = new LinkedBlockingDeque<>(); + } initNextRecordReader(); } + + private void preInitReaders() { + if (initReaderExecService == null) { + return; + } + for (int i = 0; i < numReaders; i++) { + initedReaders.offer(this.initReaderExecService.submit(() -> { + if (failureOccurred.get()) { + return null; + } + try { + int index = initIndex.getAndIncrement(); + if (index >= groupedSplit.wrappedSplits.size()) { + return null; + } + InputSplit s = groupedSplit.wrappedSplits.get(index); + RecordReader reader = wrappedInputFormat.getRecordReader(s, job, reporter); + LOG.debug("Init Thread processed reader number {} initialization", index); + return reader; + } catch (Exception e) { + failureOccurred.set(true); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + cancelsFutures(); + throw new RuntimeException(e); + } + })); + } + } + + public RecordReader getCurReader() { + return curReader; + } @Override public boolean next(K key, V value) throws IOException { @@ -171,7 +235,7 @@ public void close() throws IOException { curReader = null; } } - + protected boolean initNextRecordReader() throws IOException { if (curReader != null) { curReader.close(); @@ -183,23 +247,45 @@ protected boolean initNextRecordReader() throws IOException { // if all chunks have been processed, nothing more to do. 
if (idx == groupedSplit.wrappedSplits.size()) { + if (initReaderExecService != null) { + LOG.info("Shutting down the init record reader threadpool"); + initReaderExecService.shutdownNow(); + } return false; } if (LOG.isDebugEnabled()) { - LOG.debug("Init record reader for index " + idx + " of " + + LOG.debug("Init record reader for index " + idx + " of " + groupedSplit.wrappedSplits.size()); } // get a record reader for the idx-th chunk try { - curReader = wrappedInputFormat.getRecordReader( - groupedSplit.wrappedSplits.get(idx), job, reporter); + // open the current reader synchronously when async split opening is disabled (no executor was created); otherwise submit numReaders more init tasks on this call and block on the oldest queued future, whose result is null when its task saw a failure flag or found no split left + if (initReaderExecService == null) { + curReader = wrappedInputFormat.getRecordReader(groupedSplit.wrappedSplits.get(idx), job, reporter); + } else { + preInitReaders(); + curReader = initedReaders.take().get(); + } } catch (Exception e) { - throw new RuntimeException (e); + failureOccurred.set(true); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + if (initedReaders != null) { + cancelsFutures(); + } + throw new RuntimeException(e); } idx++; - return true; + return curReader != null; + } + + private void cancelsFutures() { + for (Future> f : initedReaders) { + f.cancel(true); + } } @Override diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java index a1d6b6c806..472ff6139f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.TreeMap; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.tez.common.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.util.RackResolver; @@ -102,6 +103,20 @@ public abstract class TezSplitGrouper { public static final 
String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only"; public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false; + /** + * Number of threads in the pool used to asynchronously open (initialize) the record readers of a grouped split. + */ + public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init-threads"; + public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4; + + /** + * Number of record readers to asynchronously and proactively init. In the mapred TezGroupedSplitsRecordReader the value is capped at the number of wrapped splits, and async opening is only enabled when the result is greater than 1. + * In order for upstream apps to use this feature, the objects created in the + * upstream apps as part of the TezGroupedSplitsRecordReader call should be thread safe. + */ + @InterfaceStability.Unstable + public static final String TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS = "tez.grouping.split.init.num-recordreaders"; + public static final int TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT = 1; static class LocationHolder { List splits;