Skip to content

Commit

Permalink
Make FileReader pass schema as shared_ptr to Project (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Aug 14, 2022
1 parent b5b2992 commit 90a276f
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 17 deletions.
1 change: 1 addition & 0 deletions cpp/src/lance/io/limit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "lance/arrow/stl.h"
#include "lance/arrow/writer.h"
#include "lance/format/schema.h"
#include "lance/io/reader.h"

TEST_CASE("LIMIT 100") {
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/lance/io/project.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,34 @@

namespace lance::io {

Project::Project(std::shared_ptr<format::Schema> dataset_schema,
std::shared_ptr<format::Schema> projected_schema,
Project::Project(std::shared_ptr<format::Schema> projected_schema,
std::shared_ptr<format::Schema> scan_schema,
std::unique_ptr<Filter> filter,
std::optional<int32_t> limit,
int32_t offset)
: dataset_schema_(dataset_schema),
projected_schema_(projected_schema),
: projected_schema_(projected_schema),
scan_schema_(scan_schema),
filter_(std::move(filter)),
limit_(limit.has_value() ? new Limit(limit.value(), offset) : nullptr) {}

::arrow::Result<std::unique_ptr<Project>> Project::Make(
std::shared_ptr<format::Schema> schema,
const format::Schema& schema,
std::shared_ptr<::arrow::dataset::ScanOptions> scan_options,
std::optional<int32_t> limit,
int32_t offset) {
ARROW_ASSIGN_OR_RAISE(auto filter, Filter::Make(*schema, scan_options->filter));
ARROW_ASSIGN_OR_RAISE(auto filter, Filter::Make(schema, scan_options->filter));
auto projected_arrow_schema = scan_options->projected_schema;
if (projected_arrow_schema->num_fields() == 0) {
projected_arrow_schema = scan_options->dataset_schema;
}
ARROW_ASSIGN_OR_RAISE(auto projected_schema, schema->Project(*projected_arrow_schema));
ARROW_ASSIGN_OR_RAISE(auto projected_schema, schema.Project(*projected_arrow_schema));
auto scan_schema = projected_schema;
if (filter) {
// Remove the columns in filter from the project schema, to avoid duplicated scan
ARROW_ASSIGN_OR_RAISE(scan_schema, projected_schema->Exclude(filter->schema()));
}
return std::unique_ptr<Project>(
new Project(schema, projected_schema, scan_schema, std::move(filter), limit, offset));
new Project(projected_schema, scan_schema, std::move(filter), limit, offset));
}

const std::shared_ptr<format::Schema>& Project::schema() const { return projected_schema_; }
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/lance/io/project.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Project {
/// \return Project if success. Returns the error status otherwise.
///
static ::arrow::Result<std::unique_ptr<Project>> Make(
std::shared_ptr<format::Schema> schema,
const format::Schema& schema,
std::shared_ptr<::arrow::dataset::ScanOptions> scan_options,
std::optional<int32_t> limit = std::nullopt,
int32_t offset = 0);
Expand All @@ -68,14 +68,12 @@ class Project {
bool CanParallelScan() const;

private:
Project(std::shared_ptr<format::Schema> dataset_schema,
std::shared_ptr<format::Schema> projected_schema,
Project(std::shared_ptr<format::Schema> projected_schema,
std::shared_ptr<format::Schema> scan_schema,
std::unique_ptr<Filter> filter,
std::optional<int32_t> limit = std::nullopt,
int32_t offset = 0);

std::shared_ptr<format::Schema> dataset_schema_;
std::shared_ptr<format::Schema> projected_schema_;
/// scan_schema_ equals to projected_schema_ - filters_.schema()
/// It includes the columns that are not read from the filters yet.
Expand Down
5 changes: 1 addition & 4 deletions cpp/src/lance/io/record_batch_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ RecordBatchReader::RecordBatchReader(const RecordBatchReader& other) noexcept
options_(other.options_),
limit_(other.limit_),
offset_(other.offset_),
schema_(other.schema_),
project_(other.project_),
thread_pool_(other.thread_pool_),
current_batch_(int(other.current_batch_)) {}
Expand All @@ -62,15 +61,13 @@ RecordBatchReader::RecordBatchReader(RecordBatchReader&& other) noexcept
options_(std::move(other.options_)),
limit_(other.limit_),
offset_(other.offset_),
schema_(std::move(other.schema_)),
project_(std::move(other.project_)),
thread_pool_(std::move(other.thread_pool_)),
current_batch_(int(other.current_batch_)),
readahead_queue_(std::move(other.readahead_queue_)) {}

::arrow::Status RecordBatchReader::Open() {
schema_ = std::make_shared<lance::format::Schema>(reader_->schema());
ARROW_ASSIGN_OR_RAISE(project_, Project::Make(schema_, options_, limit_, offset_));
ARROW_ASSIGN_OR_RAISE(project_, Project::Make(reader_->schema(), options_, limit_, offset_));
return ::arrow::Status::OK();
}

Expand Down
1 change: 0 additions & 1 deletion cpp/src/lance/io/record_batch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class RecordBatchReader : ::arrow::RecordBatchReader {
std::shared_ptr<::arrow::dataset::ScanOptions> options_;
std::optional<int64_t> limit_ = std::nullopt;
int64_t offset_ = 0;
std::shared_ptr<lance::format::Schema> schema_;
/// Projection over the dataset.
std::shared_ptr<Project> project_;

Expand Down

0 comments on commit 90a276f

Please sign in to comment.