From 90a276f8bb4d0ba39d471dc740b10da8096c2362 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 14 Aug 2022 10:02:41 -0700 Subject: [PATCH] Make FileReader pass schema as shared_ptr to Project (#100) --- cpp/src/lance/io/limit_test.cc | 1 + cpp/src/lance/io/project.cc | 14 ++++++-------- cpp/src/lance/io/project.h | 6 ++---- cpp/src/lance/io/record_batch_reader.cc | 5 +---- cpp/src/lance/io/record_batch_reader.h | 1 - 5 files changed, 10 insertions(+), 17 deletions(-) diff --git a/cpp/src/lance/io/limit_test.cc b/cpp/src/lance/io/limit_test.cc index db63ce241e..45465284d8 100644 --- a/cpp/src/lance/io/limit_test.cc +++ b/cpp/src/lance/io/limit_test.cc @@ -25,6 +25,7 @@ #include "lance/arrow/stl.h" #include "lance/arrow/writer.h" +#include "lance/format/schema.h" #include "lance/io/reader.h" TEST_CASE("LIMIT 100") { diff --git a/cpp/src/lance/io/project.cc b/cpp/src/lance/io/project.cc index 31b68cf6e1..6da6534b4d 100644 --- a/cpp/src/lance/io/project.cc +++ b/cpp/src/lance/io/project.cc @@ -26,36 +26,34 @@ namespace lance::io { -Project::Project(std::shared_ptr dataset_schema, - std::shared_ptr projected_schema, +Project::Project(std::shared_ptr projected_schema, std::shared_ptr scan_schema, std::unique_ptr filter, std::optional limit, int32_t offset) - : dataset_schema_(dataset_schema), - projected_schema_(projected_schema), + : projected_schema_(projected_schema), scan_schema_(scan_schema), filter_(std::move(filter)), limit_(limit.has_value() ? new Limit(limit.value(), offset) : nullptr) {} ::arrow::Result> Project::Make( - std::shared_ptr schema, + const format::Schema& schema, std::shared_ptr<::arrow::dataset::ScanOptions> scan_options, std::optional limit, int32_t offset) { - ARROW_ASSIGN_OR_RAISE(auto filter, Filter::Make(*schema, scan_options->filter)); + ARROW_ASSIGN_OR_RAISE(auto filter, Filter::Make(schema, scan_options->filter)); auto projected_arrow_schema = scan_options->projected_schema; if (projected_arrow_schema->num_fields() == 0) { projected_arrow_schema = scan_options->dataset_schema; } - ARROW_ASSIGN_OR_RAISE(auto projected_schema, schema->Project(*projected_arrow_schema)); + ARROW_ASSIGN_OR_RAISE(auto projected_schema, schema.Project(*projected_arrow_schema)); auto scan_schema = projected_schema; if (filter) { // Remove the columns in filter from the project schema, to avoid duplicated scan ARROW_ASSIGN_OR_RAISE(scan_schema, projected_schema->Exclude(filter->schema())); } return std::unique_ptr( - new Project(schema, projected_schema, scan_schema, std::move(filter), limit, offset)); + new Project(projected_schema, scan_schema, std::move(filter), limit, offset)); } const std::shared_ptr& Project::schema() const { return projected_schema_; } diff --git a/cpp/src/lance/io/project.h b/cpp/src/lance/io/project.h index 6c69591bce..873f6aab46 100644 --- a/cpp/src/lance/io/project.h +++ b/cpp/src/lance/io/project.h @@ -46,7 +46,7 @@ class Project { /// \return Project if success. Returns the error status otherwise. /// static ::arrow::Result> Make( - std::shared_ptr schema, + const format::Schema& schema, std::shared_ptr<::arrow::dataset::ScanOptions> scan_options, std::optional limit = std::nullopt, int32_t offset = 0); @@ -68,14 +68,12 @@ class Project { bool CanParallelScan() const; private: - Project(std::shared_ptr dataset_schema, - std::shared_ptr projected_schema, + Project(std::shared_ptr projected_schema, std::shared_ptr scan_schema, std::unique_ptr filter, std::optional limit = std::nullopt, int32_t offset = 0); - std::shared_ptr dataset_schema_; std::shared_ptr projected_schema_; /// scan_schema_ equals to projected_schema_ - filters_.schema() /// It includes the columns that are not read from the filters yet. diff --git a/cpp/src/lance/io/record_batch_reader.cc b/cpp/src/lance/io/record_batch_reader.cc index 646f2d92df..b516197718 100644 --- a/cpp/src/lance/io/record_batch_reader.cc +++ b/cpp/src/lance/io/record_batch_reader.cc @@ -52,7 +52,6 @@ RecordBatchReader::RecordBatchReader(const RecordBatchReader& other) noexcept options_(other.options_), limit_(other.limit_), offset_(other.offset_), - schema_(other.schema_), project_(other.project_), thread_pool_(other.thread_pool_), current_batch_(int(other.current_batch_)) {} @@ -62,15 +61,13 @@ RecordBatchReader::RecordBatchReader(RecordBatchReader&& other) noexcept options_(std::move(other.options_)), limit_(other.limit_), offset_(other.offset_), - schema_(std::move(other.schema_)), project_(std::move(other.project_)), thread_pool_(std::move(other.thread_pool_)), current_batch_(int(other.current_batch_)), readahead_queue_(std::move(other.readahead_queue_)) {} ::arrow::Status RecordBatchReader::Open() { - schema_ = std::make_shared(reader_->schema()); - ARROW_ASSIGN_OR_RAISE(project_, Project::Make(schema_, options_, limit_, offset_)); + ARROW_ASSIGN_OR_RAISE(project_, Project::Make(reader_->schema(), options_, limit_, offset_)); return ::arrow::Status::OK(); } diff --git a/cpp/src/lance/io/record_batch_reader.h b/cpp/src/lance/io/record_batch_reader.h index 3f9fec1657..ccf5d30d25 100644 --- a/cpp/src/lance/io/record_batch_reader.h +++ b/cpp/src/lance/io/record_batch_reader.h @@ -74,7 +74,6 @@ class RecordBatchReader : ::arrow::RecordBatchReader { std::shared_ptr<::arrow::dataset::ScanOptions> options_; std::optional limit_ = std::nullopt; int64_t offset_ = 0; - std::shared_ptr schema_; /// Projection over the dataset. std::shared_ptr project_;