From b02e062724036c4faaca9174eb55105a6b4e95b7 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 17:34:03 -0800 Subject: [PATCH 01/26] add merge vectors --- cpp/src/lance/arrow/utils.cc | 12 ++++++++++++ cpp/src/lance/arrow/utils.h | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/cpp/src/lance/arrow/utils.cc b/cpp/src/lance/arrow/utils.cc index 4cd6cf90c8..4c980789a4 100644 --- a/cpp/src/lance/arrow/utils.cc +++ b/cpp/src/lance/arrow/utils.cc @@ -40,6 +40,18 @@ ::arrow::Result> MergeRecordBatches( return ::arrow::RecordBatch::FromStructArray(struct_arr); } +::arrow::Result> MergeRecordBatches( + const std::vector>& batches, ::arrow::MemoryPool* pool) { + if (batches.empty()) { + return nullptr; + } + auto batch = batches[0]; + for (std::size_t i = 1; i < batches.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(batch, MergeRecordBatches(batch, batches[i], pool)); + } + return batch; +} + ::arrow::Result> MergeListArrays( const std::shared_ptr<::arrow::Array>& lhs, const std::shared_ptr<::arrow::Array>& rhs, diff --git a/cpp/src/lance/arrow/utils.h b/cpp/src/lance/arrow/utils.h index 6d8f4778d8..d3853668aa 100644 --- a/cpp/src/lance/arrow/utils.h +++ b/cpp/src/lance/arrow/utils.h @@ -20,6 +20,7 @@ #include #include +#include #include "lance/format/schema.h" @@ -31,6 +32,11 @@ ::arrow::Result> MergeRecordBatches( const std::shared_ptr<::arrow::RecordBatch>& rhs, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); +/// Merge a list of record batches into one. +::arrow::Result> MergeRecordBatches( + const std::vector>& batches, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + ::arrow::Result> MergeStructArrays( const std::shared_ptr<::arrow::StructArray>& lhs, const std::shared_ptr<::arrow::StructArray>& rhs, From cad48cb665e35e95e28042d65d8415107d72d2f1 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 18:18:50 -0800 Subject: [PATCH 02/26] merge mulitple --- cpp/src/lance/arrow/utils.cc | 17 +++++++++-------- cpp/src/lance/arrow/utils.h | 4 ++++ cpp/src/lance/arrow/utils_test.cc | 31 +++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/cpp/src/lance/arrow/utils.cc b/cpp/src/lance/arrow/utils.cc index 4c980789a4..da3912a041 100644 --- a/cpp/src/lance/arrow/utils.cc +++ b/cpp/src/lance/arrow/utils.cc @@ -22,12 +22,15 @@ #include #include -#include +#include +#include #include #include "lance/arrow/file_lance.h" #include "lance/arrow/type.h" +namespace views = ranges::views; + namespace lance::arrow { ::arrow::Result> MergeRecordBatches( @@ -46,8 +49,8 @@ ::arrow::Result> MergeRecordBatches( return nullptr; } auto batch = batches[0]; - for (std::size_t i = 1; i < batches.size(); ++i) { - ARROW_ASSIGN_OR_RAISE(batch, MergeRecordBatches(batch, batches[i], pool)); + for (auto& b : batches | ranges::views::drop(1)) { + ARROW_ASSIGN_OR_RAISE(batch, MergeRecordBatches(batch, b, pool)); } return batch; } @@ -109,12 +112,10 @@ ::arrow::Result> MergeStructArrays( arrays.emplace_back(left_arr); } - for (auto& field : rhs->struct_type()->fields()) { + for (auto& field : rhs->struct_type()->fields() | views::filter([&lhs](auto& f) { + return !lhs->GetFieldByName(f->name()); + })) { auto& name = field->name(); - if (lhs->GetFieldByName(name)) { - // We've seen each other before - continue; - } names.emplace_back(name); arrays.emplace_back(rhs->GetFieldByName(name)); } diff --git a/cpp/src/lance/arrow/utils.h b/cpp/src/lance/arrow/utils.h index d3853668aa..5711121b47 100644 --- a/cpp/src/lance/arrow/utils.h +++ b/cpp/src/lance/arrow/utils.h @@ -33,6 +33,10 @@ ::arrow::Result> MergeRecordBatches( ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); /// Merge a list of record batches into one. +/// +/// \param batches A list of record batches. +/// \param pool memory pool. +/// \return the merged record batch. Or nullptr if batches is empty. ::arrow::Result> MergeRecordBatches( const std::vector>& batches, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); diff --git a/cpp/src/lance/arrow/utils_test.cc b/cpp/src/lance/arrow/utils_test.cc index 3c6b416e20..d031d04f52 100644 --- a/cpp/src/lance/arrow/utils_test.cc +++ b/cpp/src/lance/arrow/utils_test.cc @@ -206,4 +206,35 @@ TEST_CASE("Test merge extension types") { auto result = MergeSchema(*::arrow::schema({::arrow::field("ann", lance::testing::box2d())}), *::arrow::schema({::arrow::field("ann", lance::testing::image())})); CHECK(result.status().IsInvalid()); +} + +TEST_CASE("Merge record batches") { + const int kNumRows = 3; + auto batch1 = + ::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col1", ::arrow::int32())}), + kNumRows, + {ToArray({1, 2, 3}).ValueOrDie()}); + auto batch2 = + ::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col2", ::arrow::uint32())}), + kNumRows, + {ToArray({10, 20, 30}).ValueOrDie()}); + auto batch3 = + ::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col3", ::arrow::utf8())}), + kNumRows, + {ToArray({"1", "2", "3"}).ValueOrDie()}); + std::vector> batches({batch1, batch2, batch3}); + + auto merged = lance::arrow::MergeRecordBatches(batches).ValueOrDie(); + CHECK(merged->num_rows() == kNumRows); + + auto expected = + ::arrow::RecordBatch::Make(::arrow::schema({::arrow::field("col1", ::arrow::int32()), + ::arrow::field("col2", ::arrow::uint32()), + ::arrow::field("col3", ::arrow::utf8())}), + kNumRows, + {ToArray({1, 2, 3}).ValueOrDie(), + ToArray({10, 20, 30}).ValueOrDie(), + ToArray({"1", "2", "3"}).ValueOrDie()}); + fmt::print("Merge : {}\n", merged->ToString()); + CHECK(merged->Equals(*expected)); } \ No newline at end of file From 697b880102309b9ad8b1bafce03c1f24e38e1286 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 18:22:43 -0800 Subject: [PATCH 03/26] add test --- cpp/src/lance/arrow/utils.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/lance/arrow/utils.cc b/cpp/src/lance/arrow/utils.cc index da3912a041..36056d5ad3 100644 --- a/cpp/src/lance/arrow/utils.cc +++ b/cpp/src/lance/arrow/utils.cc @@ -112,10 +112,9 @@ ::arrow::Result> MergeStructArrays( arrays.emplace_back(left_arr); } - for (auto& field : rhs->struct_type()->fields() | views::filter([&lhs](auto& f) { - return !lhs->GetFieldByName(f->name()); - })) { - auto& name = field->name(); + for (auto name : rhs->struct_type()->fields() | views::filter([&lhs](auto& f) { + return !lhs->GetFieldByName(f->name()); + }) | views::transform([](auto& f) { return f->name(); })) { names.emplace_back(name); arrays.emplace_back(rhs->GetFieldByName(name)); } From 7b818de9db57c5ec4e8cea6b911144eb920dfcd7 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 18:33:26 -0800 Subject: [PATCH 04/26] functional merge --- cpp/src/lance/arrow/utils.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/lance/arrow/utils.cc b/cpp/src/lance/arrow/utils.cc index 36056d5ad3..1080f961f2 100644 --- a/cpp/src/lance/arrow/utils.cc +++ b/cpp/src/lance/arrow/utils.cc @@ -112,9 +112,10 @@ ::arrow::Result> MergeStructArrays( arrays.emplace_back(left_arr); } - for (auto name : rhs->struct_type()->fields() | views::filter([&lhs](auto& f) { - return !lhs->GetFieldByName(f->name()); - }) | views::transform([](auto& f) { return f->name(); })) { + for (auto name : + rhs->struct_type()->fields() // + | views::filter([&lhs](auto& f) { return !lhs->GetFieldByName(f->name()); }) // + | views::transform([](auto& f) { return f->name(); })) { names.emplace_back(name); arrays.emplace_back(rhs->GetFieldByName(name)); } @@ -141,12 +142,11 @@ ::arrow::Result>> MergeFieldWithChil fields.emplace_back(merged); } } - for (const auto& field : rhs.fields()) { - auto left_field = lhs.GetFieldByName(field->name()); - if (!left_field) { - fields.emplace_back(field); - } - } + ranges::actions::insert( + fields, + std::end(fields), // + rhs.fields() // + | views::filter([&lhs](auto& f) { return !lhs.GetFieldByName(f->name()); })); return fields; }; From d2f28eb78cc90e7adf4d2eaf9480e64d3dbeb929 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 19:04:34 -0800 Subject: [PATCH 05/26] allow scanner node to take mulitple files --- cpp/src/lance/io/exec/scan.cc | 45 +++++++++++++++++++++++------------ cpp/src/lance/io/exec/scan.h | 22 ++++++++++------- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 879223ade0..64d9498fe2 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -14,10 +14,9 @@ #include "lance/io/exec/scan.h" -#include - #include +#include "lance/arrow/utils.h" #include "lance/format/metadata.h" #include "lance/io/reader.h" @@ -26,23 +25,31 @@ namespace lance::io::exec { ::arrow::Result> Scan::Make(std::shared_ptr reader, std::shared_ptr schema, int64_t batch_size) { - auto scan = std::unique_ptr(new Scan(reader, schema, batch_size)); - if (reader->metadata().num_batches() == 0) { + return Make({{reader, schema}}, batch_size); +} + +::arrow::Result> Scan::Make(const std::vector& readers, + int64_t batch_size) { + if (readers.empty()) { + return ::arrow::Status::Invalid("Scan::Make: can not accept zero readers"); + } + if (std::get<0>(readers[0])->metadata().num_batches() == 0) { return ::arrow::Status::IOError("Can not open Scan on empty file"); } - scan->current_batch_page_length_ = reader->metadata().GetBatchLength(0); - return scan; + return std::unique_ptr(new Scan(readers, batch_size)); } -Scan::Scan(std::shared_ptr reader, - std::shared_ptr schema, - int64_t batch_size) - : reader_(std::move(reader)), schema_(std::move(schema)), batch_size_(batch_size) {} +Scan::Scan(const std::vector& readers, int64_t batch_size) + : readers_(std::begin(readers), std::end(readers)), + batch_size_(batch_size), + current_batch_page_length_(std::get<0>(readers_[0])->metadata().GetBatchLength(0)) {} ::arrow::Result Scan::Next() { + assert(!readers_.empty()); int32_t offset; int32_t batch_id; + auto& first_reader = std::get<0>(readers_[0]); { std::lock_guard guard(lock_); batch_id = current_batch_id_; @@ -51,17 +58,23 @@ ::arrow::Result Scan::Next() { if (current_offset_ >= current_batch_page_length_) { current_batch_id_++; current_offset_ = 0; - if (current_batch_id_ < reader_->metadata().num_batches()) { - current_batch_page_length_ = reader_->metadata().GetBatchLength(current_batch_id_); + if (current_batch_id_ < first_reader->metadata().num_batches()) { + current_batch_page_length_ = first_reader->metadata().GetBatchLength(current_batch_id_); } } } - if (batch_id >= reader_->metadata().num_batches()) { + if (batch_id >= first_reader->metadata().num_batches()) { // Reach EOF return ScanBatch::Null(); } - ARROW_ASSIGN_OR_RAISE(auto batch, reader_->ReadBatch(*schema_, batch_id, offset, batch_size_)); + std::vector> batches; + for (auto& [reader, schema] : readers_) { + ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadBatch(*schema, batch_id, offset, batch_size_)); + batches.emplace_back(std::move(batch)); + } + + ARROW_ASSIGN_OR_RAISE(auto batch, lance::arrow::MergeRecordBatches(batches)); return ScanBatch{ batch, batch_id, @@ -70,7 +83,9 @@ ::arrow::Result Scan::Next() { } ::arrow::Status Scan::Seek(int32_t offset) { - ARROW_ASSIGN_OR_RAISE(auto batch_and_offset, reader_->metadata().LocateBatch(offset)); + assert(!readers_.empty()); + auto& reader = std::get<0>(readers_[0]); + ARROW_ASSIGN_OR_RAISE(auto batch_and_offset, reader->metadata().LocateBatch(offset)); current_batch_id_ = std::get<0>(batch_and_offset); current_offset_ = std::get<1>(batch_and_offset); return ::arrow::Status::OK(); diff --git a/cpp/src/lance/io/exec/scan.h b/cpp/src/lance/io/exec/scan.h index f5b171a625..764f3514c1 100644 --- a/cpp/src/lance/io/exec/scan.h +++ b/cpp/src/lance/io/exec/scan.h @@ -20,6 +20,8 @@ #include #include +#include +#include #include "lance/io/exec/base.h" @@ -36,21 +38,28 @@ namespace lance::io::exec { /// Leaf scan node. class Scan : public ExecNode { public: + using FileReaderWithSchema = + std::tuple, std::shared_ptr>; + /// Factory method. static ::arrow::Result> Make(std::shared_ptr reader, std::shared_ptr schema, int64_t batch_size); + /// Factory method. + /// + static ::arrow::Result> Make( + const std::vector& readers, int64_t batch_size); + Scan() = delete; virtual ~Scan() = default; constexpr Type type() const override { return Type::kScan; } + /// Returns the next available batch in the file. Or returns nullptr if EOF. ::arrow::Result Next() override; - const lance::format::Schema& schema(); - std::string ToString() const override; /// Seek to a particular row. @@ -63,15 +72,12 @@ class Scan : public ExecNode { private: /// Constructor /// - /// \param reader An opened file reader. - /// \param schema The scan schema, used to select column to scan. + /// \param readers A vector of opened readers with the projected schema. /// \param batch_size scan batch size. - Scan(std::shared_ptr reader, - std::shared_ptr schema, + Scan(const std::vector& readers, int64_t batch_size); - const std::shared_ptr reader_; - const std::shared_ptr schema_; + std::vector readers_; const int64_t batch_size_; /// Keep track of the progress. From 345a7a343647d37de00ed30777351b8282f0dd2d Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 19:07:12 -0800 Subject: [PATCH 06/26] clean up --- cpp/src/lance/arrow/utils_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/lance/arrow/utils_test.cc b/cpp/src/lance/arrow/utils_test.cc index d031d04f52..9d4a17846f 100644 --- a/cpp/src/lance/arrow/utils_test.cc +++ b/cpp/src/lance/arrow/utils_test.cc @@ -235,6 +235,5 @@ TEST_CASE("Merge record batches") { {ToArray({1, 2, 3}).ValueOrDie(), ToArray({10, 20, 30}).ValueOrDie(), ToArray({"1", "2", "3"}).ValueOrDie()}); - fmt::print("Merge : {}\n", merged->ToString()); CHECK(merged->Equals(*expected)); } \ No newline at end of file From 78550871fbb79279cee6bab68b47193da86a60fb Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 19:08:15 -0800 Subject: [PATCH 07/26] simplify --- cpp/src/lance/arrow/utils.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/lance/arrow/utils.cc b/cpp/src/lance/arrow/utils.cc index 1080f961f2..481633ab17 100644 --- a/cpp/src/lance/arrow/utils.cc +++ b/cpp/src/lance/arrow/utils.cc @@ -49,7 +49,7 @@ ::arrow::Result> MergeRecordBatches( return nullptr; } auto batch = batches[0]; - for (auto& b : batches | ranges::views::drop(1)) { + for (auto& b : batches | views::drop(1)) { ARROW_ASSIGN_OR_RAISE(batch, MergeRecordBatches(batch, b, pool)); } return batch; From 93933f18b5fd194ec979b0d19691363ddf017430 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 19:13:25 -0800 Subject: [PATCH 08/26] more docs --- cpp/src/lance/io/exec/scan.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cpp/src/lance/io/exec/scan.h b/cpp/src/lance/io/exec/scan.h index 764f3514c1..3c6fcc0970 100644 --- a/cpp/src/lance/io/exec/scan.h +++ b/cpp/src/lance/io/exec/scan.h @@ -48,18 +48,23 @@ class Scan : public ExecNode { /// Factory method. /// + /// \param readers a vector of the tuples of `[reader, schema]`, including opened file reader + /// and projection schema. + /// \param batch_size batch size. + /// \return a Scan node if succeed. static ::arrow::Result> Make( const std::vector& readers, int64_t batch_size); Scan() = delete; - virtual ~Scan() = default; + ~Scan() override = default; constexpr Type type() const override { return Type::kScan; } /// Returns the next available batch in the file. Or returns nullptr if EOF. ::arrow::Result Next() override; + /// Debug String std::string ToString() const override; /// Seek to a particular row. From e6e6b5e7d149629a95b363fcb42bcd2716eca3b3 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 20:20:45 -0800 Subject: [PATCH 09/26] parallel read --- cpp/src/lance/io/exec/scan.cc | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 64d9498fe2..c041ce90c9 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -51,6 +51,7 @@ ::arrow::Result Scan::Next() { auto& first_reader = std::get<0>(readers_[0]); { + // Make the plan to how much data to read next. std::lock_guard guard(lock_); batch_id = current_batch_id_; offset = current_offset_; @@ -67,11 +68,32 @@ ::arrow::Result Scan::Next() { // Reach EOF return ScanBatch::Null(); } + + auto executor = ::arrow::internal::GetCpuThreadPool(); + std::vector<::arrow::Future>> futs; + for (auto& [reader, schema] : readers_) { + ARROW_ASSIGN_OR_RAISE( + auto fut, + executor->Submit( + [](auto& r, auto& s, auto batch_id, auto offset, auto batch_size) + -> ::arrow::Result> { + ARROW_ASSIGN_OR_RAISE(auto batch, + r->ReadBatch(*s, batch_id, offset, batch_size)); + return batch; + }, + reader, + schema, + batch_id, + offset, + batch_size_)); + futs.emplace_back(std::move(fut)); + } std::vector> batches; - for (auto& [reader, schema] : readers_) { - ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadBatch(*schema, batch_id, offset, batch_size_)); - batches.emplace_back(std::move(batch)); + for (auto& fut : futs) { + fut.Wait(); + ARROW_ASSIGN_OR_RAISE(auto b, fut.MoveResult()); + batches.emplace_back(b); } ARROW_ASSIGN_OR_RAISE(auto batch, lance::arrow::MergeRecordBatches(batches)); From 807ef493e286c238e5f079b4e4be287c555195a6 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 20:23:21 -0800 Subject: [PATCH 10/26] clean format --- cpp/src/lance/io/exec/scan.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index c041ce90c9..6e842ebbcc 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -68,7 +68,7 @@ ::arrow::Result Scan::Next() { // Reach EOF return ScanBatch::Null(); } - + auto executor = ::arrow::internal::GetCpuThreadPool(); std::vector<::arrow::Future>> futs; for (auto& [reader, schema] : readers_) { From 0c3fbae4e9adb2dbe057207e2451e69c5841081d Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 20:25:00 -0800 Subject: [PATCH 11/26] better comments --- cpp/src/lance/arrow/utils.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/lance/arrow/utils.cc b/cpp/src/lance/arrow/utils.cc index 481633ab17..d6fa51c604 100644 --- a/cpp/src/lance/arrow/utils.cc +++ b/cpp/src/lance/arrow/utils.cc @@ -122,6 +122,7 @@ ::arrow::Result> MergeStructArrays( return ::arrow::StructArray::Make(arrays, names); } +/// Concept of a class that has ".fields()" method. template concept HasFields = (std::same_as || std::same_as); From 49302c84646f6aa2c24e2d42d575bbd11f6bb656 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 20:39:41 -0800 Subject: [PATCH 12/26] remove old Scan::Make interface --- cpp/src/lance/io/exec/limit_test.cc | 5 +++-- cpp/src/lance/io/exec/project.cc | 5 +++-- cpp/src/lance/io/exec/scan.cc | 33 ++++++++++++----------------- cpp/src/lance/io/exec/scan.h | 5 ----- cpp/src/lance/io/exec/scan_test.cc | 4 ++-- 5 files changed, 21 insertions(+), 31 deletions(-) diff --git a/cpp/src/lance/io/exec/limit_test.cc b/cpp/src/lance/io/exec/limit_test.cc index e540ff225b..ec1cfa755a 100644 --- a/cpp/src/lance/io/exec/limit_test.cc +++ b/cpp/src/lance/io/exec/limit_test.cc @@ -49,8 +49,9 @@ TEST_CASE("Read limit multiple times") { auto infile = make_shared(sink->Finish().ValueOrDie()); auto reader = std::make_shared(infile); CHECK(reader->Open().ok()); - auto scan = lance::io::exec::Scan::Make(reader, std::make_shared(reader->schema()), 100) - .ValueOrDie(); + auto scan = + lance::io::exec::Scan::Make({{reader, std::make_shared(reader->schema())}}, 100) + .ValueOrDie(); auto counter = std::make_shared(5, 10); auto limit = Limit(counter, std::move(scan)); auto batch = limit.Next().ValueOrDie(); diff --git a/cpp/src/lance/io/exec/project.cc b/cpp/src/lance/io/exec/project.cc index 6ffbf76220..f09200f633 100644 --- a/cpp/src/lance/io/exec/project.cc +++ b/cpp/src/lance/io/exec/project.cc @@ -55,7 +55,7 @@ ::arrow::Result> Project::Make( /// Take -> (optionally) Limit -> Filter -> Scan ARROW_ASSIGN_OR_RAISE(auto filter_scan_schema, schema.Project(scan_options->filter)); ARROW_ASSIGN_OR_RAISE(auto filter_scan_node, - Scan::Make(reader, filter_scan_schema, scan_options->batch_size)); + Scan::Make({{reader, filter_scan_schema}}, scan_options->batch_size)); ARROW_ASSIGN_OR_RAISE(child, Filter::Make(scan_options->filter, std::move(filter_scan_node))); if (counter) { ARROW_ASSIGN_OR_RAISE(child, Limit::Make(counter, std::move(child))); @@ -64,7 +64,8 @@ ::arrow::Result> Project::Make( ARROW_ASSIGN_OR_RAISE(child, Take::Make(reader, take_schema, std::move(child))); } else { /// (*optionally) Limit -> Scan - ARROW_ASSIGN_OR_RAISE(child, Scan::Make(reader, projected_schema, scan_options->batch_size)); + ARROW_ASSIGN_OR_RAISE( + child, Scan::Make({{std::move(reader), projected_schema}}, scan_options->batch_size)); if (counter) { ARROW_ASSIGN_OR_RAISE(child, Limit::Make(counter, std::move(child))); } diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 6e842ebbcc..9283638854 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -22,12 +22,6 @@ namespace lance::io::exec { -::arrow::Result> Scan::Make(std::shared_ptr reader, - std::shared_ptr schema, - int64_t batch_size) { - return Make({{reader, schema}}, batch_size); -} - ::arrow::Result> Scan::Make(const std::vector& readers, int64_t batch_size) { if (readers.empty()) { @@ -72,20 +66,19 @@ ::arrow::Result Scan::Next() { auto executor = ::arrow::internal::GetCpuThreadPool(); std::vector<::arrow::Future>> futs; for (auto& [reader, schema] : readers_) { - ARROW_ASSIGN_OR_RAISE( - auto fut, - executor->Submit( - [](auto& r, auto& s, auto batch_id, auto offset, auto batch_size) - -> ::arrow::Result> { - ARROW_ASSIGN_OR_RAISE(auto batch, - r->ReadBatch(*s, batch_id, offset, batch_size)); - return batch; - }, - reader, - schema, - batch_id, - offset, - batch_size_)); + ARROW_ASSIGN_OR_RAISE(auto fut, + executor->Submit( + [](auto& r, auto& s, auto batch_id, auto offset, auto batch_size) + -> ::arrow::Result> { + ARROW_ASSIGN_OR_RAISE( + auto batch, r->ReadBatch(*s, batch_id, offset, batch_size)); + return batch; + }, + reader, + schema, + batch_id, + offset, + batch_size_)); futs.emplace_back(std::move(fut)); } diff --git a/cpp/src/lance/io/exec/scan.h b/cpp/src/lance/io/exec/scan.h index 3c6fcc0970..2022469967 100644 --- a/cpp/src/lance/io/exec/scan.h +++ b/cpp/src/lance/io/exec/scan.h @@ -41,11 +41,6 @@ class Scan : public ExecNode { using FileReaderWithSchema = std::tuple, std::shared_ptr>; - /// Factory method. - static ::arrow::Result> Make(std::shared_ptr reader, - std::shared_ptr schema, - int64_t batch_size); - /// Factory method. /// /// \param readers a vector of the tuples of `[reader, schema]`, including opened file reader diff --git a/cpp/src/lance/io/exec/scan_test.cc b/cpp/src/lance/io/exec/scan_test.cc index e7655b6a41..ef7a9bcd24 100644 --- a/cpp/src/lance/io/exec/scan_test.cc +++ b/cpp/src/lance/io/exec/scan_test.cc @@ -45,7 +45,7 @@ TEST_CASE("Test Scan::Next") { const int32_t kBatchSize = 8; auto scan = - Scan::Make(reader, std::make_shared(reader->schema()), kBatchSize).ValueOrDie(); + Scan::Make({{reader, std::make_shared(reader->schema())}}, kBatchSize).ValueOrDie(); auto batch = scan->Next().ValueOrDie(); CHECK(batch.batch_id == 0); CHECK(batch.batch->num_rows() == kBatchSize); @@ -91,7 +91,7 @@ TEST_CASE("Scan move to the next batch") { // A batch size that is aligned with the page boundary. const int32_t kBatchSize = 4; auto scan = - Scan::Make(reader, std::make_shared(reader->schema()), kBatchSize).ValueOrDie(); + Scan::Make({{reader, std::make_shared(reader->schema())}}, kBatchSize).ValueOrDie(); auto num_batches = 0; while (true) { auto batch = scan->Next().ValueOrDie(); From a8311ad4c74cc814ab8ef4199b9446a5892ea063 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 20:54:59 -0800 Subject: [PATCH 13/26] remove unnedesary wait --- cpp/src/lance/io/exec/scan.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 9283638854..0347954490 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -84,7 +84,6 @@ ::arrow::Result Scan::Next() { std::vector> batches; for (auto& fut : futs) { - fut.Wait(); ARROW_ASSIGN_OR_RAISE(auto b, fut.MoveResult()); batches.emplace_back(b); } From bd6cc3836cb79c5f2bcde95936c7dbbe9aa46fe5 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 21:06:06 -0800 Subject: [PATCH 14/26] move local copy of file reader --- cpp/src/lance/io/exec/scan.cc | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 0347954490..a4c96b5087 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -65,20 +65,20 @@ ::arrow::Result Scan::Next() { auto executor = ::arrow::internal::GetCpuThreadPool(); std::vector<::arrow::Future>> futs; - for (auto& [reader, schema] : readers_) { - ARROW_ASSIGN_OR_RAISE(auto fut, - executor->Submit( - [](auto& r, auto& s, auto batch_id, auto offset, auto batch_size) - -> ::arrow::Result> { - ARROW_ASSIGN_OR_RAISE( - auto batch, r->ReadBatch(*s, batch_id, offset, batch_size)); - return batch; - }, - reader, - schema, - batch_id, - offset, - batch_size_)); + for (auto [reader, schema] : readers_) { + ARROW_ASSIGN_OR_RAISE( + auto fut, + executor->Submit( + [batch_id, offset]( + std::shared_ptr r, + std::shared_ptr s, + auto batch_size) -> ::arrow::Result> { + ARROW_ASSIGN_OR_RAISE(auto batch, r->ReadBatch(*s, batch_id, offset, batch_size)); + return batch; + }, + std::move(reader), + std::move(schema), + batch_size_)); futs.emplace_back(std::move(fut)); } From 31c36bea3b9f103acf3306c9b6defd5e4626cd42 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 22:09:04 -0800 Subject: [PATCH 15/26] move --- cpp/src/lance/io/exec/scan.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index a4c96b5087..c12f481a05 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -76,16 +76,16 @@ ::arrow::Result Scan::Next() { ARROW_ASSIGN_OR_RAISE(auto batch, r->ReadBatch(*s, batch_id, offset, batch_size)); return batch; }, - std::move(reader), - std::move(schema), + reader, + schema, batch_size_)); futs.emplace_back(std::move(fut)); } std::vector> batches; - for (auto& fut : futs) { - ARROW_ASSIGN_OR_RAISE(auto b, fut.MoveResult()); - batches.emplace_back(b); + for (auto fut : futs) { + ARROW_ASSIGN_OR_RAISE(auto b, fut.result()); + batches.emplace_back(std::move(b)); } ARROW_ASSIGN_OR_RAISE(auto batch, lance::arrow::MergeRecordBatches(batches)); From 6f70176a0920a15fe40e9bba1f0912d5c04b4287 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 22:25:28 -0800 Subject: [PATCH 16/26] ref to future --- cpp/src/lance/io/exec/scan.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index c12f481a05..dea4a39fad 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -83,7 +83,7 @@ ::arrow::Result Scan::Next() { } std::vector> batches; - for (auto fut : futs) { + for (auto& fut : futs) { ARROW_ASSIGN_OR_RAISE(auto b, fut.result()); batches.emplace_back(std::move(b)); } From 1b8cc414829302de9f71a9c0fa7c559c1ec91c91 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 22:37:49 -0800 Subject: [PATCH 17/26] ref --- cpp/src/lance/io/exec/scan.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index dea4a39fad..8cb9982390 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -84,7 +84,7 @@ ::arrow::Result Scan::Next() { std::vector> batches; for (auto& fut : futs) { - ARROW_ASSIGN_OR_RAISE(auto b, fut.result()); + ARROW_ASSIGN_OR_RAISE(auto& b, fut.result()); batches.emplace_back(std::move(b)); } From c293d7a55177d708e76b91b6c51d3f1448b10651 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 22:46:11 -0800 Subject: [PATCH 18/26] test --- cpp/src/lance/io/exec/scan.cc | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 8cb9982390..61489ce6dd 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -57,7 +57,9 @@ ::arrow::Result Scan::Next() { current_batch_page_length_ = first_reader->metadata().GetBatchLength(current_batch_id_); } } + // Lock released after scope. } + if (batch_id >= first_reader->metadata().num_batches()) { // Reach EOF return ScanBatch::Null(); @@ -83,10 +85,10 @@ ::arrow::Result Scan::Next() { } std::vector> batches; - for (auto& fut : futs) { - ARROW_ASSIGN_OR_RAISE(auto& b, fut.result()); - batches.emplace_back(std::move(b)); - } +// for (auto& fut : futs) { +// ARROW_ASSIGN_OR_RAISE(auto& b, fut.result()); +// batches.emplace_back(std::move(b)); +// } ARROW_ASSIGN_OR_RAISE(auto batch, lance::arrow::MergeRecordBatches(batches)); return ScanBatch{ From ad123fec5fd8ff7626e242568d025c1673aa0224 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 7 Nov 2022 22:52:14 -0800 Subject: [PATCH 19/26] set more cpu --- cpp/src/lance/io/exec/scan.cc | 10 +++++----- cpp/src/lance/io/exec/scan_test.cc | 4 ++++ 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 61489ce6dd..f179f7b47d 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -59,7 +59,7 @@ ::arrow::Result Scan::Next() { } // Lock released after scope. } - + if (batch_id >= first_reader->metadata().num_batches()) { // Reach EOF return ScanBatch::Null(); @@ -85,10 +85,10 @@ ::arrow::Result Scan::Next() { } std::vector> batches; -// for (auto& fut : futs) { -// ARROW_ASSIGN_OR_RAISE(auto& b, fut.result()); -// batches.emplace_back(std::move(b)); -// } + for (auto& fut : futs) { + ARROW_ASSIGN_OR_RAISE(auto& b, fut.result()); + batches.emplace_back(std::move(b)); + } ARROW_ASSIGN_OR_RAISE(auto batch, lance::arrow::MergeRecordBatches(batches)); return ScanBatch{ diff --git a/cpp/src/lance/io/exec/scan_test.cc b/cpp/src/lance/io/exec/scan_test.cc index ef7a9bcd24..3a416d6dd9 100644 --- a/cpp/src/lance/io/exec/scan_test.cc +++ b/cpp/src/lance/io/exec/scan_test.cc @@ -30,6 +30,8 @@ using lance::io::exec::Scan; using lance::testing::MakeReader; TEST_CASE("Test Scan::Next") { + CHECK(::arrow::SetCpuThreadPoolCapacity(8).ok()); + std::vector ints(20); std::iota(ints.begin(), ints.end(), 0); auto schema = ::arrow::schema({ @@ -73,6 +75,8 @@ TEST_CASE("Test Scan::Next") { } TEST_CASE("Scan move to the next batch") { + CHECK(::arrow::SetCpuThreadPoolCapacity(8).ok()); + const auto kNumBatches = 3; const auto kPageLength = 20; ::arrow::ArrayVector arrs; From 3a100cb5b1d5f89076215a95fa049bf2398130b0 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 8 Nov 2022 09:24:37 -0800 Subject: [PATCH 20/26] do not set cpu --- cpp/src/lance/io/exec/scan.cc | 2 +- cpp/src/lance/io/exec/scan_test.cc | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index f179f7b47d..92a6e4d3b0 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -87,7 +87,7 @@ ::arrow::Result Scan::Next() { std::vector> batches; for (auto& fut : futs) { ARROW_ASSIGN_OR_RAISE(auto& b, fut.result()); - batches.emplace_back(std::move(b)); + batches.emplace_back(b); } ARROW_ASSIGN_OR_RAISE(auto batch, lance::arrow::MergeRecordBatches(batches)); diff --git a/cpp/src/lance/io/exec/scan_test.cc b/cpp/src/lance/io/exec/scan_test.cc index 3a416d6dd9..ef7a9bcd24 100644 --- a/cpp/src/lance/io/exec/scan_test.cc +++ b/cpp/src/lance/io/exec/scan_test.cc @@ -30,8 +30,6 @@ using lance::io::exec::Scan; using lance::testing::MakeReader; TEST_CASE("Test Scan::Next") { - CHECK(::arrow::SetCpuThreadPoolCapacity(8).ok()); - std::vector ints(20); std::iota(ints.begin(), ints.end(), 0); auto schema = ::arrow::schema({ @@ -75,8 +73,6 @@ TEST_CASE("Test Scan::Next") { } TEST_CASE("Scan move to the next batch") { - CHECK(::arrow::SetCpuThreadPoolCapacity(8).ok()); - const auto kNumBatches = 3; const auto kPageLength = 20; ::arrow::ArrayVector arrs; From 7a9154c74fdf75757c76ca38d95079d85f2bc233 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 8 Nov 2022 09:32:35 -0800 Subject: [PATCH 21/26] keep minimal thread counts --- cpp/src/lance/io/exec/scan.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 92a6e4d3b0..5c6f747846 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -22,6 +22,8 @@ namespace lance::io::exec { +constexpr int kMinimalIOThreads = 4; + ::arrow::Result> Scan::Make(const std::vector& readers, int64_t batch_size) { if (readers.empty()) { @@ -66,6 +68,11 @@ ::arrow::Result Scan::Next() { } auto executor = ::arrow::internal::GetCpuThreadPool(); + if (::arrow::GetCpuThreadPoolCapacity() < kMinimalIOThreads) { + // Keep a minimal number of threads, preventing live lock on low CPU count (<=2) machines, + // i.e., Github Action runners. + ARROW_RETURN_NOT_OK(::arrow::SetCpuThreadPoolCapacity(kMinimalIOThreads)); + } std::vector<::arrow::Future>> futs; for (auto [reader, schema] : readers_) { ARROW_ASSIGN_OR_RAISE( From 8d40c9b7fca0484852c5085c37d5c928715f342c Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 8 Nov 2022 09:35:53 -0800 Subject: [PATCH 22/26] cleanup --- cpp/src/lance/io/exec/scan.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 5c6f747846..b96b6c0544 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -67,12 +67,12 @@ ::arrow::Result Scan::Next() { return ScanBatch::Null(); } - auto executor = ::arrow::internal::GetCpuThreadPool(); if (::arrow::GetCpuThreadPoolCapacity() < kMinimalIOThreads) { // Keep a minimal number of threads, preventing live lock on low CPU count (<=2) machines, // i.e., Github Action runners. ARROW_RETURN_NOT_OK(::arrow::SetCpuThreadPoolCapacity(kMinimalIOThreads)); } + auto executor = ::arrow::internal::GetCpuThreadPool(); std::vector<::arrow::Future>> futs; for (auto [reader, schema] : readers_) { ARROW_ASSIGN_OR_RAISE( From 00751404ce499d47d05b824a6e2c9210dbbe7385 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 8 Nov 2022 09:59:26 -0800 Subject: [PATCH 23/26] add test for reading several files --- cpp/src/lance/io/exec/scan_test.cc | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/cpp/src/lance/io/exec/scan_test.cc b/cpp/src/lance/io/exec/scan_test.cc index ef7a9bcd24..563b4660e4 100644 --- a/cpp/src/lance/io/exec/scan_test.cc +++ b/cpp/src/lance/io/exec/scan_test.cc @@ -25,6 +25,7 @@ #include "lance/io/exec/base.h" #include "lance/testing/io.h" +using lance::arrow::ToArray; using lance::format::Schema; using lance::io::exec::Scan; using lance::testing::MakeReader; @@ -102,4 +103,30 @@ TEST_CASE("Scan move to the next batch") { CHECK(batch.batch->num_rows() == kBatchSize); } CHECK(num_batches == kPageLength / kBatchSize * kNumBatches); -} \ No newline at end of file +} + +TEST_CASE("Scan with multiple readers") { + auto ints = lance::arrow::ToArray({1, 2, 3, 4, 5}).ValueOrDie(); + auto strs = ToArray({"1", "2", "3", "4", "5"}).ValueOrDie(); + auto int_table = + arrow::Table::Make(arrow::schema({arrow::field("ints", arrow::int32())}), {ints}); + auto strs_table = + arrow::Table::Make(arrow::schema({arrow::field("strs", arrow::utf8())}), {strs}); + + auto ints_reader = MakeReader(int_table).ValueOrDie(); + auto ints_schema = std::make_shared(int_table->schema()); + auto strs_reader = MakeReader(strs_table).ValueOrDie(); + auto strs_schema = std::make_shared(strs_table->schema()); + + auto scan = Scan::Make({{std::move(ints_reader), std::move(ints_schema)}, + {std::move(strs_reader), std::move(strs_schema)}}, + 100) + .ValueOrDie(); + auto expected = arrow::RecordBatch::Make( + arrow::schema({arrow::field("ints", arrow::int32()), arrow::field("strs", arrow::utf8())}), + 5, + {ints, strs}); + auto batch = scan->Next().ValueOrDie(); + + CHECK(batch.batch->Equals(*expected)); +} From 283adf94fe61ffe2a4f3b3ff5e40a3ed14dc3a75 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 8 Nov 2022 10:25:39 -0800 Subject: [PATCH 24/26] simply test dont move --- cpp/src/lance/io/exec/scan_test.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cpp/src/lance/io/exec/scan_test.cc b/cpp/src/lance/io/exec/scan_test.cc index 563b4660e4..f52b6fe472 100644 --- a/cpp/src/lance/io/exec/scan_test.cc +++ b/cpp/src/lance/io/exec/scan_test.cc @@ -118,10 +118,8 @@ TEST_CASE("Scan with multiple readers") { auto strs_reader = MakeReader(strs_table).ValueOrDie(); auto strs_schema = std::make_shared(strs_table->schema()); - auto scan = Scan::Make({{std::move(ints_reader), std::move(ints_schema)}, - {std::move(strs_reader), std::move(strs_schema)}}, - 100) - .ValueOrDie(); + auto scan = + Scan::Make({{ints_reader, ints_schema}, {strs_reader, strs_schema}}, 100).ValueOrDie(); auto expected = arrow::RecordBatch::Make( arrow::schema({arrow::field("ints", arrow::int32()), arrow::field("strs", arrow::utf8())}), 5, From 638523e1add932a92679a7c5504f6164912f3620 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 8 Nov 2022 11:23:43 -0800 Subject: [PATCH 25/26] address comments --- cpp/src/lance/arrow/utils.cc | 7 +++++++ cpp/src/lance/arrow/utils.h | 5 +++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/cpp/src/lance/arrow/utils.cc b/cpp/src/lance/arrow/utils.cc index d6fa51c604..78554c1569 100644 --- a/cpp/src/lance/arrow/utils.cc +++ b/cpp/src/lance/arrow/utils.cc @@ -50,6 +50,13 @@ ::arrow::Result> MergeRecordBatches( } auto batch = batches[0]; for (auto& b : batches | views::drop(1)) { + if (b->num_rows() != batch->num_rows()) { + return ::arrow::Status::Invalid( + "MergeRecordBatches: attempt to merge batches with different length: ", + b->num_rows(), + " != ", + batch->num_rows()); + } ARROW_ASSIGN_OR_RAISE(batch, MergeRecordBatches(batch, b, pool)); } return batch; diff --git a/cpp/src/lance/arrow/utils.h b/cpp/src/lance/arrow/utils.h index 5711121b47..6f5b4a9d04 100644 --- a/cpp/src/lance/arrow/utils.h +++ b/cpp/src/lance/arrow/utils.h @@ -32,9 +32,10 @@ ::arrow::Result> MergeRecordBatches( const std::shared_ptr<::arrow::RecordBatch>& rhs, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); -/// Merge a list of record batches into one. +/// Merge a list of record batches that represent the different columns of the same rows, +/// into a single record batch. /// -/// \param batches A list of record batches. +/// \param batches A list of record batches. Must have the same length. /// \param pool memory pool. /// \return the merged record batch. Or nullptr if batches is empty. ::arrow::Result> MergeRecordBatches( From 0d61063a4fb3fd2cd547c7ffd43b0da65e87ca43 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 8 Nov 2022 11:44:56 -0800 Subject: [PATCH 26/26] use checkout GHA v3 --- .github/workflows/cpp.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml index 26356ba217..5651af933e 100644 --- a/.github/workflows/cpp.yml +++ b/.github/workflows/cpp.yml @@ -14,7 +14,7 @@ jobs: run: working-directory: ./cpp steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: ccache uses: hendrikmuhs/ccache-action@v1 - name: Run lint @@ -40,7 +40,7 @@ jobs: run: working-directory: ./cpp steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install dependencies run: | brew update