Skip to content

Commit

Permalink
Fix race in Scanner::ToTable
Browse files Browse the repository at this point in the history
  • Loading branch information
fsaintjacques committed May 20, 2020
1 parent b8a6a16 commit fd5a4a3
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
38 changes: 22 additions & 16 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,32 +177,38 @@ static inline RecordBatchVector FlattenRecordBatchVector(
return flattened;
}

/// Accumulator for per-position batch vectors, with a mutex serializing
/// insertions made through Emplace().
struct TableAssemblyState {
  /// Guards mutations of `batches` performed by Emplace().
  std::mutex mutex{};
  /// One RecordBatchVector per position; slots that have not been filled yet
  /// hold default-constructed (empty) vectors.
  std::vector<RecordBatchVector> batches{};

  /// Stores `b` in slot `position`, growing `batches` first when the slot
  /// does not exist yet. The whole operation runs while holding `mutex`.
  ///
  /// \param b the batches to store; moved into the slot
  /// \param position zero-based index of the destination slot
  void Emplace(RecordBatchVector b, size_t position) {
    std::lock_guard<std::mutex> guard(mutex);

    // Grow on demand so that slots can be filled in any order.
    const bool slot_missing = position >= batches.size();
    if (slot_missing) {
      batches.resize(position + 1);
    }

    auto& slot = batches[position];
    slot = std::move(b);
  }
};

Result<std::shared_ptr<Table>> Scanner::ToTable() {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
auto task_group = scan_context_->TaskGroup();

// Protecting mutating accesses to batches
std::mutex mutex;
std::vector<RecordBatchVector> batches;
/// Wraps the state in a shared_ptr to ensure that a failing ScanTask doesn't
/// invalidate the concurrently running tasks: Finish() returns early in that
/// case, and the mutex/batches could otherwise go out of scope while in use.
auto state = std::make_shared<TableAssemblyState>();

size_t scan_task_id = 0;
for (auto maybe_scan_task : scan_task_it) {
ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));

auto id = scan_task_id++;
task_group->Append([&batches, &mutex, id, scan_task] {
task_group->Append([state, id, scan_task] {
ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());

ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector());

{
// Move into global batches.
std::lock_guard<std::mutex> lock(mutex);
if (batches.size() <= id) {
batches.resize(id + 1);
}
batches[id] = std::move(local);
}

state->Emplace(std::move(local), id);
return Status::OK();
});
}
Expand All @@ -211,7 +217,7 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
RETURN_NOT_OK(task_group->Finish());

return Table::FromRecordBatches(scan_options_->schema(),
FlattenRecordBatchVector(std::move(batches)));
FlattenRecordBatchVector(std::move(state->batches)));
}

} // namespace dataset
Expand Down
7 changes: 2 additions & 5 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1485,11 +1485,8 @@ def test_parquet_dataset_factory_invalid(tempdir):
dataset = ds.parquet_dataset(str(root_path / '_metadata'))
assert dataset.schema.equals(table.schema)
assert len(dataset.files) == 4
# TODO this segfaults with
# terminate called after throwing an instance of 'std::system_error'
# what(): Invalid argument
# with pytest.raises(ValueError):
# dataset.to_table()
with pytest.raises(FileNotFoundError):
dataset.to_table()


def _create_metadata_file(root_path):
Expand Down

0 comments on commit fd5a4a3

Please sign in to comment.