Skip to content

Commit

Permalink
Refactor I/O exec nodes (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Sep 4, 2022
1 parent 9e61510 commit 728df90
Show file tree
Hide file tree
Showing 39 changed files with 1,041 additions and 506 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ set(lance_objects
$<TARGET_OBJECTS:encodings>
$<TARGET_OBJECTS:format>
$<TARGET_OBJECTS:io>
$<TARGET_OBJECTS:lance_io_exec>
)

add_subdirectory(src)
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/lance/arrow/file_lance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#include "lance/arrow/file_lance_ext.h"
#include "lance/arrow/reader.h"
#include "lance/format/schema.h"
#include "lance/io/filter.h"
#include "lance/io/project.h"
#include "lance/io/exec/filter.h"
#include "lance/io/exec/project.h"
#include "lance/io/reader.h"
#include "lance/io/record_batch_reader.h"
#include "lance/io/writer.h"
Expand Down
18 changes: 8 additions & 10 deletions cpp/src/lance/encodings/binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,28 +98,26 @@ ::arrow::Result<std::shared_ptr<::arrow::Scalar>> VarBinaryDecoder<T>::GetScalar
template <ArrowType T>
::arrow::Result<std::shared_ptr<::arrow::Array>> VarBinaryDecoder<T>::ToArray(
int32_t start, std::optional<int32_t> length) const {
if (!length.has_value()) {
length = length_ - start;
}
if (start + *length > length_) {
auto len = std::min(length.value_or(length_), length_ - start);
if (len < 0) {
return ::arrow::Status::IndexError(
fmt::format("VarBinaryDecoder::ToArray: out of range: start={} length={} page_length={}\n",
start,
*length,
length.value_or(-1),
length_));
}

auto offsets_buf =
infile_->ReadAt(position_ + start * sizeof(OffsetCType), (*length + 1) * sizeof(OffsetCType));
infile_->ReadAt(position_ + start * sizeof(OffsetCType), (len + 1) * sizeof(OffsetCType));
if (!offsets_buf.ok()) {
return ::arrow::Status::IOError(
fmt::format("VarBinaryDecoder::ToArray: failed to read offset: start={}, length={}: {}",
start,
*length,
len,
offsets_buf.status().message()));
}
auto positions = std::make_shared<typename ::arrow::TypeTraits<OffsetType>::ArrayType>(
*length + 1, *offsets_buf);
auto positions =
std::make_shared<typename ::arrow::TypeTraits<OffsetType>::ArrayType>(len + 1, *offsets_buf);
auto start_offset = positions->Value(0);

::arrow::Int32Builder builder;
Expand All @@ -133,7 +131,7 @@ ::arrow::Result<std::shared_ptr<::arrow::Array>> VarBinaryDecoder<T>::ToArray(
auto value_offsets = std::static_pointer_cast<::arrow::Int32Array>(*value_offsets_buf);
auto read_length = positions->Value(positions->length() - 1) - start_offset;
ARROW_ASSIGN_OR_RAISE(auto data_buf, infile_->ReadAt(start_offset, read_length));
return std::make_shared<ArrayType>(*length, value_offsets->values(), data_buf);
return std::make_shared<ArrayType>(len, value_offsets->values(), data_buf);
}

} // namespace lance::encodings
16 changes: 7 additions & 9 deletions cpp/src/lance/encodings/plain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,19 @@ class PlainDecoderImpl : public Decoder {

::arrow::Result<std::shared_ptr<::arrow::Array>> ToArray(
int32_t start, std::optional<int32_t> length) const override {
if (!length.has_value()) {
length = length_ - start;
}
if (start + length.value() > length_ || start > length_) {
auto len = std::min(length.value_or(length_), length_ - start);
if (len <= 0) {
return ::arrow::Status::IndexError(
fmt::format("{}::ToArray: out of range: start={}, length={}, page_length={}\n",
ToString(),
start,
length.value(),
length.value_or(-1),
length_));
}
auto byte_length = type_->byte_width();
ARROW_ASSIGN_OR_RAISE(
auto buf, infile_->ReadAt(position_ + start * byte_length, length.value() * byte_length));
return std::make_shared<ArrayType>(type_, length.value(), buf);
auto byte_width = type_->byte_width();
ARROW_ASSIGN_OR_RAISE(auto buf,
infile_->ReadAt(position_ + start * byte_width, len * byte_width));
return std::make_shared<ArrayType>(type_, len, buf);
}

::arrow::Result<std::shared_ptr<::arrow::Array>> Take(
Expand Down
21 changes: 20 additions & 1 deletion cpp/src/lance/encodings/plain_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@ TEST_CASE("Test Write Int32 array") {
}
}

TEST_CASE("Do not read full batch") {
  // Encode an 8-element int32 array as a single plain-encoded page held in memory.
  auto source = lance::arrow::ToArray({10, 20, 30, 40, 50, 60, 70, 80}).ValueOrDie();
  CHECK(source->length() == 8);

  auto out = arrow::io::BufferOutputStream::Create().ValueOrDie();
  lance::encodings::PlainEncoder encoder(out);
  auto page_offset = encoder.Write(source).ValueOrDie();

  // Re-open the encoded bytes and point a decoder at the page just written.
  auto in = make_shared<arrow::io::BufferReader>(out->Finish().ValueOrDie());
  lance::encodings::PlainDecoder decoder(in, arrow::int32());
  CHECK(decoder.Init().ok());
  decoder.Reset(page_offset, source->length());

  // Ask for 8 values starting at index 6. Only two values remain in the page,
  // so the decoder is expected to hand back just the tail {70, 80}.
  auto expected = lance::arrow::ToArray({70, 80}).ValueOrDie();
  auto tail = decoder.ToArray(6, 8).ValueOrDie();
  CHECK(tail->Equals(expected));
}

TEST_CASE("Test take plain values") {
arrow::Int32Builder builder;
for (int i = 0; i < 100; i++) {
Expand Down Expand Up @@ -148,4 +167,4 @@ TEST_CASE("Write fixed size list") {
auto arr = builder.Finish().ValueOrDie();

TestWriteFixedSizeArray(arr);
}
}
6 changes: 3 additions & 3 deletions cpp/src/lance/format/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -512,15 +512,15 @@ ::arrow::Result<std::shared_ptr<Schema>> Schema::Project(
return Project(columns);
}

::arrow::Result<std::shared_ptr<Schema>> Schema::Exclude(std::shared_ptr<Schema> other) const {
::arrow::Result<std::shared_ptr<Schema>> Schema::Exclude(const Schema& other) const {
/// A visitor to remove fields in place.
class SchemaExcludeVisitor : public FieldVisitor {
public:
SchemaExcludeVisitor(std::shared_ptr<Schema> excluded) : excluded_(excluded) {}

::arrow::Status Visit(std::shared_ptr<Field> field) override {
for (auto& field : field->fields()) {
ARROW_RETURN_NOT_OK(Visit(field));
for (auto& f : field->fields()) {
ARROW_RETURN_NOT_OK(Visit(f));
}
auto excluded_field = excluded_->GetField(field->id());
if (!excluded_field) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/lance/format/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Schema final {
///
/// \param other the schema to be excluded. It must be a strict subset of this schema.
/// \return The newly created schema, excluding any column in "other".
::arrow::Result<std::shared_ptr<Schema>> Exclude(std::shared_ptr<Schema> other) const;
::arrow::Result<std::shared_ptr<Schema>> Exclude(const Schema& other) const;

/// Add a new parent field.
void AddField(std::shared_ptr<Field> f);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/lance/format/schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ TEST_CASE("Exclude schema") {
auto original = lance::format::Schema(arrow_schema);
auto projected = original.Project({"split", "annotations.box"}).ValueOrDie();
INFO("Projected schema: " << projected->ToString());
auto excluded = original.Exclude(projected).ValueOrDie();
auto excluded = original.Exclude(*projected).ValueOrDie();

auto excluded_arrow_schema = ::arrow::schema(
{::arrow::field("pk", ::arrow::utf8()),
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/lance/format/visitors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

namespace lance::format {

::arrow::Status FieldVisitor::VisitSchema(std::shared_ptr<Schema> schema) {
for (auto& field : schema->fields()) {
::arrow::Status FieldVisitor::VisitSchema(const Schema& schema) {
for (auto& field : schema.fields()) {
ARROW_RETURN_NOT_OK(Visit(field));
}
return ::arrow::Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/lance/format/visitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class FieldVisitor {

virtual ::arrow::Status Visit(std::shared_ptr<Field> field) = 0;

::arrow::Status VisitSchema(std::shared_ptr<Schema> schema);
::arrow::Status VisitSchema(const Schema& schema);
};

/// A Visitor to convert Field / Schema to an arrow::Schema
Expand Down
14 changes: 4 additions & 10 deletions cpp/src/lance/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,15 @@

# Internal I/O

add_subdirectory(exec)

add_library(
io
OBJECT
endian.h
filter.cc
filter.h
limit.cc
limit.h

pb.cc
pb.h
project.cc
project.h
reader.cc
reader.h
record_batch_reader.cc
Expand All @@ -36,9 +33,6 @@ add_library(
)
target_include_directories(io SYSTEM PRIVATE ${Protobuf_INCLUDE_DIR})
# Depend on lance::format to generate protobuf
add_dependencies(io format)
add_dependencies(io format lance_io_exec)

add_lance_test(filter_test)
add_lance_test(limit_test)
add_lance_test(project_test)
add_lance_test(reader_test)
22 changes: 22 additions & 0 deletions cpp/src/lance/io/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# I/O execution nodes (scan / filter / limit / project / take).
#
# Built as an OBJECT library: the sources are compiled once into object files
# under the target name `lance_io_exec`.
add_library(
lance_io_exec
OBJECT
base.h
filter.cc
filter.h
limit.cc
limit.h
project.cc
project.h
scan.cc
scan.h
take.cc
take.h
base.cc)
# Protobuf headers are added as SYSTEM includes, private to this target.
target_include_directories(lance_io_exec SYSTEM PRIVATE ${Protobuf_INCLUDE_DIR})
# Build `format` first. NOTE(review): presumably because `format` generates the
# protobuf sources these files include -- confirm against the format target.
add_dependencies(lance_io_exec format)

# Unit tests for the exec nodes. `add_lance_test` is a project helper defined
# outside this file.
add_lance_test(filter_test)
add_lance_test(project_test)
add_lance_test(limit_test)
add_lance_test(scan_test)
33 changes: 33 additions & 0 deletions cpp/src/lance/io/exec/base.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "lance/io/exec/base.h"

#include <memory>
#include <utility>
namespace lance::io::exec {

/// Build the end-of-scan sentinel: a ScanBatch with no record batch and batch id -1.
ScanBatch ScanBatch::Null() {
  return ScanBatch{nullptr, -1};
}

/// Construct a ScanBatch from a record batch and the id of the batch it belongs to.
///
/// \param records the record batch to carry; may be nullptr.
/// \param bid the id of the batch.
///
/// `records` is a by-value sink parameter, so it is moved into the member
/// rather than copied; this avoids an extra atomic reference-count round trip.
ScanBatch::ScanBatch(std::shared_ptr<::arrow::RecordBatch> records, int32_t bid)
    : batch(std::move(records)), batch_id(bid) {}

/// Build a ScanBatch that also carries an array of indices.
///
/// \param records the record batch to carry.
/// \param batch_id the id of the batch.
/// \param indices the Int32 indices to attach to the result.
/// \return a ScanBatch holding `records`, `batch_id` and `indices`.
///
/// Both shared pointers are by-value sink parameters and are moved into the
/// result, so no extra reference-count increments are performed.
ScanBatch ScanBatch::Filtered(std::shared_ptr<::arrow::RecordBatch> records,
                              int32_t batch_id,
                              std::shared_ptr<::arrow::Int32Array> indices) {
  ScanBatch result(std::move(records), batch_id);
  result.indices = std::move(indices);
  return result;
}

} // namespace lance::io::exec
108 changes: 108 additions & 0 deletions cpp/src/lance/io/exec/base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <arrow/record_batch.h>
#include <arrow/result.h>

#include <memory>
#include <string>
#include <vector>

namespace lance::io::exec {

/// The result emitted by each ExecNode.
///
/// Bundles a record batch with the id of the batch it came from and,
/// optionally, an array of indices (see Filtered()).
struct ScanBatch {
/// The resulting RecordBatch.
///
/// If it is a zero-sized batch, there are no valid values in this batch.
/// If it is nullptr, the end of the scan has been reached (see eof()).
std::shared_ptr<::arrow::RecordBatch> batch;

/// The id of the batch this result belongs to. Defaults to -1.
int32_t batch_id = -1;

/// Indices returned from the filter. Left null unless set via Filtered().
std::shared_ptr<::arrow::Int32Array> indices;

/// Return a null ScanBatch (nullptr batch, batch id -1), which indicates EOF.
static ScanBatch Null();

/// Build a ScanBatch from a record batch, a batch id and filter indices.
///
/// \param records A record batch of values to return
/// \param batch_id the id of the batch
/// \param indices the indices to attach to the returned ScanBatch
static ScanBatch Filtered(std::shared_ptr<::arrow::RecordBatch> records,
int32_t batch_id,
std::shared_ptr<::arrow::Int32Array> indices);

/// Construct an empty response: null batch, batch id -1, null indices.
ScanBatch() = default;

/// Constructor with a record batch and batch id.
///
/// \param records A record batch of values to return
/// \param batch_id the id of the batch
ScanBatch(std::shared_ptr<::arrow::RecordBatch> records, int32_t batch_id);

/// Returns true if the end of file is reached, i.e. `batch` is null.
bool eof() const { return !batch; }
};

/// Base class of the I/O execution nodes.
///
/// TODO: investigate to adapt Arrow Acero.
/// https://arrow.apache.org/docs/cpp/streaming_execution.html
///
/// An exec plan usually starts with Project and ends with Scan.
///
/// \example
/// A few examples of the exec plan tree for common queries.
///
/// SELECT * FROM dataset
/// Project (*) --> Scan (*)
///
/// SELECT a, b FROM dataset WHERE c = 123
/// Project (a, b) -> Take(a,b) -> Filter(c=123) -> Scan(c)
///
/// SELECT a, b FROM dataset LIMIT 200 OFFSET 5000
/// Project (a, b) -> Limit(200, 5000) -> Scan(a, b)
///
/// SELECT a, b, c FROM dataset WHERE c = 123 LIMIT 50 OFFSET 200
/// Project (a, b, c) -> Take(a, b) -> Limit(50, 200) -> Filter(c=123) -> Scan(c)
class ExecNode {
public:
/// Identifies the kind of node; reported by type().
enum Type {
kScan = 0,
kProject = 1,
kFilter = 2,
kLimit = 3,
kTake = 4,
kTableScan = 256,
};

ExecNode() = default;

/// Virtual so that concrete nodes can be destroyed through an ExecNode pointer.
virtual ~ExecNode() = default;

/// The kind of this node.
virtual constexpr Type type() const = 0;

/// Returns the next batch of rows, or an error status.
///
/// NOTE(review): the end of the scan is presumably signalled by a ScanBatch
/// whose `batch` is nullptr (ScanBatch::eof() is true), not by a null return
/// value -- confirm against the concrete nodes.
virtual ::arrow::Result<ScanBatch> Next() = 0;

/// A human-readable description of this node.
[[nodiscard]] virtual std::string ToString() const = 0;
};

} // namespace lance::io::exec
Loading

0 comments on commit 728df90

Please sign in to comment.