Skip to content

Commit

Permalink
PlainDecoder read the leftover
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu committed Sep 2, 2022
1 parent 51d137a commit 6a6adea
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 19 deletions.
12 changes: 5 additions & 7 deletions cpp/src/lance/encodings/plain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,19 @@ class PlainDecoderImpl : public Decoder {

::arrow::Result<std::shared_ptr<::arrow::Array>> ToArray(
int32_t start, std::optional<int32_t> length) const override {
if (!length.has_value()) {
length = length_ - start;
}
if (start + length.value() > length_ || start > length_) {
auto read_length = std::min(length.value_or(length_), length_ - start);
if (read_length <= 0) {
return ::arrow::Status::IndexError(
fmt::format("{}::ToArray: out of range: start={}, length={}, page_length={}\n",
ToString(),
start,
length.value(),
length.value_or(-1),
length_));
}
auto byte_length = type_->byte_width();
ARROW_ASSIGN_OR_RAISE(
auto buf, infile_->ReadAt(position_ + start * byte_length, length.value() * byte_length));
return std::make_shared<ArrayType>(type_, length.value(), buf);
auto buf, infile_->ReadAt(position_ + start * byte_length, read_length * byte_length));
return std::make_shared<ArrayType>(type_, read_length, buf);
}

::arrow::Result<std::shared_ptr<::arrow::Array>> Take(
Expand Down
20 changes: 19 additions & 1 deletion cpp/src/lance/encodings/plain_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,24 @@ TEST_CASE("Test Write Int32 array") {
}
}

// Requesting more values than remain after `start` must yield only the leftover values.
TEST_CASE("Read not full batch") {
  // Encode eight int32 values into an in-memory stream.
  auto input = lance::arrow::ToArray({1, 2, 3, 4, 5, 6, 7, 8}).ValueOrDie();
  CHECK(input->length() == 8);

  auto out_stream = arrow::io::BufferOutputStream::Create().ValueOrDie();
  lance::encodings::PlainEncoder encoder(out_stream);
  auto page_offset = encoder.Write(input).ValueOrDie();

  // Re-open the written bytes and point the decoder at the encoded page.
  auto in_stream = make_shared<arrow::io::BufferReader>(out_stream->Finish().ValueOrDie());
  lance::encodings::PlainDecoder decoder(in_stream, arrow::int32());
  CHECK(decoder.Init().ok());
  decoder.Reset(page_offset, input->length());

  // Ask for 8 values starting at index 6: only the last two values are left.
  auto leftover = decoder.ToArray(6, 8).ValueOrDie();
  auto expected = lance::arrow::ToArray({7, 8}).ValueOrDie();
  CHECK(leftover->Equals(expected));
}

TEST_CASE("Test take plain values") {
arrow::Int32Builder builder;
for (int i = 0; i < 100; i++) {
Expand Down Expand Up @@ -148,4 +166,4 @@ TEST_CASE("Write fixed size list") {
auto arr = builder.Finish().ValueOrDie();

TestWriteFixedSizeArray(arr);
}
}
8 changes: 7 additions & 1 deletion cpp/src/lance/io/exec/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@

#include <memory>
#include <string>
#include <vector>

namespace lance::io::exec {

/// A record batch returned by an ExecNode, tagged with a batch id.
///
/// The struct stays an aggregate, so `ScanBatch{batch, batch_id}` keeps working.
struct ScanBatch {
  /// The rows carried by this result; null unless set by the producer.
  std::shared_ptr<::arrow::RecordBatch> batch;
  /// Id of the batch the rows belong to. Defaults to -1 so a default-initialized
  /// ScanBatch never carries an indeterminate id.
  int32_t batch_id = -1;
};

/// I/O execute base node.
///
/// TODO: investigate to adapt Arrow Acero.
Expand All @@ -33,7 +39,7 @@ class ExecNode {
virtual ~ExecNode() = default;

/// Returns the next batch of rows, returns nullptr if EOF.
virtual ::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> Next() = 0;
virtual ::arrow::Result<ScanBatch> Next() = 0;

virtual std::string ToString() const = 0;
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/lance/io/exec/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ ::arrow::Result<std::unique_ptr<Filter>> Filter::Make(const lance::format::Schem
return std::unique_ptr<Filter>(new Filter(filter_schema, filter));
}

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> Filter::Next() { return child_->Next(); }
/// Forwards to the child node and returns whatever it produces, unchanged.
::arrow::Result<ScanBatch> Filter::Next() {
  return child_->Next();
}

::arrow::Result<
std::tuple<std::shared_ptr<::arrow::Int32Array>, std::shared_ptr<::arrow::RecordBatch>>>
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/lance/io/exec/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Filter : public ExecNode {
std::tuple<std::shared_ptr<::arrow::Int32Array>, std::shared_ptr<::arrow::RecordBatch>>>
Execute(std::shared_ptr<::arrow::RecordBatch>) const;

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> Next() override;
::arrow::Result<ScanBatch> Next() override;

::arrow::Result<
std::tuple<std::shared_ptr<::arrow::Int32Array>, std::shared_ptr<::arrow::RecordBatch>>>
Expand Down
39 changes: 37 additions & 2 deletions cpp/src/lance/io/exec/scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,52 @@

#include <memory>

#include "lance/format/metadata.h"
#include "lance/io/reader.h"

namespace lance::io::exec {

/// Factory for Scan.
///
/// Fails with an IOError when the reader's metadata reports zero batches.
/// Otherwise the new node starts with the page length of batch 0.
::arrow::Result<std::unique_ptr<Scan>> Scan::Make(std::shared_ptr<FileReader> reader,
                                                  std::shared_ptr<lance::format::Schema> schema,
                                                  int64_t batch_size) {
  // Reject empty files up front: there is no batch 0 to take a length from.
  if (reader->metadata().num_batches() == 0) {
    return ::arrow::Status::IOError("Can not open Scan on empty file");
  }

  // The constructor is private, so std::make_unique can not be used here.
  std::unique_ptr<Scan> node(new Scan(reader, schema, batch_size));
  node->current_batch_page_length_ = reader->metadata().GetBatchLength(0);
  return node;
}

/// Stores the reader, the schema and the batch size used by Next().
Scan::Scan(std::shared_ptr<FileReader> reader,
           std::shared_ptr<lance::format::Schema> schema,
           int64_t batch_size)
    : reader_(std::move(reader)),
      schema_(std::move(schema)),
      batch_size_(batch_size) {
}

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> Scan::Next() {
return reader_->ReadBatch(*schema_, current_batch_id_, current_offset_, batch_size_);
/// Reads the next slice of up to `batch_size_` rows and advances the cursor.
///
/// The cursor (`current_batch_id_`, `current_offset_`, `current_batch_page_length_`)
/// is updated while holding `lock_`; the read itself runs outside the lock.
///
/// \return the rows read together with the id of the batch they came from.
///         Once every batch has been consumed, returns a ScanBatch whose
///         `batch` is nullptr and whose `batch_id` is -1.
::arrow::Result<ScanBatch> Scan::Next() {
  int32_t offset;
  int32_t batch_id;

  {
    std::lock_guard guard(lock_);
    if (current_batch_id_ >= reader_->metadata().num_batches()) {
      // EOF: signal it with a null batch instead of asking the reader for a
      // batch id that does not exist.
      return ScanBatch{nullptr, -1};
    }

    batch_id = current_batch_id_;
    offset = current_offset_;

    // Add in 64 bits so a large batch size can not overflow the 32-bit offset.
    const int64_t next_offset = static_cast<int64_t>(offset) + batch_size_;
    if (next_offset >= current_batch_page_length_) {
      // `>=`, not `>`: when the page length is an exact multiple of the batch
      // size, the cursor must move on, otherwise the next call would read zero
      // rows at the end of the page.
      current_batch_id_++;
      current_offset_ = 0;
      if (current_batch_id_ < reader_->metadata().num_batches()) {
        current_batch_page_length_ = reader_->metadata().GetBatchLength(current_batch_id_);
      }
    } else {
      // next_offset is below the int32 page length here, so the cast is lossless.
      current_offset_ = static_cast<int32_t>(next_offset);
    }
  }

  ARROW_ASSIGN_OR_RAISE(auto batch, reader_->ReadBatch(*schema_, batch_id, offset, batch_size_));
  return ScanBatch{
      batch,
      batch_id,
  };
}

/// Returns the fixed display name of this node.
std::string Scan::ToString() const {
  return "Scan";
}

} // namespace lance::io::exec
23 changes: 17 additions & 6 deletions cpp/src/lance/io/exec/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ namespace lance::io::exec {
/// Leaf scan node.
class Scan : public ExecNode {
public:
/// Factory method.
static ::arrow::Result<std::unique_ptr<Scan>> Make(std::shared_ptr<FileReader> reader,
std::shared_ptr<lance::format::Schema> schema,
int64_t batch_size);

Scan() = delete;

virtual ~Scan() = default;

/// Returns the next available batch in the file. Or returns nullptr if EOF.
::arrow::Result<ScanBatch> Next() override;

std::string ToString() const override;

private:
/// Constructor
///
/// \param reader An opened file reader.
Expand All @@ -45,12 +60,6 @@ class Scan : public ExecNode {
std::shared_ptr<lance::format::Schema> schema,
int64_t batch_size);

virtual ~Scan() = default;

/// Returns the next available batch. Or returns nullptr if EOF.
::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> Next() override;

private:
const std::shared_ptr<FileReader> reader_;
/// The projected schema to scan.
const std::shared_ptr<lance::format::Schema> schema_;
Expand All @@ -61,6 +70,8 @@ class Scan : public ExecNode {
int32_t current_batch_id_ = 0;
/// Offset in the batch.
int32_t current_offset_ = 0;
/// Length of the batch currently being scanned, taken from the file metadata.
int32_t current_batch_page_length_ = 0;
};

} // namespace lance::io::exec
56 changes: 56 additions & 0 deletions cpp/src/lance/io/exec/scan_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "lance/io/exec/scan.h"

#include <arrow/table.h>

#include <catch2/catch_test_macros.hpp>
#include <numeric>
#include <vector>

#include "lance/arrow/stl.h"
#include "lance/format/schema.h"
#include "lance/io/exec/base.h"
#include "lance/testing/io.h"

using lance::format::Schema;
using lance::io::exec::Scan;
using lance::testing::MakeReader;

// Scanning a 20-row chunk with a batch size of 8 yields slices of 8, 8 and 4 rows,
// all reported as belonging to batch 0.
TEST_CASE("Test Scan::Next") {
  // Values 0..19, used for both chunks of the single "ints" column.
  std::vector<int32_t> values(20);
  std::iota(values.begin(), values.end(), 0);

  auto arrow_schema = ::arrow::schema({
      ::arrow::field("ints", ::arrow::int32()),
  });
  auto column = ::arrow::ChunkedArray::Make({lance::arrow::ToArray(values).ValueOrDie(),
                                             lance::arrow::ToArray(values).ValueOrDie()})
                    .ValueOrDie();
  auto table = ::arrow::Table::Make(arrow_schema, {column});
  CHECK(table->column(0)->num_chunks() == 2);

  auto reader = MakeReader(table).ValueOrDie();
  auto scan = Scan::Make(reader, std::make_shared<Schema>(reader->schema()), 8).ValueOrDie();

  // Three consecutive reads cover the first chunk.
  for (const int expected_rows : {8, 8, 4}) {
    auto result = scan->Next().ValueOrDie();
    CHECK(result.batch_id == 0);
    CHECK(result.batch->num_rows() == expected_rows);
  }
}

0 comments on commit 6a6adea

Please sign in to comment.