Skip to content

Commit

Permalink
[C++] Scan Node reads multiple files (#300)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Nov 8, 2022
1 parent a6e788c commit 39e47a8
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 51 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
run:
working-directory: ./cpp
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: ccache
uses: hendrikmuhs/ccache-action@v1
- name: Run lint
Expand All @@ -40,7 +40,7 @@ jobs:
run:
working-directory: ./cpp
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install dependencies
run: |
brew update
Expand Down
46 changes: 33 additions & 13 deletions cpp/src/lance/arrow/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
#include <fmt/ranges.h>

#include <concepts>
#include <string>
#include <range/v3/action.hpp>
#include <range/v3/view.hpp>
#include <vector>

#include "lance/arrow/file_lance.h"
#include "lance/arrow/type.h"

namespace views = ranges::views;

namespace lance::arrow {

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
Expand All @@ -40,6 +43,25 @@ ::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
return ::arrow::RecordBatch::FromStructArray(struct_arr);
}

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
const std::vector<std::shared_ptr<::arrow::RecordBatch>>& batches, ::arrow::MemoryPool* pool) {
if (batches.empty()) {
return nullptr;
}
auto batch = batches[0];
for (auto& b : batches | views::drop(1)) {
if (b->num_rows() != batch->num_rows()) {
return ::arrow::Status::Invalid(
"MergeRecordBatches: attempt to merge batches with different length: ",
b->num_rows(),
" != ",
batch->num_rows());
}
ARROW_ASSIGN_OR_RAISE(batch, MergeRecordBatches(batch, b, pool));
}
return batch;
}

::arrow::Result<std::shared_ptr<::arrow::Array>> MergeListArrays(
const std::shared_ptr<::arrow::Array>& lhs,
const std::shared_ptr<::arrow::Array>& rhs,
Expand Down Expand Up @@ -97,18 +119,17 @@ ::arrow::Result<std::shared_ptr<::arrow::StructArray>> MergeStructArrays(
arrays.emplace_back(left_arr);
}

for (auto& field : rhs->struct_type()->fields()) {
auto& name = field->name();
if (lhs->GetFieldByName(name)) {
// We've seen each other before
continue;
}
for (auto name :
rhs->struct_type()->fields() //
| views::filter([&lhs](auto& f) { return !lhs->GetFieldByName(f->name()); }) //
| views::transform([](auto& f) { return f->name(); })) {
names.emplace_back(name);
arrays.emplace_back(rhs->GetFieldByName(name));
}
return ::arrow::StructArray::Make(arrays, names);
}

/// Concept of a class that has ".fields()" method.
template <typename T>
concept HasFields = (std::same_as<T, ::arrow::Schema> || std::same_as<T, ::arrow::StructType>);

Expand All @@ -129,12 +150,11 @@ ::arrow::Result<std::vector<std::shared_ptr<::arrow::Field>>> MergeFieldWithChil
fields.emplace_back(merged);
}
}
for (const auto& field : rhs.fields()) {
auto left_field = lhs.GetFieldByName(field->name());
if (!left_field) {
fields.emplace_back(field);
}
}
ranges::actions::insert(
fields,
std::end(fields), //
rhs.fields() //
| views::filter([&lhs](auto& f) { return !lhs.GetFieldByName(f->name()); }));
return fields;
};

Expand Down
11 changes: 11 additions & 0 deletions cpp/src/lance/arrow/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <arrow/result.h>

#include <memory>
#include <vector>

#include "lance/format/schema.h"

Expand All @@ -31,6 +32,16 @@ ::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
const std::shared_ptr<::arrow::RecordBatch>& rhs,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

/// Merge a list of record batches that represent the different columns of the same rows,
/// into a single record batch.
///
/// \param batches A list of record batches. Must have the same length.
/// \param pool memory pool.
/// \return the merged record batch. Or nullptr if batches is empty.
::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
const std::vector<std::shared_ptr<::arrow::RecordBatch>>& batches,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

::arrow::Result<std::shared_ptr<::arrow::StructArray>> MergeStructArrays(
const std::shared_ptr<::arrow::StructArray>& lhs,
const std::shared_ptr<::arrow::StructArray>& rhs,
Expand Down
30 changes: 30 additions & 0 deletions cpp/src/lance/arrow/utils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,34 @@ TEST_CASE("Test merge extension types") {
auto result = MergeSchema(*::arrow::schema({::arrow::field("ann", lance::testing::box2d())}),
*::arrow::schema({::arrow::field("ann", lance::testing::image())}));
CHECK(result.status().IsInvalid());
}

TEST_CASE("Merge record batches") {
const int kNumRows = 3;
auto batch1 =
::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col1", ::arrow::int32())}),
kNumRows,
{ToArray({1, 2, 3}).ValueOrDie()});
auto batch2 =
::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col2", ::arrow::uint32())}),
kNumRows,
{ToArray({10, 20, 30}).ValueOrDie()});
auto batch3 =
::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col3", ::arrow::utf8())}),
kNumRows,
{ToArray({"1", "2", "3"}).ValueOrDie()});
std::vector<std::shared_ptr<::arrow::RecordBatch>> batches({batch1, batch2, batch3});

auto merged = lance::arrow::MergeRecordBatches(batches).ValueOrDie();
CHECK(merged->num_rows() == kNumRows);

auto expected =
::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col1", ::arrow::int32()),
::arrow::field("col2", ::arrow::uint32()),
::arrow::field("col3", ::arrow::utf8())}),
kNumRows,
{ToArray({1, 2, 3}).ValueOrDie(),
ToArray({10, 20, 30}).ValueOrDie(),
ToArray({"1", "2", "3"}).ValueOrDie()});
CHECK(merged->Equals(*expected));
}
5 changes: 3 additions & 2 deletions cpp/src/lance/io/exec/limit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ TEST_CASE("Read limit multiple times") {
auto infile = make_shared<arrow::io::BufferReader>(sink->Finish().ValueOrDie());
auto reader = std::make_shared<lance::io::FileReader>(infile);
CHECK(reader->Open().ok());
auto scan = lance::io::exec::Scan::Make(reader, std::make_shared<Schema>(reader->schema()), 100)
.ValueOrDie();
auto scan =
lance::io::exec::Scan::Make({{reader, std::make_shared<Schema>(reader->schema())}}, 100)
.ValueOrDie();
auto counter = std::make_shared<Counter>(5, 10);
auto limit = Limit(counter, std::move(scan));
auto batch = limit.Next().ValueOrDie();
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/lance/io/exec/project.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ::arrow::Result<std::unique_ptr<Project>> Project::Make(
/// Take -> (optionally) Limit -> Filter -> Scan
ARROW_ASSIGN_OR_RAISE(auto filter_scan_schema, schema.Project(scan_options->filter));
ARROW_ASSIGN_OR_RAISE(auto filter_scan_node,
Scan::Make(reader, filter_scan_schema, scan_options->batch_size));
Scan::Make({{reader, filter_scan_schema}}, scan_options->batch_size));
ARROW_ASSIGN_OR_RAISE(child, Filter::Make(scan_options->filter, std::move(filter_scan_node)));
if (counter) {
ARROW_ASSIGN_OR_RAISE(child, Limit::Make(counter, std::move(child)));
Expand All @@ -64,7 +64,8 @@ ::arrow::Result<std::unique_ptr<Project>> Project::Make(
ARROW_ASSIGN_OR_RAISE(child, Take::Make(reader, take_schema, std::move(child)));
} else {
/// (*optionally) Limit -> Scan
ARROW_ASSIGN_OR_RAISE(child, Scan::Make(reader, projected_schema, scan_options->batch_size));
ARROW_ASSIGN_OR_RAISE(
child, Scan::Make({{std::move(reader), projected_schema}}, scan_options->batch_size));
if (counter) {
ARROW_ASSIGN_OR_RAISE(child, Limit::Make(counter, std::move(child)));
}
Expand Down
72 changes: 55 additions & 17 deletions cpp/src/lance/io/exec/scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,90 @@

#include "lance/io/exec/scan.h"

#include <fmt/format.h>

#include <memory>

#include "lance/arrow/utils.h"
#include "lance/format/metadata.h"
#include "lance/io/reader.h"

namespace lance::io::exec {

::arrow::Result<std::unique_ptr<Scan>> Scan::Make(std::shared_ptr<FileReader> reader,
std::shared_ptr<lance::format::Schema> schema,
constexpr int kMinimalIOThreads = 4;

::arrow::Result<std::unique_ptr<Scan>> Scan::Make(const std::vector<FileReaderWithSchema>& readers,
int64_t batch_size) {
auto scan = std::unique_ptr<Scan>(new Scan(reader, schema, batch_size));
if (reader->metadata().num_batches() == 0) {
if (readers.empty()) {
return ::arrow::Status::Invalid("Scan::Make: can not accept zero readers");
}
if (std::get<0>(readers[0])->metadata().num_batches() == 0) {
return ::arrow::Status::IOError("Can not open Scan on empty file");
}
scan->current_batch_page_length_ = reader->metadata().GetBatchLength(0);
return scan;
return std::unique_ptr<Scan>(new Scan(readers, batch_size));
}

Scan::Scan(std::shared_ptr<FileReader> reader,
std::shared_ptr<lance::format::Schema> schema,
int64_t batch_size)
: reader_(std::move(reader)), schema_(std::move(schema)), batch_size_(batch_size) {}
Scan::Scan(const std::vector<FileReaderWithSchema>& readers, int64_t batch_size)
: readers_(std::begin(readers), std::end(readers)),
batch_size_(batch_size),
current_batch_page_length_(std::get<0>(readers_[0])->metadata().GetBatchLength(0)) {}

::arrow::Result<ScanBatch> Scan::Next() {
assert(!readers_.empty());
int32_t offset;
int32_t batch_id;

auto& first_reader = std::get<0>(readers_[0]);
{
// Make the plan to how much data to read next.
std::lock_guard guard(lock_);
batch_id = current_batch_id_;
offset = current_offset_;
current_offset_ += batch_size_;
if (current_offset_ >= current_batch_page_length_) {
current_batch_id_++;
current_offset_ = 0;
if (current_batch_id_ < reader_->metadata().num_batches()) {
current_batch_page_length_ = reader_->metadata().GetBatchLength(current_batch_id_);
if (current_batch_id_ < first_reader->metadata().num_batches()) {
current_batch_page_length_ = first_reader->metadata().GetBatchLength(current_batch_id_);
}
}
// Lock released after scope.
}
if (batch_id >= reader_->metadata().num_batches()) {

if (batch_id >= first_reader->metadata().num_batches()) {
// Reach EOF
return ScanBatch::Null();
}

ARROW_ASSIGN_OR_RAISE(auto batch, reader_->ReadBatch(*schema_, batch_id, offset, batch_size_));
if (::arrow::GetCpuThreadPoolCapacity() < kMinimalIOThreads) {
// Keep a minimal number of threads, preventing live lock on low CPU count (<=2) machines,
// i.e., Github Action runners.
ARROW_RETURN_NOT_OK(::arrow::SetCpuThreadPoolCapacity(kMinimalIOThreads));
}
auto executor = ::arrow::internal::GetCpuThreadPool();
std::vector<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>> futs;
for (auto [reader, schema] : readers_) {
ARROW_ASSIGN_OR_RAISE(
auto fut,
executor->Submit(
[batch_id, offset](
std::shared_ptr<lance::io::FileReader> r,
std::shared_ptr<lance::format::Schema> s,
auto batch_size) -> ::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> {
ARROW_ASSIGN_OR_RAISE(auto batch, r->ReadBatch(*s, batch_id, offset, batch_size));
return batch;
},
reader,
schema,
batch_size_));
futs.emplace_back(std::move(fut));
}

std::vector<std::shared_ptr<::arrow::RecordBatch>> batches;
for (auto& fut : futs) {
ARROW_ASSIGN_OR_RAISE(auto& b, fut.result());
batches.emplace_back(b);
}

ARROW_ASSIGN_OR_RAISE(auto batch, lance::arrow::MergeRecordBatches(batches));
return ScanBatch{
batch,
batch_id,
Expand All @@ -70,7 +106,9 @@ ::arrow::Result<ScanBatch> Scan::Next() {
}

::arrow::Status Scan::Seek(int32_t offset) {
ARROW_ASSIGN_OR_RAISE(auto batch_and_offset, reader_->metadata().LocateBatch(offset));
assert(!readers_.empty());
auto& reader = std::get<0>(readers_[0]);
ARROW_ASSIGN_OR_RAISE(auto batch_and_offset, reader->metadata().LocateBatch(offset));
current_batch_id_ = std::get<0>(batch_and_offset);
current_offset_ = std::get<1>(batch_and_offset);
return ::arrow::Status::OK();
Expand Down
30 changes: 18 additions & 12 deletions cpp/src/lance/io/exec/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include <memory>
#include <mutex>
#include <tuple>
#include <vector>

#include "lance/io/exec/base.h"

Expand All @@ -36,21 +38,28 @@ namespace lance::io::exec {
/// Leaf scan node.
class Scan : public ExecNode {
public:
using FileReaderWithSchema =
std::tuple<std::shared_ptr<FileReader>, std::shared_ptr<lance::format::Schema>>;

/// Factory method.
static ::arrow::Result<std::unique_ptr<Scan>> Make(std::shared_ptr<FileReader> reader,
std::shared_ptr<lance::format::Schema> schema,
int64_t batch_size);
///
/// \param readers a vector of the tuples of `[reader, schema]`, including opened file reader
/// and projection schema.
/// \param batch_size batch size.
/// \return a Scan node if succeed.
static ::arrow::Result<std::unique_ptr<Scan>> Make(
const std::vector<FileReaderWithSchema>& readers, int64_t batch_size);

Scan() = delete;

virtual ~Scan() = default;
~Scan() override = default;

constexpr Type type() const override { return Type::kScan; }

/// Returns the next available batch in the file. Or returns nullptr if EOF.
::arrow::Result<ScanBatch> Next() override;

const lance::format::Schema& schema();

/// Debug String
std::string ToString() const override;

/// Seek to a particular row.
Expand All @@ -63,15 +72,12 @@ class Scan : public ExecNode {
private:
/// Constructor
///
/// \param reader An opened file reader.
/// \param schema The scan schema, used to select column to scan.
/// \param readers A vector of opened readers with the projected schema.
/// \param batch_size scan batch size.
Scan(std::shared_ptr<FileReader> reader,
std::shared_ptr<lance::format::Schema> schema,
Scan(const std::vector<FileReaderWithSchema>& readers,
int64_t batch_size);

const std::shared_ptr<FileReader> reader_;
const std::shared_ptr<lance::format::Schema> schema_;
std::vector<FileReaderWithSchema> readers_;
const int64_t batch_size_;

/// Keep track of the progress.
Expand Down
Loading

0 comments on commit 39e47a8

Please sign in to comment.