diff --git a/cpp/src/lance/encodings/plain.cc b/cpp/src/lance/encodings/plain.cc index e804b41125..bc19a2a216 100644 --- a/cpp/src/lance/encodings/plain.cc +++ b/cpp/src/lance/encodings/plain.cc @@ -125,8 +125,8 @@ class PlainDecoderImpl : public Decoder { ::arrow::Result> ToArray( int32_t start, std::optional length) const override { - auto read_length = std::min(length.value_or(length_), length_ - start); - if (read_length <= 0) { + auto len = std::min(length.value_or(length_), length_ - start); + if (len <= 0) { return ::arrow::Status::IndexError( fmt::format("{}::ToArray: out of range: start={}, length={}, page_length={}\n", ToString(), @@ -135,9 +135,9 @@ class PlainDecoderImpl : public Decoder { length_)); } auto byte_length = type_->byte_width(); - ARROW_ASSIGN_OR_RAISE( - auto buf, infile_->ReadAt(position_ + start * byte_length, read_length * byte_length)); - return std::make_shared(type_, read_length, buf); + ARROW_ASSIGN_OR_RAISE(auto buf, + infile_->ReadAt(position_ + start * byte_length, len * byte_length)); + return std::make_shared(type_, len, buf); } ::arrow::Result> Take( diff --git a/cpp/src/lance/io/exec/scan.cc b/cpp/src/lance/io/exec/scan.cc index 318fc51a2d..123bf57809 100644 --- a/cpp/src/lance/io/exec/scan.cc +++ b/cpp/src/lance/io/exec/scan.cc @@ -54,6 +54,10 @@ ::arrow::Result Scan::Next() { } } } + if (batch_id >= reader_->metadata().num_batches()) { + // EOF + return ScanBatch(); + } ARROW_ASSIGN_OR_RAISE(auto batch, reader_->ReadBatch(*schema_, batch_id, offset, batch_size_)); return ScanBatch{ diff --git a/cpp/src/lance/io/exec/scan_test.cc b/cpp/src/lance/io/exec/scan_test.cc index a244467921..78cb997700 100644 --- a/cpp/src/lance/io/exec/scan_test.cc +++ b/cpp/src/lance/io/exec/scan_test.cc @@ -43,14 +43,31 @@ TEST_CASE("Test Scan::Next") { CHECK(tab->column(0)->num_chunks() == 2); auto reader = MakeReader(tab).ValueOrDie(); - auto scan = Scan::Make(reader, std::make_shared(reader->schema()), 8).ValueOrDie(); + const int32_t kBatchSize = 8; + auto scan = + Scan::Make(reader, std::make_shared(reader->schema()), kBatchSize).ValueOrDie(); auto batch = scan->Next().ValueOrDie(); CHECK(batch.batch_id == 0); - CHECK(batch.batch->num_rows() == 8); + CHECK(batch.batch->num_rows() == kBatchSize); batch = scan->Next().ValueOrDie(); CHECK(batch.batch_id == 0); - CHECK(batch.batch->num_rows() == 8); + CHECK(batch.batch->num_rows() == kBatchSize); batch = scan->Next().ValueOrDie(); CHECK(batch.batch_id == 0); - CHECK(batch.batch->num_rows() == 4); + CHECK(batch.batch->num_rows() == 20 - 2 * kBatchSize); + + // Second batch + batch = scan->Next().ValueOrDie(); + CHECK(batch.batch_id == 1); + CHECK(batch.batch->num_rows() == kBatchSize); + batch = scan->Next().ValueOrDie(); + CHECK(batch.batch_id == 1); + CHECK(batch.batch->num_rows() == kBatchSize); + batch = scan->Next().ValueOrDie(); + CHECK(batch.batch_id == 1); + CHECK(batch.batch->num_rows() == 20 - 2 * kBatchSize); + + // We should stop now + batch = scan->Next().ValueOrDie(); + CHECK(!batch.batch); } \ No newline at end of file