diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml index 26356ba217..5651af933e 100644 --- a/.github/workflows/cpp.yml +++ b/.github/workflows/cpp.yml @@ -14,7 +14,7 @@ jobs: run: working-directory: ./cpp steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: ccache uses: hendrikmuhs/ccache-action@v1 - name: Run lint @@ -40,7 +40,7 @@ jobs: run: working-directory: ./cpp steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install dependencies run: | brew update diff --git a/cpp/src/lance/arrow/utils.cc b/cpp/src/lance/arrow/utils.cc index 4cd6cf90c8..78554c1569 100644 --- a/cpp/src/lance/arrow/utils.cc +++ b/cpp/src/lance/arrow/utils.cc @@ -22,12 +22,15 @@ #include #include -#include +#include +#include #include #include "lance/arrow/file_lance.h" #include "lance/arrow/type.h" +namespace views = ranges::views; + namespace lance::arrow { ::arrow::Result> MergeRecordBatches( @@ -40,6 +43,25 @@ ::arrow::Result> MergeRecordBatches( return ::arrow::RecordBatch::FromStructArray(struct_arr); } +::arrow::Result> MergeRecordBatches( + const std::vector>& batches, ::arrow::MemoryPool* pool) { + if (batches.empty()) { + return nullptr; + } + auto batch = batches[0]; + for (auto& b : batches | views::drop(1)) { + if (b->num_rows() != batch->num_rows()) { + return ::arrow::Status::Invalid( + "MergeRecordBatches: attempt to merge batches with different length: ", + b->num_rows(), + " != ", + batch->num_rows()); + } + ARROW_ASSIGN_OR_RAISE(batch, MergeRecordBatches(batch, b, pool)); + } + return batch; +} + ::arrow::Result> MergeListArrays( const std::shared_ptr<::arrow::Array>& lhs, const std::shared_ptr<::arrow::Array>& rhs, @@ -97,18 +119,17 @@ ::arrow::Result> MergeStructArrays( arrays.emplace_back(left_arr); } - for (auto& field : rhs->struct_type()->fields()) { - auto& name = field->name(); - if (lhs->GetFieldByName(name)) { - // We've seen each other before - continue; - } + for (auto name : + rhs->struct_type()->fields() // + | views::filter([&lhs](auto& f) { return !lhs->GetFieldByName(f->name()); }) // + | views::transform([](auto& f) { return f->name(); })) { names.emplace_back(name); arrays.emplace_back(rhs->GetFieldByName(name)); } return ::arrow::StructArray::Make(arrays, names); } +/// Concept of a class that has ".fields()" method. template concept HasFields = (std::same_as || std::same_as); @@ -129,12 +150,11 @@ ::arrow::Result>> MergeFieldWithChil fields.emplace_back(merged); } } - for (const auto& field : rhs.fields()) { - auto left_field = lhs.GetFieldByName(field->name()); - if (!left_field) { - fields.emplace_back(field); - } - } + ranges::actions::insert( + fields, + std::end(fields), // + rhs.fields() // + | views::filter([&lhs](auto& f) { return !lhs.GetFieldByName(f->name()); })); return fields; }; diff --git a/cpp/src/lance/arrow/utils.h b/cpp/src/lance/arrow/utils.h index 6d8f4778d8..6f5b4a9d04 100644 --- a/cpp/src/lance/arrow/utils.h +++ b/cpp/src/lance/arrow/utils.h @@ -20,6 +20,7 @@ #include #include +#include #include "lance/format/schema.h" @@ -31,6 +32,16 @@ ::arrow::Result> MergeRecordBatches( const std::shared_ptr<::arrow::RecordBatch>& rhs, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); +/// Merge a list of record batches that represent the different columns of the same rows, +/// into a single record batch. +/// +/// \param batches A list of record batches. Must have the same length. +/// \param pool memory pool. +/// \return the merged record batch. Or nullptr if batches is empty. +::arrow::Result> MergeRecordBatches( + const std::vector>& batches, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + ::arrow::Result> MergeStructArrays( const std::shared_ptr<::arrow::StructArray>& lhs, const std::shared_ptr<::arrow::StructArray>& rhs, diff --git a/cpp/src/lance/arrow/utils_test.cc b/cpp/src/lance/arrow/utils_test.cc index 3c6b416e20..9d4a17846f 100644 --- a/cpp/src/lance/arrow/utils_test.cc +++ b/cpp/src/lance/arrow/utils_test.cc @@ -206,4 +206,34 @@ TEST_CASE("Test merge extension types") { auto result = MergeSchema(*::arrow::schema({::arrow::field("ann", lance::testing::box2d())}), *::arrow::schema({::arrow::field("ann", lance::testing::image())})); CHECK(result.status().IsInvalid()); +} + +TEST_CASE("Merge record batches") { + const int kNumRows = 3; + auto batch1 = + ::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col1", ::arrow::int32())}), + kNumRows, + {ToArray({1, 2, 3}).ValueOrDie()}); + auto batch2 = + ::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col2", ::arrow::uint32())}), + kNumRows, + {ToArray({10, 20, 30}).ValueOrDie()}); + auto batch3 = + ::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col3", ::arrow::utf8())}), + kNumRows, + {ToArray({"1", "2", "3"}).ValueOrDie()}); + std::vector> batches({batch1, batch2, batch3}); + + auto merged = lance::arrow::MergeRecordBatches(batches).ValueOrDie(); + CHECK(merged->num_rows() == kNumRows); + + auto expected = + ::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col1", ::arrow::int32()), + ::arrow::field("col2", ::arrow::uint32()), + ::arrow::field("col3", ::arrow::utf8())}), + kNumRows, + {ToArray({1, 2, 3}).ValueOrDie(), + ToArray({10, 20, 30}).ValueOrDie(), + ToArray({"1", "2", "3"}).ValueOrDie()}); + CHECK(merged->Equals(*expected)); } \ No newline at end of file diff --git a/cpp/src/lance/io/exec/limit_test.cc b/cpp/src/lance/io/exec/limit_test.cc index e540ff225b..ec1cfa755a 100644 --- a/cpp/src/lance/io/exec/limit_test.cc +++ b/cpp/src/lance/io/exec/limit_test.cc @@ -49,8 +49,9 @@ TEST_CASE("Read limit multiple times") { auto infile = make_shared(sink->Finish().ValueOrDie()); auto reader = std::make_shared(infile); CHECK(reader->Open().ok()); - auto scan = lance::io::exec::Scan::Make(reader, std::make_shared(reader->schema()), 100) - .ValueOrDie(); + auto scan = + lance::io::exec::Scan::Make({{reader, std::make_shared(reader->schema())}}, 100) + .ValueOrDie(); auto counter = std::make_shared(5, 10); auto limit = Limit(counter, std::move(scan)); auto batch = limit.Next().ValueOrDie(); diff --git a/cpp/src/lance/io/exec/project.cc b/cpp/src/lance/io/exec/project.cc index 6ffbf76220..f09200f633 100644 --- a/cpp/src/lance/io/exec/project.cc +++ b/cpp/src/lance/io/exec/project.cc @@ -55,7 +55,7 @@ ::arrow::Result> Project::Make( /// Take -> (optionally) Limit -> Filter -> Scan ARROW_ASSIGN_OR_RAISE(auto filter_scan_schema, schema.Project(scan_options->filter)); ARROW_ASSIGN_OR_RAISE(auto filter_scan_node, - Scan::Make(reader, filter_scan_schema, scan_options->batch_size)); + Scan::Make({{reader, filter_scan_schema}}, scan_options->batch_size)); ARROW_ASSIGN_OR_RAISE(child, Filter::Make(scan_options->filter, std::move(filter_scan_node))); if (counter) { ARROW_ASSIGN_OR_RAISE(child, Limit::Make(counter, std::move(child))); @@ -64,7 +64,8 @@ ::arrow::Result> Project::Make( ARROW_ASSIGN_OR_RAISE(child, Take::Make(reader, take_schema, std::move(child))); } else { /// (*optionally) Limit -> Scan - ARROW_ASSIGN_OR_RAISE(child, Scan::Make(reader, projected_schema, scan_options->batch_size)); + ARROW_ASSIGN_OR_RAISE( + child, Scan::Make({{std::move(reader), projected_schema}}, scan_options->batch_size)); if (counter) { ARROW_ASSIGN_OR_RAISE(child, Limit::Make(counter, std::move(child))); } diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 879223ade0..b96b6c0544 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -14,36 +14,40 @@ #include "lance/io/exec/scan.h" -#include - #include +#include "lance/arrow/utils.h" #include "lance/format/metadata.h" #include "lance/io/reader.h" namespace lance::io::exec { -::arrow::Result> Scan::Make(std::shared_ptr reader, - std::shared_ptr schema, +constexpr int kMinimalIOThreads = 4; + +::arrow::Result> Scan::Make(const std::vector& readers, int64_t batch_size) { - auto scan = std::unique_ptr(new Scan(reader, schema, batch_size)); - if (reader->metadata().num_batches() == 0) { + if (readers.empty()) { + return ::arrow::Status::Invalid("Scan::Make: can not accept zero readers"); + } + if (std::get<0>(readers[0])->metadata().num_batches() == 0) { return ::arrow::Status::IOError("Can not open Scan on empty file"); } - scan->current_batch_page_length_ = reader->metadata().GetBatchLength(0); - return scan; + return std::unique_ptr(new Scan(readers, batch_size)); } -Scan::Scan(std::shared_ptr reader, - std::shared_ptr schema, - int64_t batch_size) - : reader_(std::move(reader)), schema_(std::move(schema)), batch_size_(batch_size) {} +Scan::Scan(const std::vector& readers, int64_t batch_size) + : readers_(std::begin(readers), std::end(readers)), + batch_size_(batch_size), + current_batch_page_length_(std::get<0>(readers_[0])->metadata().GetBatchLength(0)) {} ::arrow::Result Scan::Next() { + assert(!readers_.empty()); int32_t offset; int32_t batch_id; + auto& first_reader = std::get<0>(readers_[0]); { + // Make the plan to how much data to read next. std::lock_guard guard(lock_); batch_id = current_batch_id_; offset = current_offset_; @@ -51,17 +55,49 @@ ::arrow::Result Scan::Next() { if (current_offset_ >= current_batch_page_length_) { current_batch_id_++; current_offset_ = 0; - if (current_batch_id_ < reader_->metadata().num_batches()) { - current_batch_page_length_ = reader_->metadata().GetBatchLength(current_batch_id_); + if (current_batch_id_ < first_reader->metadata().num_batches()) { + current_batch_page_length_ = first_reader->metadata().GetBatchLength(current_batch_id_); } } + // Lock released after scope. } - if (batch_id >= reader_->metadata().num_batches()) { + + if (batch_id >= first_reader->metadata().num_batches()) { // Reach EOF return ScanBatch::Null(); } - ARROW_ASSIGN_OR_RAISE(auto batch, reader_->ReadBatch(*schema_, batch_id, offset, batch_size_)); + if (::arrow::GetCpuThreadPoolCapacity() < kMinimalIOThreads) { + // Keep a minimal number of threads, preventing live lock on low CPU count (<=2) machines, + // i.e., Github Action runners. + ARROW_RETURN_NOT_OK(::arrow::SetCpuThreadPoolCapacity(kMinimalIOThreads)); + } + auto executor = ::arrow::internal::GetCpuThreadPool(); + std::vector<::arrow::Future>> futs; + for (auto [reader, schema] : readers_) { + ARROW_ASSIGN_OR_RAISE( + auto fut, + executor->Submit( + [batch_id, offset]( + std::shared_ptr r, + std::shared_ptr s, + auto batch_size) -> ::arrow::Result> { + ARROW_ASSIGN_OR_RAISE(auto batch, r->ReadBatch(*s, batch_id, offset, batch_size)); + return batch; + }, + reader, + schema, + batch_size_)); + futs.emplace_back(std::move(fut)); + } + + std::vector> batches; + for (auto& fut : futs) { + ARROW_ASSIGN_OR_RAISE(auto& b, fut.result()); + batches.emplace_back(b); + } + + ARROW_ASSIGN_OR_RAISE(auto batch, lance::arrow::MergeRecordBatches(batches)); return ScanBatch{ batch, batch_id, @@ -70,7 +106,9 @@ ::arrow::Result Scan::Next() { } ::arrow::Status Scan::Seek(int32_t offset) { - ARROW_ASSIGN_OR_RAISE(auto batch_and_offset, reader_->metadata().LocateBatch(offset)); + assert(!readers_.empty()); + auto& reader = std::get<0>(readers_[0]); + ARROW_ASSIGN_OR_RAISE(auto batch_and_offset, reader->metadata().LocateBatch(offset)); current_batch_id_ = std::get<0>(batch_and_offset); current_offset_ = std::get<1>(batch_and_offset); return ::arrow::Status::OK(); diff --git a/cpp/src/lance/io/exec/scan.h b/cpp/src/lance/io/exec/scan.h index f5b171a625..2022469967 100644 --- a/cpp/src/lance/io/exec/scan.h +++ b/cpp/src/lance/io/exec/scan.h @@ -20,6 +20,8 @@ #include #include +#include +#include #include "lance/io/exec/base.h" @@ -36,21 +38,28 @@ namespace lance::io::exec { /// Leaf scan node. class Scan : public ExecNode { public: + using FileReaderWithSchema = + std::tuple, std::shared_ptr>; + /// Factory method. - static ::arrow::Result> Make(std::shared_ptr reader, - std::shared_ptr schema, - int64_t batch_size); + /// + /// \param readers a vector of the tuples of `[reader, schema]`, including opened file reader + /// and projection schema. + /// \param batch_size batch size. + /// \return a Scan node if succeed. + static ::arrow::Result> Make( + const std::vector& readers, int64_t batch_size); Scan() = delete; - virtual ~Scan() = default; + ~Scan() override = default; constexpr Type type() const override { return Type::kScan; } + /// Returns the next available batch in the file. Or returns nullptr if EOF. ::arrow::Result Next() override; - const lance::format::Schema& schema(); - + /// Debug String std::string ToString() const override; /// Seek to a particular row. @@ -63,15 +72,12 @@ class Scan : public ExecNode { private: /// Constructor /// - /// \param reader An opened file reader. - /// \param schema The scan schema, used to select column to scan. + /// \param readers A vector of opened readers with the projected schema. /// \param batch_size scan batch size. - Scan(std::shared_ptr reader, - std::shared_ptr schema, + Scan(const std::vector& readers, int64_t batch_size); - const std::shared_ptr reader_; - const std::shared_ptr schema_; + std::vector readers_; const int64_t batch_size_; /// Keep track of the progress. diff --git a/cpp/src/lance/io/exec/scan_test.cc b/cpp/src/lance/io/exec/scan_test.cc index e7655b6a41..f52b6fe472 100644 --- a/cpp/src/lance/io/exec/scan_test.cc +++ b/cpp/src/lance/io/exec/scan_test.cc @@ -25,6 +25,7 @@ #include "lance/io/exec/base.h" #include "lance/testing/io.h" +using lance::arrow::ToArray; using lance::format::Schema; using lance::io::exec::Scan; using lance::testing::MakeReader; @@ -45,7 +46,7 @@ TEST_CASE("Test Scan::Next") { const int32_t kBatchSize = 8; auto scan = - Scan::Make(reader, std::make_shared(reader->schema()), kBatchSize).ValueOrDie(); + Scan::Make({{reader, std::make_shared(reader->schema())}}, kBatchSize).ValueOrDie(); auto batch = scan->Next().ValueOrDie(); CHECK(batch.batch_id == 0); CHECK(batch.batch->num_rows() == kBatchSize); @@ -91,7 +92,7 @@ TEST_CASE("Scan move to the next batch") { // A batch size that is aligned with the page boundary. const int32_t kBatchSize = 4; auto scan = - Scan::Make(reader, std::make_shared(reader->schema()), kBatchSize).ValueOrDie(); + Scan::Make({{reader, std::make_shared(reader->schema())}}, kBatchSize).ValueOrDie(); auto num_batches = 0; while (true) { auto batch = scan->Next().ValueOrDie(); @@ -102,4 +103,28 @@ TEST_CASE("Scan move to the next batch") { CHECK(batch.batch->num_rows() == kBatchSize); } CHECK(num_batches == kPageLength / kBatchSize * kNumBatches); -} \ No newline at end of file +} + +TEST_CASE("Scan with multiple readers") { + auto ints = lance::arrow::ToArray({1, 2, 3, 4, 5}).ValueOrDie(); + auto strs = ToArray({"1", "2", "3", "4", "5"}).ValueOrDie(); + auto int_table = + arrow::Table::Make(arrow::schema({arrow::field("ints", arrow::int32())}), {ints}); + auto strs_table = + arrow::Table::Make(arrow::schema({arrow::field("strs", arrow::utf8())}), {strs}); + + auto ints_reader = MakeReader(int_table).ValueOrDie(); + auto ints_schema = std::make_shared(int_table->schema()); + auto strs_reader = MakeReader(strs_table).ValueOrDie(); + auto strs_schema = std::make_shared(strs_table->schema()); + + auto scan = + Scan::Make({{ints_reader, ints_schema}, {strs_reader, strs_schema}}, 100).ValueOrDie(); + auto expected = arrow::RecordBatch::Make( + arrow::schema({arrow::field("ints", arrow::int32()), arrow::field("strs", arrow::utf8())}), + 5, + {ints, strs}); + auto batch = scan->Next().ValueOrDie(); + + CHECK(batch.batch->Equals(*expected)); +}