Skip to content

Commit

Permalink
handle eof in scan
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu committed Sep 2, 2022
1 parent 6a6adea commit 2b52d6d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
10 changes: 5 additions & 5 deletions cpp/src/lance/encodings/plain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ class PlainDecoderImpl : public Decoder {

::arrow::Result<std::shared_ptr<::arrow::Array>> ToArray(
int32_t start, std::optional<int32_t> length) const override {
auto read_length = std::min(length.value_or(length_), length_ - start);
if (read_length <= 0) {
auto len = std::min(length.value_or(length_), length_ - start);
if (len <= 0) {
return ::arrow::Status::IndexError(
fmt::format("{}::ToArray: out of range: start={}, length={}, page_length={}\n",
ToString(),
Expand All @@ -135,9 +135,9 @@ class PlainDecoderImpl : public Decoder {
length_));
}
auto byte_length = type_->byte_width();
ARROW_ASSIGN_OR_RAISE(
auto buf, infile_->ReadAt(position_ + start * byte_length, read_length * byte_length));
return std::make_shared<ArrayType>(type_, read_length, buf);
ARROW_ASSIGN_OR_RAISE(auto buf,
infile_->ReadAt(position_ + start * byte_length, len * byte_length));
return std::make_shared<ArrayType>(type_, len, buf);
}

::arrow::Result<std::shared_ptr<::arrow::Array>> Take(
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/lance/io/exec/scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ ::arrow::Result<ScanBatch> Scan::Next() {
}
}
}
if (batch_id >= reader_->metadata().num_batches()) {
// EOF
return ScanBatch();
}

ARROW_ASSIGN_OR_RAISE(auto batch, reader_->ReadBatch(*schema_, batch_id, offset, batch_size_));
return ScanBatch{
Expand Down
25 changes: 21 additions & 4 deletions cpp/src/lance/io/exec/scan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,31 @@ TEST_CASE("Test Scan::Next") {
CHECK(tab->column(0)->num_chunks() == 2);
auto reader = MakeReader(tab).ValueOrDie();

auto scan = Scan::Make(reader, std::make_shared<Schema>(reader->schema()), 8).ValueOrDie();
const int32_t kBatchSize = 8;
auto scan =
Scan::Make(reader, std::make_shared<Schema>(reader->schema()), kBatchSize).ValueOrDie();
auto batch = scan->Next().ValueOrDie();
CHECK(batch.batch_id == 0);
CHECK(batch.batch->num_rows() == 8);
CHECK(batch.batch->num_rows() == kBatchSize);
batch = scan->Next().ValueOrDie();
CHECK(batch.batch_id == 0);
CHECK(batch.batch->num_rows() == 8);
CHECK(batch.batch->num_rows() == kBatchSize);
batch = scan->Next().ValueOrDie();
CHECK(batch.batch_id == 0);
CHECK(batch.batch->num_rows() == 4);
CHECK(batch.batch->num_rows() == 20 - 2 * kBatchSize);

// Second batch
batch = scan->Next().ValueOrDie();
CHECK(batch.batch_id == 1);
CHECK(batch.batch->num_rows() == kBatchSize);
batch = scan->Next().ValueOrDie();
CHECK(batch.batch_id == 1);
CHECK(batch.batch->num_rows() == kBatchSize);
batch = scan->Next().ValueOrDie();
CHECK(batch.batch_id == 1);
CHECK(batch.batch->num_rows() == 20 - 2 * kBatchSize);

// We should stop now
batch = scan->Next().ValueOrDie();
CHECK(!batch.batch);
}

0 comments on commit 2b52d6d

Please sign in to comment.