Skip to content

Commit

Permalink
fixes to multi-threading reader
Browse files Browse the repository at this point in the history
  • Loading branch information
Anonymous committed Jul 27, 2023
1 parent 1d0c173 commit 5bab08b
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*/
public abstract class BaseTableReader
{
public static final long DEFAULT_LIMIT = 20L;
public static final int DEFAULT_LIMIT = 20;

protected final String tablePath;
protected final TableClient tableClient;
Expand All @@ -54,7 +54,7 @@ public BaseTableReader(String tablePath)
* @throws IOException
*/
public abstract void show(
long limit,
int limit,
Optional<List<String>> columnsOpt
) throws TableNotFoundException, IOException;

Expand Down Expand Up @@ -147,7 +147,8 @@ private static String formatter(int length)
/**
* Minimum command line options for any implementation of this reader.
*/
protected static Options baseOptions() {
protected static Options baseOptions()
{
return new Options()
.addRequiredOption("t", "table", true, "Fully qualified table path")
.addOption("c", "columns", true,
Expand Down Expand Up @@ -184,5 +185,14 @@ protected static CommandLine parseArgs(Options options, String[] args)
System.exit(-1);
return null;
}

protected static int parseInt(CommandLine cli, String optionName, int defaultValue)
throws ParseException
{
return Optional.ofNullable(cli.getParsedOptionValue(optionName))
.map(Number.class::cast)
.map(Number::intValue)
.orElse(defaultValue);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import io.delta.kernel.Scan;
import io.delta.kernel.Snapshot;
Expand All @@ -36,7 +40,7 @@
*
* <p>
* Usage: java io.delta.kernel.examples.MultiThreadedTableReader \
* [-c <arg>] [-l <arg>] [-p <arg>] -t <arg>
* [-c <arg>] [-l <arg>] [-p <arg>] -t <arg>
* <p>
* -c,--columns <arg> Comma separated list of columns to read from the
* table. Ex. --columns=id,name,address
Expand All @@ -61,28 +65,19 @@ public MultiThreadedTableReader(int numThreads, String tablePath)
public static void main(String[] args)
throws Exception
{
CommandLine commandLine = parseArgs(
baseOptions().addOption(
Option.builder()
.option("p")
.longOpt("parallelism")
.hasArg(true)
.desc("Number of parallel readers to use (default 3).")
.type(Number.class)
.build()),
args);
Options cliOptions = baseOptions().addOption(
Option.builder()
.option("p")
.longOpt("parallelism")
.hasArg(true)
.desc("Number of parallel readers to use (default 3).")
.type(Number.class)
.build());
CommandLine commandLine = parseArgs(cliOptions, args);

String tablePath = commandLine.getOptionValue("table");
long limit =
Optional.ofNullable(commandLine.getParsedOptionValue("limit"))
.map(Number.class::cast)
.map(Number::longValue)
.orElse(DEFAULT_LIMIT);

int numThreads = Optional.ofNullable(commandLine.getParsedOptionValue("parallelism"))
.map(Number.class::cast)
.map(Number::intValue)
.orElse(DEFAULT_NUM_THREADS);
int limit = parseInt(commandLine, "limit", DEFAULT_LIMIT);
int numThreads = parseInt(commandLine, "parallelism", DEFAULT_NUM_THREADS);

Optional<List<String>> columns =
Optional.ofNullable(commandLine.getOptionValue("columns"))
Expand All @@ -92,126 +87,147 @@ public static void main(String[] args)
.show(limit, columns);
}

public void show(long limit, Optional<List<String>> columnsOpt)
public void show(int limit, Optional<List<String>> columnsOpt)
throws TableNotFoundException, IOException
{
Table table = Table.forPath(tablePath);
Snapshot snapshot = table.getLatestSnapshot(tableClient);

StructType readSchema = pruneSchema(snapshot.getSchema(tableClient), columnsOpt);

List<DataReadResult> data = readSnapshot(readSchema, snapshot, limit, numThreads);
List<DataReadResult> data = new Reader(limit)
.readSnapshot(readSchema, snapshot);
printData(readSchema, data, limit);
}

/**
* Utility method to read the data from the given {@code snapshot}.
*
* @param readSchema Subset of columns to read from the snapshot.
* @param snapshot Table snapshot object
* @param limit maximum number of rows to show.
* @param maxRowCount Not a hard limit but use this limit to stop reading more columnar batches
* once the already read columnar batches have at least these many rows.
* @return
* @throws Exception
*/
private List<DataReadResult> readSnapshot(
StructType readSchema,
Snapshot snapshot,
long limit,
long maxRowCount) throws IOException
{
Scan scan = snapshot.getScanBuilder(tableClient)
.withReadSchema(tableClient, readSchema)
.build();

Row scanState = scan.getScanState(tableClient);
CloseableIterator<ColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);

ConcurrentLinkedQueue<DataReadResult> dataReadQueue = new ConcurrentLinkedQueue<>();
AtomicLong readRowCount = new AtomicLong(0);
AtomicBoolean noMoreWorkSignal = new AtomicBoolean(false);
ExecutorService executorService = Executors.newFixedThreadPool(numThreads + 1);
try {
BlockingQueue<ScanFile> queue = instantiateWorkGenerator(
executorService,
noMoreWorkSignal,
scanState,
scanFileIter
);

for (int i = 0; i < numThreads; i++) {
instantiateWorkExecutor(executorService, noMoreWorkSignal, dataReadQueue, queue);
}
}
finally {
noMoreWorkSignal.set(true);
executorService.shutdownNow();
private class Reader {
private final int limit;
private final AtomicBoolean stopSignal = new AtomicBoolean(false);
private final ExecutorService executorService =
Executors.newFixedThreadPool(numThreads + 1);
private final BlockingQueue<ScanFile> workQueue = new ArrayBlockingQueue<>(20);

// Data read
private final List<DataReadResult> dataReadResults = new ArrayList<>();
private int readRecordCount;

Reader(int limit) {
this.limit = limit;
}

return dataReadQueue.stream().collect(Collectors.toList());
}
/**
* Utility method to read the data from the given {@code snapshot}.
*
* @param readSchema Subset of columns to read from the snapshot.
* @param snapshot Table snapshot object
* once the already read columnar batches have at least these many rows.
* @return
* @throws Exception
*/
List<DataReadResult> readSnapshot(StructType readSchema, Snapshot snapshot)
{
Scan scan = snapshot.getScanBuilder(tableClient)
.withReadSchema(tableClient, readSchema)
.build();

private BlockingQueue<ScanFile> instantiateWorkGenerator(
ExecutorService executorService,
AtomicBoolean noMoreWorkSignal,
Row scanStateRow,
CloseableIterator<ColumnarBatch> scanFileIter)
{
BlockingQueue<ScanFile> queue = new ArrayBlockingQueue<>(20);
List<Future<Void>> futures = new ArrayList<>();
BlockingQueue<ScanFile> workQueue = new ArrayBlockingQueue<>(20);
try {
futures.add(executorService.submit(workGenerator(stopSignal, scan)));
for (int i = 0; i < numThreads; i++) {
futures.add(executorService.submit(workConsumer(i)));
}

synchronized (stopSignal) {
stopSignal.wait();
}
return dataReadResults;
} catch (InterruptedException ie) {
System.out.println("Interrupted exiting now..");
throw new RuntimeException(ie);
}
finally {
stopSignal.set(true);
executorService.shutdownNow();
}
}

executorService.submit(
() -> {
while (scanFileIter.hasNext() && !noMoreWorkSignal.get()) {
private Callable<Void> workGenerator(
AtomicBoolean stopSignal,
Scan scan)
{
return (() -> {
Row scanStateRow = scan.getScanState(tableClient);
CloseableIterator<ColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);

while (scanFileIter.hasNext() && !stopSignal.get()) {
ColumnarBatch scanFileBatch = scanFileIter.next();
try (CloseableIterator<Row> scanFileRows = scanFileBatch.getRows()) {
while (scanFileRows.hasNext() && !noMoreWorkSignal.get()) {
queue.put(new ScanFile(scanStateRow, scanFileRows.next()));
while (scanFileRows.hasNext() && !stopSignal.get()) {
workQueue.put(new ScanFile(scanStateRow, scanFileRows.next()));
}
}
catch (IOException | InterruptedException ioe) {
catch (IOException ioe) {
throw new RuntimeException(ioe);
} catch (InterruptedException ie) {
if (!stopSignal.get()) {
System.out.print("Work generator is interrupted");
throw ie;
}
}
}
noMoreWorkSignal.set(true);
}
);

return queue;
}
return null;
});
}

private void instantiateWorkExecutor(
ExecutorService executorService,
AtomicBoolean noMoreWorkSignal,
ConcurrentLinkedQueue<DataReadResult> dataReadQueue,
BlockingQueue<ScanFile> workQueue)
{
executorService.submit(
() -> {
while (!noMoreWorkSignal.get()) {
try {
ScanFile work = workQueue.take();
try (CloseableIterator<DataReadResult> dataIter = Scan.readData(
tableClient,
work.getScanRow(tableClient),
Utils.singletonCloseableIterator(work.getScanFileRow(tableClient)),
Optional.empty())) {
while(dataIter.hasNext()) {
dataReadQueue.add(dataIter.next());

}
}
// TODO: add the scan results to a queue.
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
} catch (InterruptedException ioe) {
// continue
// TODO: usually interrupted when programming is trying to finish, but
// also need to check in case when the work is not done yet.
}
}
}
);
private Callable<Void> workConsumer(int workerId)
{
return (() -> {
try {
while (!stopSignal.get()) {
ScanFile work = workQueue.take();
try (CloseableIterator<DataReadResult> dataIter = Scan.readData(
tableClient,
work.getScanRow(tableClient),
Utils.singletonCloseableIterator(work.getScanFileRow(tableClient)),
Optional.empty())) {
while (dataIter.hasNext() && !stopSignal.get()) {
if (addData(dataIter.next())) {
// Have enough records, exit now.
break;
}
}
}
}
}
catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
catch (InterruptedException ie) {
if (!stopSignal.get()) {
System.out.printf("Worker %d is interrupted." + workerId);
throw ie;
}
}
return null;
});
}

private boolean addData(DataReadResult dataReadResult) {
synchronized (dataReadResults) {
readRecordCount += dataReadResult.getData().getSize();
dataReadResults.add(dataReadResult);
boolean hasEnoughData = readRecordCount >= limit;
if (hasEnoughData) {
stopSignal.set(true);
synchronized (stopSignal) {
stopSignal.notify();
}
}
return hasEnoughData;
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ public static void main(String[] args)
CommandLine commandLine = parseArgs(baseOptions(), args);

String tablePath = commandLine.getOptionValue("table");
long limit =
Optional.ofNullable(commandLine.getParsedOptionValue("limit"))
.map(Number.class::cast)
.map(Number::longValue)
.orElse(DEFAULT_LIMIT);
int limit = parseInt(commandLine, "limit", DEFAULT_LIMIT);

Optional<List<String>> columns =
Optional.ofNullable(commandLine.getOptionValue("columns"))
Expand All @@ -58,7 +54,8 @@ public static void main(String[] args)
.show(limit, columns);
}

public void show(long limit, Optional<List<String>> columnsOpt)
@Override
public void show(int limit, Optional<List<String>> columnsOpt)
throws TableNotFoundException, IOException
{
Table table = Table.forPath(tablePath);
Expand Down

0 comments on commit 5bab08b

Please sign in to comment.