Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor I/O exec nodes #136

Merged
merged 24 commits into from
Sep 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ set(lance_objects
$<TARGET_OBJECTS:encodings>
$<TARGET_OBJECTS:format>
$<TARGET_OBJECTS:io>
$<TARGET_OBJECTS:lance_io_exec>
)

add_subdirectory(src)
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/lance/arrow/file_lance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#include "lance/arrow/file_lance_ext.h"
#include "lance/arrow/reader.h"
#include "lance/format/schema.h"
#include "lance/io/filter.h"
#include "lance/io/project.h"
#include "lance/io/exec/filter.h"
#include "lance/io/exec/project.h"
#include "lance/io/reader.h"
#include "lance/io/record_batch_reader.h"
#include "lance/io/writer.h"
Expand Down
18 changes: 8 additions & 10 deletions cpp/src/lance/encodings/binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,28 +98,26 @@ ::arrow::Result<std::shared_ptr<::arrow::Scalar>> VarBinaryDecoder<T>::GetScalar
template <ArrowType T>
::arrow::Result<std::shared_ptr<::arrow::Array>> VarBinaryDecoder<T>::ToArray(
int32_t start, std::optional<int32_t> length) const {
if (!length.has_value()) {
length = length_ - start;
}
if (start + *length > length_) {
auto len = std::min(length.value_or(length_), length_ - start);
if (len < 0) {
return ::arrow::Status::IndexError(
fmt::format("VarBinaryDecoder::ToArray: out of range: start={} length={} page_length={}\n",
start,
*length,
length.value_or(-1),
length_));
}

auto offsets_buf =
infile_->ReadAt(position_ + start * sizeof(OffsetCType), (*length + 1) * sizeof(OffsetCType));
infile_->ReadAt(position_ + start * sizeof(OffsetCType), (len + 1) * sizeof(OffsetCType));
if (!offsets_buf.ok()) {
return ::arrow::Status::IOError(
fmt::format("VarBinaryDecoder::ToArray: failed to read offset: start={}, length={}: {}",
start,
*length,
len,
offsets_buf.status().message()));
}
auto positions = std::make_shared<typename ::arrow::TypeTraits<OffsetType>::ArrayType>(
*length + 1, *offsets_buf);
auto positions =
std::make_shared<typename ::arrow::TypeTraits<OffsetType>::ArrayType>(len + 1, *offsets_buf);
auto start_offset = positions->Value(0);

::arrow::Int32Builder builder;
Expand All @@ -133,7 +131,7 @@ ::arrow::Result<std::shared_ptr<::arrow::Array>> VarBinaryDecoder<T>::ToArray(
auto value_offsets = std::static_pointer_cast<::arrow::Int32Array>(*value_offsets_buf);
auto read_length = positions->Value(positions->length() - 1) - start_offset;
ARROW_ASSIGN_OR_RAISE(auto data_buf, infile_->ReadAt(start_offset, read_length));
return std::make_shared<ArrayType>(*length, value_offsets->values(), data_buf);
return std::make_shared<ArrayType>(len, value_offsets->values(), data_buf);
}

} // namespace lance::encodings
16 changes: 7 additions & 9 deletions cpp/src/lance/encodings/plain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,19 @@ class PlainDecoderImpl : public Decoder {

::arrow::Result<std::shared_ptr<::arrow::Array>> ToArray(
int32_t start, std::optional<int32_t> length) const override {
if (!length.has_value()) {
length = length_ - start;
}
if (start + length.value() > length_ || start > length_) {
auto len = std::min(length.value_or(length_), length_ - start);
if (len <= 0) {
return ::arrow::Status::IndexError(
fmt::format("{}::ToArray: out of range: start={}, length={}, page_length={}\n",
ToString(),
start,
length.value(),
length.value_or(-1),
changhiskhan marked this conversation as resolved.
Show resolved Hide resolved
length_));
}
auto byte_length = type_->byte_width();
ARROW_ASSIGN_OR_RAISE(
auto buf, infile_->ReadAt(position_ + start * byte_length, length.value() * byte_length));
return std::make_shared<ArrayType>(type_, length.value(), buf);
auto byte_width = type_->byte_width();
ARROW_ASSIGN_OR_RAISE(auto buf,
infile_->ReadAt(position_ + start * byte_width, len * byte_width));
return std::make_shared<ArrayType>(type_, len, buf);
}

::arrow::Result<std::shared_ptr<::arrow::Array>> Take(
Expand Down
21 changes: 20 additions & 1 deletion cpp/src/lance/encodings/plain_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@ TEST_CASE("Test Write Int32 array") {
}
}

TEST_CASE("Do not read full batch") {
  // Encode an 8-element int32 array as a single plain-encoded page in memory.
  auto input = lance::arrow::ToArray({10, 20, 30, 40, 50, 60, 70, 80}).ValueOrDie();
  CHECK(input->length() == 8);

  auto out = arrow::io::BufferOutputStream::Create().ValueOrDie();
  lance::encodings::PlainEncoder encoder(out);
  auto page_offset = encoder.Write(input).ValueOrDie();

  // Re-open the encoded bytes and point a decoder at the page just written.
  auto source = make_shared<arrow::io::BufferReader>(out->Finish().ValueOrDie());
  lance::encodings::PlainDecoder decoder(source, arrow::int32());
  CHECK(decoder.Init().ok());
  decoder.Reset(page_offset, input->length());

  // Request 8 values starting at index 6. The page holds 8 values in total,
  // so this test expects only the trailing two, i.e. arr[6:8].
  auto expected = lance::arrow::ToArray({70, 80}).ValueOrDie();
  auto decoded = decoder.ToArray(6, 8).ValueOrDie();
  CHECK(decoded->Equals(expected));
}

TEST_CASE("Test take plain values") {
arrow::Int32Builder builder;
for (int i = 0; i < 100; i++) {
Expand Down Expand Up @@ -148,4 +167,4 @@ TEST_CASE("Write fixed size list") {
auto arr = builder.Finish().ValueOrDie();

TestWriteFixedSizeArray(arr);
}
}
6 changes: 3 additions & 3 deletions cpp/src/lance/format/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -512,15 +512,15 @@ ::arrow::Result<std::shared_ptr<Schema>> Schema::Project(
return Project(columns);
}

::arrow::Result<std::shared_ptr<Schema>> Schema::Exclude(std::shared_ptr<Schema> other) const {
::arrow::Result<std::shared_ptr<Schema>> Schema::Exclude(const Schema& other) const {
/// A visitor to remove fields in place.
class SchemaExcludeVisitor : public FieldVisitor {
public:
SchemaExcludeVisitor(std::shared_ptr<Schema> excluded) : excluded_(excluded) {}

::arrow::Status Visit(std::shared_ptr<Field> field) override {
for (auto& field : field->fields()) {
ARROW_RETURN_NOT_OK(Visit(field));
for (auto& f : field->fields()) {
ARROW_RETURN_NOT_OK(Visit(f));
}
auto excluded_field = excluded_->GetField(field->id());
if (!excluded_field) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/lance/format/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Schema final {
///
/// \param other the schema to be excluded. It must be a strict subset of this schema.
/// \return The newly created schema, excluding any column in "other".
::arrow::Result<std::shared_ptr<Schema>> Exclude(std::shared_ptr<Schema> other) const;
::arrow::Result<std::shared_ptr<Schema>> Exclude(const Schema& other) const;

/// Add a new parent field.
void AddField(std::shared_ptr<Field> f);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/lance/format/schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ TEST_CASE("Exclude schema") {
auto original = lance::format::Schema(arrow_schema);
auto projected = original.Project({"split", "annotations.box"}).ValueOrDie();
INFO("Projected schema: " << projected->ToString());
auto excluded = original.Exclude(projected).ValueOrDie();
auto excluded = original.Exclude(*projected).ValueOrDie();

auto excluded_arrow_schema = ::arrow::schema(
{::arrow::field("pk", ::arrow::utf8()),
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/lance/format/visitors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

namespace lance::format {

::arrow::Status FieldVisitor::VisitSchema(std::shared_ptr<Schema> schema) {
for (auto& field : schema->fields()) {
::arrow::Status FieldVisitor::VisitSchema(const Schema& schema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change the interface here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few places that do not hold a std::shared_ptr<Schema> but a Schema&. Changing the interface to const Schema& makes it usable from more call sites; as a side effect, it also avoids an extra shared_ptr<> copy and its reference counting.

for (auto& field : schema.fields()) {
ARROW_RETURN_NOT_OK(Visit(field));
}
return ::arrow::Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/lance/format/visitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class FieldVisitor {

virtual ::arrow::Status Visit(std::shared_ptr<Field> field) = 0;

::arrow::Status VisitSchema(std::shared_ptr<Schema> schema);
::arrow::Status VisitSchema(const Schema& schema);
};

/// A Visitor to convert Field / Schema to an arrow::Schema
Expand Down
14 changes: 4 additions & 10 deletions cpp/src/lance/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,15 @@

# Internal I/O

add_subdirectory(exec)

add_library(
io
OBJECT
endian.h
filter.cc
filter.h
limit.cc
limit.h

pb.cc
pb.h
project.cc
project.h
reader.cc
reader.h
record_batch_reader.cc
Expand All @@ -36,9 +33,6 @@ add_library(
)
target_include_directories(io SYSTEM PRIVATE ${Protobuf_INCLUDE_DIR})
# Depend on lance::format to generate protobuf
add_dependencies(io format)
add_dependencies(io format lance_io_exec)

add_lance_test(filter_test)
add_lance_test(limit_test)
add_lance_test(project_test)
add_lance_test(reader_test)
22 changes: 22 additions & 0 deletions cpp/src/lance/io/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Object library containing the I/O exec nodes (filter, limit, project, scan,
# take) and their shared base definitions.
add_library(
lance_io_exec
OBJECT
base.h
filter.cc
filter.h
limit.cc
limit.h
project.cc
project.h
scan.cc
scan.h
take.cc
take.h
base.cc)
# Add the Protobuf include directory as a SYSTEM include for this target.
target_include_directories(lance_io_exec SYSTEM PRIVATE ${Protobuf_INCLUDE_DIR})
# Build the `format` target before this one.
# NOTE(review): presumably so the generated protobuf sources exist first - confirm.
add_dependencies(lance_io_exec format)

# Tests for the exec nodes.
add_lance_test(filter_test)
add_lance_test(project_test)
add_lance_test(limit_test)
add_lance_test(scan_test)
33 changes: 33 additions & 0 deletions cpp/src/lance/io/exec/base.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "lance/io/exec/base.h"

#include <memory>
namespace lance::io::exec {

/// Build the sentinel ScanBatch: no record batch and a batch id of -1.
ScanBatch ScanBatch::Null() {
  return ScanBatch{nullptr, -1};
}

/// Construct a ScanBatch from a record batch and the id of the batch it belongs to.
///
/// \param records the record batch to hold.
/// \param bid the id of the batch.
ScanBatch::ScanBatch(std::shared_ptr<::arrow::RecordBatch> records, int32_t bid)
    // `records` is a by-value sink parameter: move it into the member rather
    // than copying, which avoids an extra shared_ptr reference-count update.
    : batch(std::move(records)), batch_id(bid) {}

/// Build a ScanBatch that carries, in addition to the records and batch id,
/// the indices produced by a filter.
///
/// \param records the record batch to hold.
/// \param batch_id the id of the batch.
/// \param indices the indices to attach to the result.
/// \return a ScanBatch with `batch`, `batch_id` and `indices` all set.
ScanBatch ScanBatch::Filtered(std::shared_ptr<::arrow::RecordBatch> records,
                              int32_t batch_id,
                              std::shared_ptr<::arrow::Int32Array> indices) {
  // Both shared_ptr arguments are by-value sinks; move them to avoid extra
  // reference-count updates.
  ScanBatch result(std::move(records), batch_id);
  result.indices = std::move(indices);
  return result;
}

} // namespace lance::io::exec
108 changes: 108 additions & 0 deletions cpp/src/lance/io/exec/base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <arrow/record_batch.h>
#include <arrow/result.h>

#include <memory>
#include <string>
#include <vector>

namespace lance::io::exec {

/// Emitted results from each ExecNode
struct ScanBatch {
/// The resulting RecordBatch.
///
/// If it is a zero-sized batch, there are no valid values in this batch.
/// If it is nullptr, the end of the scan has been reached.
std::shared_ptr<::arrow::RecordBatch> batch;

/// The id of the batch this result belongs to. Defaults to -1.
int32_t batch_id = -1;

/// Indices returned from the filter.
std::shared_ptr<::arrow::Int32Array> indices;

/// Returns a null ScanBatch, which indicates EOF.
static ScanBatch Null();

/// Factory for a ScanBatch that also carries filter indices.
///
/// \param records A record batch of values to return
/// \param batch_id the id of the batch
/// \param indices the indices to store in the `indices` member
/// \return a ScanBatch holding the records, the batch id and the indices.
static ScanBatch Filtered(std::shared_ptr<::arrow::RecordBatch> records,
int32_t batch_id,
std::shared_ptr<::arrow::Int32Array> indices);

/// Construct an empty response.
ScanBatch() = default;

/// Constructor with a record batch and batch id.
///
/// \param records A record batch of values to return
/// \param batch_id the id of the batch
ScanBatch(std::shared_ptr<::arrow::RecordBatch> records, int32_t batch_id);

/// Returns true if the end of file is reached, i.e. `batch` is null.
bool eof() const { return !batch; }
};

/// I/O execute base node.
///
/// TODO: investigate to adapt Arrow Acero.
/// https://arrow.apache.org/docs/cpp/streaming_execution.html
///
/// An exec plan usually starts with Project and ends with Scan.
///
/// \example
/// A few examples of the exec plan tree for common queries.
///
/// SELECT * FROM dataset
/// Project (*) --> Scan (*)
///
/// SELECT a, b FROM dataset WHERE c = 123
/// Project (a, b) -> Take(a,b) -> Filter(c=123) -> Scan(c)
///
/// SELECT a, b FROM dataset LIMIT 200 OFFSET 5000
/// Project (a, b) -> Limit(200, 5000) -> Scan(a, b)
///
/// SELECT a, b, c FROM dataset WHERE c = 123 LIMIT 50 OFFSET 200
/// Project (a, b, c) -> Take(a, b) -> Limit(50, 200) -> Filter(c=123) -> Scan(c)
class ExecNode {
public:
enum Type {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add some comments here with:

  1. how a typical tree might look
  2. and if it's not made explicit elsewhere, what are valid child/parent nodes for node types.

kScan = 0,
kProject = 1,
kFilter = 2,
kLimit = 3,
kTake = 4,
kTableScan = 256,
changhiskhan marked this conversation as resolved.
Show resolved Hide resolved
};

ExecNode() = default;

virtual ~ExecNode() = default;

virtual constexpr Type type() const = 0;

/// Returns the next batch of rows. At EOF the returned ScanBatch has a null `batch` (see ScanBatch::eof()).
virtual ::arrow::Result<ScanBatch> Next() = 0;

[[nodiscard]] virtual std::string ToString() const = 0;
};

} // namespace lance::io::exec
Loading