diff --git a/cpp/src/lance/arrow/file_lance.cc b/cpp/src/lance/arrow/file_lance.cc index a18549f9e9..8d17dbc45b 100644 --- a/cpp/src/lance/arrow/file_lance.cc +++ b/cpp/src/lance/arrow/file_lance.cc @@ -63,18 +63,8 @@ ::arrow::Result<::arrow::RecordBatchGenerator> LanceFileFormat::ScanBatchesAsync auto reader = std::make_shared(infile); ARROW_RETURN_NOT_OK(reader->Open()); - std::optional limit = std::nullopt; - int64_t offset = 0; - if (options->fragment_scan_options && - options->fragment_scan_options->type_name() == kLanceFormatTypeName) { - auto lance_fragment_scan_options = - std::dynamic_pointer_cast(options->fragment_scan_options); - limit = lance_fragment_scan_options->limit; - offset = lance_fragment_scan_options->offset; - } - - auto batch_reader = lance::io::RecordBatchReader( - reader, options, ::arrow::internal::GetCpuThreadPool(), limit, offset); + auto batch_reader = + lance::io::RecordBatchReader(reader, options, ::arrow::internal::GetCpuThreadPool()); ARROW_RETURN_NOT_OK(batch_reader.Open()); auto generator = ::arrow::RecordBatchGenerator(std::move(batch_reader)); return generator; @@ -105,4 +95,8 @@ ::arrow::Status FileWriteOptions::Validate() const { std::string LanceFragmentScanOptions::type_name() const { return kLanceFormatTypeName; } +bool IsLanceFragmentScanOptions(const ::arrow::dataset::FragmentScanOptions& fso) { + return fso.type_name() == kLanceFormatTypeName; +} + } // namespace lance::arrow \ No newline at end of file diff --git a/cpp/src/lance/arrow/file_lance_ext.h b/cpp/src/lance/arrow/file_lance_ext.h index 4a810b382c..f443f342cf 100644 --- a/cpp/src/lance/arrow/file_lance_ext.h +++ b/cpp/src/lance/arrow/file_lance_ext.h @@ -32,4 +32,7 @@ class LanceFragmentScanOptions : public ::arrow::dataset::FragmentScanOptions { int64_t offset = 0; }; +/// Check if the fragment scan option is LanceFragmentScanOptions. +bool IsLanceFragmentScanOptions(const ::arrow::dataset::FragmentScanOptions& fso); + } // namespace lance::arrow \ No newline at end of file diff --git a/cpp/src/lance/io/exec/filter.h b/cpp/src/lance/io/exec/filter.h index 5f850d4f84..16053a7232 100644 --- a/cpp/src/lance/io/exec/filter.h +++ b/cpp/src/lance/io/exec/filter.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include diff --git a/cpp/src/lance/io/exec/project.cc b/cpp/src/lance/io/exec/project.cc index fc7b67c371..d55c75b8cf 100644 --- a/cpp/src/lance/io/exec/project.cc +++ b/cpp/src/lance/io/exec/project.cc @@ -18,6 +18,7 @@ #include #include +#include "lance/arrow/file_lance_ext.h" #include "lance/arrow/utils.h" #include "lance/io/exec/filter.h" #include "lance/io/exec/limit.h" @@ -31,9 +32,7 @@ Project::Project(std::unique_ptr child) : child_(std::move(child)) {} ::arrow::Result> Project::Make( std::shared_ptr reader, - std::shared_ptr<::arrow::dataset::ScanOptions> scan_options, - std::optional limit, - int32_t offset) { + std::shared_ptr<::arrow::dataset::ScanOptions> scan_options) { auto& schema = reader->schema(); auto projected_arrow_schema = scan_options->projected_schema; if (projected_arrow_schema->num_fields() == 0) { @@ -41,6 +40,15 @@ ::arrow::Result> Project::Make( } ARROW_ASSIGN_OR_RAISE(auto projected_schema, schema.Project(*projected_arrow_schema)); + std::optional limit; + int64_t offset; + if (scan_options->fragment_scan_options && + lance::arrow::IsLanceFragmentScanOptions(*scan_options->fragment_scan_options)) { + auto fso = std::dynamic_pointer_cast( + scan_options->fragment_scan_options); + limit = fso->limit; + offset = fso->offset; + } std::unique_ptr child; if (Filter::HasFilter(scan_options->filter)) { /// Build a chain of: diff --git a/cpp/src/lance/io/exec/project.h b/cpp/src/lance/io/exec/project.h index 537ced5ce0..8f55245113 100644 --- a/cpp/src/lance/io/exec/project.h +++ b/cpp/src/lance/io/exec/project.h @@ -45,9 +45,7 @@ class Project : ExecNode { /// static ::arrow::Result> Make( std::shared_ptr reader, - std::shared_ptr<::arrow::dataset::ScanOptions> scan_options, - std::optional limit = std::nullopt, - int32_t offset = 0); + std::shared_ptr<::arrow::dataset::ScanOptions> scan_options); ::arrow::Result Next() override; diff --git a/cpp/src/lance/io/record_batch_reader.cc b/cpp/src/lance/io/record_batch_reader.cc index 24c7222425..062508ea7d 100644 --- a/cpp/src/lance/io/record_batch_reader.cc +++ b/cpp/src/lance/io/record_batch_reader.cc @@ -36,38 +36,25 @@ namespace lance::io { RecordBatchReader::RecordBatchReader(std::shared_ptr reader, std::shared_ptr<::arrow::dataset::ScanOptions> options, - ::arrow::internal::ThreadPool* thread_pool, - std::optional limit, - int64_t offset) noexcept - : reader_(reader), - options_(options), - limit_(limit), - offset_(offset), - thread_pool_(thread_pool) { + ::arrow::internal::ThreadPool* thread_pool) noexcept + : reader_(reader), options_(options), thread_pool_(thread_pool) { assert(thread_pool_); } RecordBatchReader::RecordBatchReader(const RecordBatchReader& other) noexcept : reader_(other.reader_), options_(other.options_), - limit_(other.limit_), - offset_(other.offset_), project_(other.project_), - thread_pool_(other.thread_pool_), - current_batch_(int(other.current_batch_)) {} + thread_pool_(other.thread_pool_) {} RecordBatchReader::RecordBatchReader(RecordBatchReader&& other) noexcept : reader_(std::move(other.reader_)), options_(std::move(other.options_)), - limit_(other.limit_), - offset_(other.offset_), project_(std::move(other.project_)), - thread_pool_(std::move(other.thread_pool_)), - current_batch_(int(other.current_batch_)), - readahead_queue_(std::move(other.readahead_queue_)) {} + thread_pool_(std::move(other.thread_pool_)) {} ::arrow::Status RecordBatchReader::Open() { - ARROW_ASSIGN_OR_RAISE(project_, exec::Project::Make(reader_, options_, limit_, offset_)); + ARROW_ASSIGN_OR_RAISE(project_, exec::Project::Make(reader_, options_)); return ::arrow::Status::OK(); } @@ -81,34 +68,19 @@ ::arrow::Status RecordBatchReader::ReadNext(std::shared_ptr<::arrow::RecordBatch return ::arrow::Status::OK(); } -::arrow::Result> RecordBatchReader::ReadBatch( - [[maybe_unused]] int32_t batch_id) const { +::arrow::Result> RecordBatchReader::ReadBatch() const { ARROW_ASSIGN_OR_RAISE(auto batch, project_->Next()); return batch.batch; } ::arrow::Future> RecordBatchReader::operator()() { - int total_batches = reader_->metadata().num_batches(); - while (static_cast(readahead_queue_.size()) < options_->batch_readahead && - current_batch_ < total_batches) { - int32_t batch_id = current_batch_++; - auto result = thread_pool_->Submit( - [&](int32_t batch_id) { - return ::arrow::Future>::MakeFinished( - this->ReadBatch(batch_id)); - }, - batch_id); - if (!result.ok()) { - return result.status(); - } - readahead_queue_.emplace(std::move(result.ValueOrDie())); + auto result = thread_pool_->Submit([&]() { + return ::arrow::Future>::MakeFinished(this->ReadBatch()); + }); + if (!result.ok()) { + return result.status(); } - if (readahead_queue_.empty()) { - return std::shared_ptr<::arrow::RecordBatch>(nullptr); - } - auto first = readahead_queue_.front(); - readahead_queue_.pop(); - return first; + return ::arrow::Future>(result.ValueOrDie()); } } // namespace lance::io \ No newline at end of file diff --git a/cpp/src/lance/io/record_batch_reader.h b/cpp/src/lance/io/record_batch_reader.h index c75f606372..2daaffa2bc 100644 --- a/cpp/src/lance/io/record_batch_reader.h +++ b/cpp/src/lance/io/record_batch_reader.h @@ -46,9 +46,7 @@ class RecordBatchReader : ::arrow::RecordBatchReader { /// Constructor. RecordBatchReader(std::shared_ptr reader, std::shared_ptr<::arrow::dataset::ScanOptions> options, - ::arrow::internal::ThreadPool* thread_pool_, - std::optional limit = std::nullopt, - int64_t offset = 0) noexcept; + ::arrow::internal::ThreadPool* thread_pool_) noexcept; /// Copy constructor. RecordBatchReader(const RecordBatchReader& other) noexcept; @@ -71,18 +69,13 @@ class RecordBatchReader : ::arrow::RecordBatchReader { private: RecordBatchReader() = delete; - ::arrow::Result> ReadBatch(int32_t batch_id) const; + ::arrow::Result> ReadBatch() const; std::shared_ptr reader_; std::shared_ptr<::arrow::dataset::ScanOptions> options_; - std::optional limit_ = std::nullopt; - int64_t offset_ = 0; /// Projection over the dataset. std::shared_ptr project_; - ::arrow::internal::ThreadPool* thread_pool_; - std::atomic_int32_t current_batch_ = 0; - std::queue<::arrow::Future>> readahead_queue_; }; } // namespace lance::io