Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Scan Node reads multiple files #300

Merged
merged 26 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
run:
working-directory: ./cpp
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: ccache
uses: hendrikmuhs/ccache-action@v1
- name: Run lint
Expand All @@ -40,7 +40,7 @@ jobs:
run:
working-directory: ./cpp
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install dependencies
run: |
brew update
Expand Down
46 changes: 33 additions & 13 deletions cpp/src/lance/arrow/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
#include <fmt/ranges.h>

#include <concepts>
#include <string>
#include <range/v3/action.hpp>
#include <range/v3/view.hpp>
#include <vector>

#include "lance/arrow/file_lance.h"
#include "lance/arrow/type.h"

namespace views = ranges::views;

namespace lance::arrow {

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
Expand All @@ -40,6 +43,25 @@ ::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
return ::arrow::RecordBatch::FromStructArray(struct_arr);
}

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
const std::vector<std::shared_ptr<::arrow::RecordBatch>>& batches, ::arrow::MemoryPool* pool) {
if (batches.empty()) {
return nullptr;
}
auto batch = batches[0];
for (auto& b : batches | views::drop(1)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this require that all RecordBatches being merged are of the same length? if so, should it be checked here or in the caller or somewhere else altogether?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good �point. Added a check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It checks the length deep down the stack https://github.com/eto-ai/lance/blob/a6e788c37ea41f37cf0c5b993e041f3b262dafa6/cpp/src/lance/arrow/utils.cc#L69-L71

It is nice to raise here with clear context for sure.

if (b->num_rows() != batch->num_rows()) {
return ::arrow::Status::Invalid(
"MergeRecordBatches: attempt to merge batches with different length: ",
b->num_rows(),
" != ",
batch->num_rows());
}
ARROW_ASSIGN_OR_RAISE(batch, MergeRecordBatches(batch, b, pool));
}
return batch;
}

::arrow::Result<std::shared_ptr<::arrow::Array>> MergeListArrays(
const std::shared_ptr<::arrow::Array>& lhs,
const std::shared_ptr<::arrow::Array>& rhs,
Expand Down Expand Up @@ -97,18 +119,17 @@ ::arrow::Result<std::shared_ptr<::arrow::StructArray>> MergeStructArrays(
arrays.emplace_back(left_arr);
}

for (auto& field : rhs->struct_type()->fields()) {
auto& name = field->name();
if (lhs->GetFieldByName(name)) {
// We've seen each other before
continue;
}
for (auto name :
rhs->struct_type()->fields() //
| views::filter([&lhs](auto& f) { return !lhs->GetFieldByName(f->name()); }) //
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you feel about functional C++?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wish it is simpler like rust.

It simplifies a few for for loops, but not all , 😮‍💨

| views::transform([](auto& f) { return f->name(); })) {
names.emplace_back(name);
arrays.emplace_back(rhs->GetFieldByName(name));
}
return ::arrow::StructArray::Make(arrays, names);
}

/// Concept of a class that has ".fields()" method.
template <typename T>
concept HasFields = (std::same_as<T, ::arrow::Schema> || std::same_as<T, ::arrow::StructType>);

Expand All @@ -129,12 +150,11 @@ ::arrow::Result<std::vector<std::shared_ptr<::arrow::Field>>> MergeFieldWithChil
fields.emplace_back(merged);
}
}
for (const auto& field : rhs.fields()) {
auto left_field = lhs.GetFieldByName(field->name());
if (!left_field) {
fields.emplace_back(field);
}
}
ranges::actions::insert(
fields,
std::end(fields), //
rhs.fields() //
| views::filter([&lhs](auto& f) { return !lhs.GetFieldByName(f->name()); }));
return fields;
};

Expand Down
11 changes: 11 additions & 0 deletions cpp/src/lance/arrow/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <arrow/result.h>

#include <memory>
#include <vector>

#include "lance/format/schema.h"

Expand All @@ -31,6 +32,16 @@ ::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
const std::shared_ptr<::arrow::RecordBatch>& rhs,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

/// Merge a list of record batches that represent the different columns of the same rows,
/// into a single record batch.
///
/// \param batches A list of record batches. Must have the same length.
/// \param pool memory pool.
/// \return the merged record batch. Or nullptr if batches is empty.
::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> MergeRecordBatches(
const std::vector<std::shared_ptr<::arrow::RecordBatch>>& batches,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

::arrow::Result<std::shared_ptr<::arrow::StructArray>> MergeStructArrays(
const std::shared_ptr<::arrow::StructArray>& lhs,
const std::shared_ptr<::arrow::StructArray>& rhs,
Expand Down
30 changes: 30 additions & 0 deletions cpp/src/lance/arrow/utils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,34 @@ TEST_CASE("Test merge extension types") {
auto result = MergeSchema(*::arrow::schema({::arrow::field("ann", lance::testing::box2d())}),
*::arrow::schema({::arrow::field("ann", lance::testing::image())}));
CHECK(result.status().IsInvalid());
}

TEST_CASE("Merge record batches") {
const int kNumRows = 3;
auto batch1 =
::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col1", ::arrow::int32())}),
kNumRows,
{ToArray({1, 2, 3}).ValueOrDie()});
auto batch2 =
::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col2", ::arrow::uint32())}),
kNumRows,
{ToArray({10, 20, 30}).ValueOrDie()});
auto batch3 =
::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col3", ::arrow::utf8())}),
kNumRows,
{ToArray({"1", "2", "3"}).ValueOrDie()});
std::vector<std::shared_ptr<::arrow::RecordBatch>> batches({batch1, batch2, batch3});

auto merged = lance::arrow::MergeRecordBatches(batches).ValueOrDie();
CHECK(merged->num_rows() == kNumRows);

auto expected =
::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col1", ::arrow::int32()),
::arrow::field("col2", ::arrow::uint32()),
::arrow::field("col3", ::arrow::utf8())}),
kNumRows,
{ToArray({1, 2, 3}).ValueOrDie(),
ToArray({10, 20, 30}).ValueOrDie(),
ToArray({"1", "2", "3"}).ValueOrDie()});
CHECK(merged->Equals(*expected));
}
5 changes: 3 additions & 2 deletions cpp/src/lance/io/exec/limit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ TEST_CASE("Read limit multiple times") {
auto infile = make_shared<arrow::io::BufferReader>(sink->Finish().ValueOrDie());
auto reader = std::make_shared<lance::io::FileReader>(infile);
CHECK(reader->Open().ok());
auto scan = lance::io::exec::Scan::Make(reader, std::make_shared<Schema>(reader->schema()), 100)
.ValueOrDie();
auto scan =
lance::io::exec::Scan::Make({{reader, std::make_shared<Schema>(reader->schema())}}, 100)
.ValueOrDie();
auto counter = std::make_shared<Counter>(5, 10);
auto limit = Limit(counter, std::move(scan));
auto batch = limit.Next().ValueOrDie();
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/lance/io/exec/project.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ::arrow::Result<std::unique_ptr<Project>> Project::Make(
/// Take -> (optionally) Limit -> Filter -> Scan
ARROW_ASSIGN_OR_RAISE(auto filter_scan_schema, schema.Project(scan_options->filter));
ARROW_ASSIGN_OR_RAISE(auto filter_scan_node,
Scan::Make(reader, filter_scan_schema, scan_options->batch_size));
Scan::Make({{reader, filter_scan_schema}}, scan_options->batch_size));
ARROW_ASSIGN_OR_RAISE(child, Filter::Make(scan_options->filter, std::move(filter_scan_node)));
if (counter) {
ARROW_ASSIGN_OR_RAISE(child, Limit::Make(counter, std::move(child)));
Expand All @@ -64,7 +64,8 @@ ::arrow::Result<std::unique_ptr<Project>> Project::Make(
ARROW_ASSIGN_OR_RAISE(child, Take::Make(reader, take_schema, std::move(child)));
} else {
/// (*optionally) Limit -> Scan
ARROW_ASSIGN_OR_RAISE(child, Scan::Make(reader, projected_schema, scan_options->batch_size));
ARROW_ASSIGN_OR_RAISE(
child, Scan::Make({{std::move(reader), projected_schema}}, scan_options->batch_size));
if (counter) {
ARROW_ASSIGN_OR_RAISE(child, Limit::Make(counter, std::move(child)));
}
Expand Down
72 changes: 55 additions & 17 deletions cpp/src/lance/io/exec/scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,90 @@

#include "lance/io/exec/scan.h"

#include <fmt/format.h>

#include <memory>

#include "lance/arrow/utils.h"
#include "lance/format/metadata.h"
#include "lance/io/reader.h"

namespace lance::io::exec {

::arrow::Result<std::unique_ptr<Scan>> Scan::Make(std::shared_ptr<FileReader> reader,
std::shared_ptr<lance::format::Schema> schema,
constexpr int kMinimalIOThreads = 4;

::arrow::Result<std::unique_ptr<Scan>> Scan::Make(const std::vector<FileReaderWithSchema>& readers,
int64_t batch_size) {
auto scan = std::unique_ptr<Scan>(new Scan(reader, schema, batch_size));
if (reader->metadata().num_batches() == 0) {
if (readers.empty()) {
return ::arrow::Status::Invalid("Scan::Make: can not accept zero readers");
}
if (std::get<0>(readers[0])->metadata().num_batches() == 0) {
return ::arrow::Status::IOError("Can not open Scan on empty file");
}
scan->current_batch_page_length_ = reader->metadata().GetBatchLength(0);
return scan;
return std::unique_ptr<Scan>(new Scan(readers, batch_size));
}

Scan::Scan(std::shared_ptr<FileReader> reader,
std::shared_ptr<lance::format::Schema> schema,
int64_t batch_size)
: reader_(std::move(reader)), schema_(std::move(schema)), batch_size_(batch_size) {}
Scan::Scan(const std::vector<FileReaderWithSchema>& readers, int64_t batch_size)
: readers_(std::begin(readers), std::end(readers)),
batch_size_(batch_size),
current_batch_page_length_(std::get<0>(readers_[0])->metadata().GetBatchLength(0)) {}

::arrow::Result<ScanBatch> Scan::Next() {
assert(!readers_.empty());
int32_t offset;
int32_t batch_id;

auto& first_reader = std::get<0>(readers_[0]);
{
// Make the plan to how much data to read next.
std::lock_guard guard(lock_);
batch_id = current_batch_id_;
offset = current_offset_;
current_offset_ += batch_size_;
if (current_offset_ >= current_batch_page_length_) {
current_batch_id_++;
current_offset_ = 0;
if (current_batch_id_ < reader_->metadata().num_batches()) {
current_batch_page_length_ = reader_->metadata().GetBatchLength(current_batch_id_);
if (current_batch_id_ < first_reader->metadata().num_batches()) {
current_batch_page_length_ = first_reader->metadata().GetBatchLength(current_batch_id_);
}
}
// Lock released after scope.
}
if (batch_id >= reader_->metadata().num_batches()) {

if (batch_id >= first_reader->metadata().num_batches()) {
// Reach EOF
return ScanBatch::Null();
}

ARROW_ASSIGN_OR_RAISE(auto batch, reader_->ReadBatch(*schema_, batch_id, offset, batch_size_));
if (::arrow::GetCpuThreadPoolCapacity() < kMinimalIOThreads) {
// Keep a minimal number of threads, preventing live lock on low CPU count (<=2) machines,
// i.e., Github Action runners.
ARROW_RETURN_NOT_OK(::arrow::SetCpuThreadPoolCapacity(kMinimalIOThreads));
}
auto executor = ::arrow::internal::GetCpuThreadPool();
std::vector<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>> futs;
for (auto [reader, schema] : readers_) {
ARROW_ASSIGN_OR_RAISE(
auto fut,
executor->Submit(
[batch_id, offset](
std::shared_ptr<lance::io::FileReader> r,
std::shared_ptr<lance::format::Schema> s,
auto batch_size) -> ::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> {
ARROW_ASSIGN_OR_RAISE(auto batch, r->ReadBatch(*s, batch_id, offset, batch_size));
return batch;
},
reader,
schema,
batch_size_));
futs.emplace_back(std::move(fut));
}

std::vector<std::shared_ptr<::arrow::RecordBatch>> batches;
for (auto& fut : futs) {
ARROW_ASSIGN_OR_RAISE(auto& b, fut.result());
batches.emplace_back(b);
}

ARROW_ASSIGN_OR_RAISE(auto batch, lance::arrow::MergeRecordBatches(batches));
return ScanBatch{
batch,
batch_id,
Expand All @@ -70,7 +106,9 @@ ::arrow::Result<ScanBatch> Scan::Next() {
}

::arrow::Status Scan::Seek(int32_t offset) {
ARROW_ASSIGN_OR_RAISE(auto batch_and_offset, reader_->metadata().LocateBatch(offset));
assert(!readers_.empty());
auto& reader = std::get<0>(readers_[0]);
ARROW_ASSIGN_OR_RAISE(auto batch_and_offset, reader->metadata().LocateBatch(offset));
current_batch_id_ = std::get<0>(batch_and_offset);
current_offset_ = std::get<1>(batch_and_offset);
return ::arrow::Status::OK();
Expand Down
30 changes: 18 additions & 12 deletions cpp/src/lance/io/exec/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include <memory>
#include <mutex>
#include <tuple>
#include <vector>

#include "lance/io/exec/base.h"

Expand All @@ -36,21 +38,28 @@ namespace lance::io::exec {
/// Leaf scan node.
class Scan : public ExecNode {
public:
using FileReaderWithSchema =
std::tuple<std::shared_ptr<FileReader>, std::shared_ptr<lance::format::Schema>>;

/// Factory method.
static ::arrow::Result<std::unique_ptr<Scan>> Make(std::shared_ptr<FileReader> reader,
std::shared_ptr<lance::format::Schema> schema,
int64_t batch_size);
///
/// \param readers a vector of the tuples of `[reader, schema]`, including opened file reader
/// and projection schema.
/// \param batch_size batch size.
/// \return a Scan node if succeed.
static ::arrow::Result<std::unique_ptr<Scan>> Make(
const std::vector<FileReaderWithSchema>& readers, int64_t batch_size);

Scan() = delete;

virtual ~Scan() = default;
~Scan() override = default;

constexpr Type type() const override { return Type::kScan; }

/// Returns the next available batch in the file. Or returns nullptr if EOF.
::arrow::Result<ScanBatch> Next() override;

const lance::format::Schema& schema();

/// Debug String
std::string ToString() const override;

/// Seek to a particular row.
Expand All @@ -63,15 +72,12 @@ class Scan : public ExecNode {
private:
/// Constructor
///
/// \param reader An opened file reader.
/// \param schema The scan schema, used to select column to scan.
/// \param readers A vector of opened readers with the projected schema.
/// \param batch_size scan batch size.
Scan(std::shared_ptr<FileReader> reader,
std::shared_ptr<lance::format::Schema> schema,
Scan(const std::vector<FileReaderWithSchema>& readers,
int64_t batch_size);

const std::shared_ptr<FileReader> reader_;
const std::shared_ptr<lance::format::Schema> schema_;
std::vector<FileReaderWithSchema> readers_;
const int64_t batch_size_;

/// Keep track of the progress.
Expand Down
Loading