Skip to content

Commit

Permalink
Merge branch 'main' into howtos
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Sep 24, 2022
2 parents bf5df25 + fe90201 commit ab2918c
Show file tree
Hide file tree
Showing 21 changed files with 378 additions and 150 deletions.
66 changes: 66 additions & 0 deletions cpp/src/lance/arrow/scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@

#include "lance/arrow/stl.h"
#include "lance/arrow/type.h"
#include "lance/arrow/utils.h"
#include "lance/format/schema.h"
#include "lance/testing/extension_types.h"
#include "lance/testing/io.h"

using lance::arrow::ToArray;

auto nested_schema = ::arrow::schema({::arrow::field("pk", ::arrow::int32()),
::arrow::field("objects",
Expand Down Expand Up @@ -202,4 +206,66 @@ TEST_CASE("Test ScanBatchesAsync with batch size") {
CHECK(batch.record_batch->num_rows() == kBatchSize);
}
CHECK(num_batches == kTotalValues / kBatchSize);
}

// GH-188
// Scans a table containing a list<float> column with a filter that matches no row,
// and expects an empty table that still reports the original schema.
TEST_CASE("Filter over empty list") {
  // "ints" column: three rows, none of which equals the filter literal below.
  auto ints_column = ToArray({1, 2, 3}).ValueOrDie();

  // "floats" column: three list entries -> [0.1, 0.2], null, [11.1].
  auto float_builder = std::make_shared<::arrow::FloatBuilder>();
  ::arrow::ListBuilder floats_builder(::arrow::default_memory_pool(), float_builder);
  CHECK(floats_builder.Append().ok());
  CHECK(float_builder->AppendValues({0.1, 0.2}).ok());
  CHECK(floats_builder.AppendNull().ok());
  CHECK(floats_builder.Append().ok());
  CHECK(float_builder->Append(11.1).ok());
  auto floats_column = floats_builder.Finish().ValueOrDie();

  auto schema = ::arrow::schema({::arrow::field("ints", ::arrow::int32()),
                                 ::arrow::field("floats", ::arrow::list(::arrow::float32()))});
  auto expected = ::arrow::Table::Make(schema, {ints_column, floats_column});

  auto dataset = lance::testing::MakeDataset(expected).ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();

  // No row has ints == 100, so the list column has to be materialized as an empty array.
  const auto no_match = ::arrow::compute::equal(::arrow::compute::field_ref("ints"),
                                                ::arrow::compute::literal(100));
  CHECK(scan_builder->Filter(no_match).ok());
  auto scanner = scan_builder->Finish().ValueOrDie();

  auto result = scanner->ToTable().ValueOrDie();
  CHECK(result->num_rows() == 0);
  CHECK(expected->schema()->Equals(result->schema()));
}

// Combines a filter that matches no row with a limit, using lance::arrow::ScannerBuilder
// directly, and expects an empty table that still reports the original schema.
TEST_CASE("Filter with limit") {
  // "ints" column: three rows, none of which equals the filter literal below.
  auto ints_column = ToArray({1, 2, 3}).ValueOrDie();

  // "floats" column: three list entries -> [0.1, 0.2], null, [11.1].
  auto float_builder = std::make_shared<::arrow::FloatBuilder>();
  ::arrow::ListBuilder floats_builder(::arrow::default_memory_pool(), float_builder);
  CHECK(floats_builder.Append().ok());
  CHECK(float_builder->AppendValues({0.1, 0.2}).ok());
  CHECK(floats_builder.AppendNull().ok());
  CHECK(floats_builder.Append().ok());
  CHECK(float_builder->Append(11.1).ok());
  auto floats_column = floats_builder.Finish().ValueOrDie();

  auto schema = ::arrow::schema({::arrow::field("ints", ::arrow::int32()),
                                 ::arrow::field("floats", ::arrow::list(::arrow::float32()))});
  auto expected = ::arrow::Table::Make(schema, {ints_column, floats_column});
  auto dataset = lance::testing::MakeDataset(expected).ValueOrDie();

  lance::arrow::ScannerBuilder scan_builder(dataset);
  // Filter first (ints == 100 matches nothing), then cap the output at 20 rows.
  CHECK(scan_builder
            .Filter(::arrow::compute::equal(::arrow::compute::field_ref("ints"),
                                            ::arrow::compute::literal(100)))
            .ok());
  CHECK(scan_builder.Limit(20).ok());
  auto scanner = scan_builder.Finish().ValueOrDie();

  auto result = scanner->ToTable().ValueOrDie();
  CHECK(result->num_rows() == 0);
  CHECK(expected->schema()->Equals(result->schema()));
}
12 changes: 8 additions & 4 deletions cpp/src/lance/encodings/plain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,15 @@ class PlainDecoderImpl : public Decoder {
return Decoder::Take(indices);
}

if (indices->length() == 0) {
return MakeEmpty();
}

int32_t start = indices->Value(0);
int32_t length = indices->Value(indices->length() - 1) - start + 1;
if (indices->length() == 0 || start < 0 || start + length > length_) {
return ::arrow::Status::Invalid("PlainDecoder::Take: Indices array is not valid");
return ::arrow::Status::Invalid(fmt::format(
"PlainDecoder::Take: Indices array is not valid: start={}, length={}", start, length));
}
// For simplicity, we read all data in a single batch to reduce random I/O,
// and apply the indices afterwards.
Expand Down Expand Up @@ -187,8 +192,7 @@ class PlainDecoderImpl : public Decoder {
}

::arrow::Result<std::shared_ptr<::arrow::Array>> MakeEmpty() const {
ARROW_ASSIGN_OR_RAISE(auto buffer, ::arrow::AllocateBuffer(0));
return std::make_shared<ArrayType>(type_, 0, std::move(buffer));
return ::arrow::MakeEmptyArray(type_, pool_);
}

private:
Expand Down Expand Up @@ -265,7 +269,7 @@ class FixedSizeListPlainDecoderImpl : public Decoder {
PlainDecoder::~PlainDecoder() {}

::arrow::Status PlainDecoder::Init() {
assert (!arrow::is_extension(type_));
assert(!arrow::is_extension(type_));
switch (type_->id()) {
case ::arrow::Type::BOOL:
impl_.reset(new BooleanPlainDecoderImpl(infile_, type_));
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/lance/format/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ ::arrow::Result<std::shared_ptr<Schema>> Schema::Project(

auto actual_field = GetField(components[0]);
if (!actual_field) {
return ::arrow::Status::Invalid("Field {} dose not exist.", name);
continue;
}
auto view_field = view->GetField(components[0]);
if (!view_field) {
Expand All @@ -495,7 +495,7 @@ ::arrow::Result<std::shared_ptr<Schema>> Schema::Project(
for (auto& arrow_field : arrow_schema.fields()) {
auto field = GetField(arrow_field->name());
if (!field) {
return ::arrow::Status::Invalid(fmt::format("Field {} dose not exist", arrow_field->name()));
continue;
}
auto proj_field = field->Project(arrow_field);
projection->AddField(proj_field);
Expand Down
1 change: 1 addition & 0 deletions cpp/src/lance/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ target_include_directories(io SYSTEM PRIVATE ${Protobuf_INCLUDE_DIR})
add_dependencies(io format lance_io_exec)

add_lance_test(reader_test)
add_lance_test(record_batch_reader_test)
29 changes: 21 additions & 8 deletions cpp/src/lance/io/exec/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,33 @@

#include "lance/io/exec/base.h"

#include <arrow/array.h>

#include <memory>

namespace lance::io::exec {

/// Build the sentinel batch: no record batch attached and a batch id of -1.
ScanBatch ScanBatch::Null() {
  constexpr int32_t kNoBatchId = -1;
  return ScanBatch(nullptr, kNoBatchId);
}

ScanBatch::ScanBatch(std::shared_ptr<::arrow::RecordBatch> records, int32_t bid)
: batch(records), batch_id(bid) {}
ScanBatch::ScanBatch(std::shared_ptr<::arrow::RecordBatch> records,
int32_t bid,
std::shared_ptr<::arrow::Int32Array> idx)
: batch(std::move(records)), batch_id(bid), indices(std::move(idx)) {}

/// Slice this batch over the window starting at `offset` and spanning `length` rows.
///
/// The record batch is always sliced; the filter indices are sliced over the same
/// window only when they are present. The batch id is carried over unchanged.
ScanBatch ScanBatch::Slice(int64_t offset, int64_t length) const {
  auto records_slice = batch->Slice(offset, length);
  if (!indices) {
    // No filter indices attached: hand over an empty (null) indices pointer.
    decltype(indices) no_indices;
    return ScanBatch(records_slice, batch_id, no_indices);
  }
  // Array::Slice returns a generic array; cast it back to the concrete Int32 type.
  decltype(indices) indices_slice =
      std::dynamic_pointer_cast<::arrow::Int32Array>(indices->Slice(offset, length));
  return ScanBatch(records_slice, batch_id, indices_slice);
}

ScanBatch ScanBatch::Filtered(std::shared_ptr<::arrow::RecordBatch> records,
int32_t batch_id,
std::shared_ptr<::arrow::Int32Array> indices) {
auto batch = ScanBatch(records, batch_id);
batch.indices = std::move(indices);
return batch;
int64_t ScanBatch::length() const {
if (!batch) {
return 0;
}
return batch->num_rows();
}

} // namespace lance::io::exec
19 changes: 10 additions & 9 deletions cpp/src/lance/io/exec/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,26 @@ struct ScanBatch {
/// Returns a null ScanBatch, which indicates EOF.
static ScanBatch Null();

/// Constructor with a record batch and batch id.
///
/// \param records A record batch of values to return
/// \param batch_id the id of the batch
static ScanBatch Filtered(std::shared_ptr<::arrow::RecordBatch> records,
int32_t batch_id,
std::shared_ptr<::arrow::Int32Array> indices);

/// Construct an empty response.
ScanBatch() = default;

/// Constructor with a record batch and batch id.
///
/// \param records A record batch of values to return
/// \param batch_id the id of the batch
ScanBatch(std::shared_ptr<::arrow::RecordBatch> records, int32_t batch_id);
/// \param indices the indices from filter. Optional
ScanBatch(std::shared_ptr<::arrow::RecordBatch> records,
int32_t batch_id,
std::shared_ptr<::arrow::Int32Array> indices = nullptr);

/// Returns True if the end of file is reached.
bool eof() const { return !batch; }

/// Make a zero-copy slice from this batch.
ScanBatch Slice(int64_t offset, int64_t length) const;

/// The length of this batch.
int64_t length() const;
};

/// I/O execute base node.
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/lance/io/exec/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ ::arrow::Result<ScanBatch> Filter::Next() {
if (batch.eof()) {
return ScanBatch::Null();
}
if (batch.batch->num_rows() == 0) {
if (batch.length() == 0) {
return batch;
}
ARROW_ASSIGN_OR_RAISE(auto indices_and_values, Apply(*batch.batch));
auto [indices, values] = indices_and_values;
ARROW_ASSIGN_OR_RAISE(auto values_arr, values->ToStructArray());
return ScanBatch::Filtered(values, batch.batch_id, indices);
return ScanBatch(values, batch.batch_id, indices);
}

::arrow::Result<
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/lance/io/exec/limit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "lance/io/exec/limit.h"

#include <arrow/array/util.h>
#include <arrow/record_batch.h>
#include <fmt/format.h>

Expand Down Expand Up @@ -51,18 +52,18 @@ ::arrow::Result<ScanBatch> Limit::Next() {
return batch;
}
// Find intersection of two ranges (offset, offset + limit) and (seen, seen + batch_size).
auto batch_size = batch.batch->num_rows();
auto batch_size = batch.length();
auto left = std::max(offset_, seen_);
auto right = std::min(seen_ + batch_size, offset_ + limit_);
std::shared_ptr<::arrow::RecordBatch> record_batch;
ScanBatch limited_batch;
if (left < right) {
record_batch = batch.batch->Slice(left - seen_, right - left);
limited_batch = batch.Slice(left - seen_, right - left);
} else {
/// No intersection, skip the whole batch.
ARROW_ASSIGN_OR_RAISE(record_batch, ::arrow::RecordBatch::MakeEmpty(batch.batch->schema()));
limited_batch = batch.Slice(0, 0);
}
seen_ += batch_size;
return ScanBatch{record_batch, batch.batch_id};
seen_ += batch.length();
return limited_batch;
}

std::string Limit::ToString() const {
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/lance/io/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "lance/io/reader.h"

#include <arrow/array/concatenate.h>
#include <arrow/array/util.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>
Expand All @@ -27,6 +28,7 @@
#include <memory>

#include "lance/arrow/type.h"
#include "lance/arrow/utils.h"
#include "lance/encodings/binary.h"
#include "lance/encodings/plain.h"
#include "lance/format/format.h"
Expand Down Expand Up @@ -353,8 +355,7 @@ ::arrow::Result<std::shared_ptr<::arrow::Array>> FileReader::GetListArray(
// TODO: GH-39. We should improve the read behavior to use indices to save some I/Os.
auto& indices = params.indices.value();
if (indices->length() == 0) {
return ::arrow::Status::IndexError(fmt::format(
"FileReader::GetListArray: indices is empty: field={}({})", field->name(), field->id()));
return ::arrow::MakeEmptyArray(field->type());
}
auto start = static_cast<int32_t>(indices->Value(0));
auto length = static_cast<int32_t>(indices->Value(indices->length() - 1) - start + 1);
Expand Down
28 changes: 28 additions & 0 deletions cpp/src/lance/io/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "lance/io/reader.h"

#include <arrow/builder.h>
#include <arrow/compute/api.h>
#include <arrow/io/api.h>
#include <arrow/table.h>
#include <fmt/format.h>
Expand All @@ -24,6 +25,7 @@

#include "lance/arrow/stl.h"
#include "lance/arrow/writer.h"
#include "lance/testing/io.h"

TEST_CASE("Test List Array With Nulls") {
auto int_builder = std::make_shared<::arrow::Int32Builder>();
Expand Down Expand Up @@ -97,4 +99,30 @@ TEST_CASE("Get List Array With Indices") {
.ValueOrDie();
CHECK(batch->Equals(*expected_table->CombineChunksToBatch().ValueOrDie()));
}
}

// Filters a dictionary-encoded column on a value ("bike") that is present in the
// dictionary but never referenced by any row, and expects the scan to return zero rows.
TEST_CASE("Filter over dictionary base") {
  // Four rows referencing dictionary entries 0..2; entries 3 ("bike") and 4 ("cat") are unused.
  auto indices = lance::arrow::ToArray<int8_t>({0, 1, 1, 2}).ValueOrDie();
  auto dict = lance::arrow::ToArray({"car", "horse", "plane", "bike", "cat"}).ValueOrDie();
  auto dict_arr =
      ::arrow::DictionaryArray::FromArrays(::arrow::dictionary(::arrow::int8(), ::arrow::utf8()),
                                           std::static_pointer_cast<::arrow::Array>(indices),
                                           std::static_pointer_cast<::arrow::Array>(dict))
          .ValueOrDie();
  // Every column of a table must have the same number of rows, so this column carries
  // exactly as many values as `indices` (four). The table's row count is inferred from
  // the columns, so a longer column here would produce an inconsistent table.
  auto value_arr = lance::arrow::ToArray({1, 2, 3, 4}).ValueOrDie();

  auto schema = ::arrow::schema(
      {::arrow::field("category", ::arrow::dictionary(::arrow::int8(), ::arrow::utf8())),
       ::arrow::field("value", ::arrow::int32())});
  auto tab = ::arrow::Table::Make(schema, {dict_arr, value_arr});

  auto dataset = lance::testing::MakeDataset(tab).ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();
  // "bike" is a dictionary entry that no row points at, so nothing should match.
  CHECK(scan_builder
            ->Filter(::arrow::compute::equal(::arrow::compute::field_ref("category"),
                                             ::arrow::compute::literal("bike")))
            .ok());
  auto scanner = scan_builder->Finish().ValueOrDie();
  auto actual = scanner->ToTable().ValueOrDie();
  CHECK(actual->num_rows() == 0);
}
56 changes: 56 additions & 0 deletions cpp/src/lance/io/record_batch_reader_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <arrow/compute/api.h>
#include <arrow/table.h>
#include <arrow/type.h>
#include <fmt/format.h>

#include <catch2/catch_test_macros.hpp>

#include "lance/arrow/stl.h"
#include "lance/testing/io.h"

using lance::arrow::ToArray;

// Writes a two-column table through MakeDataset with {"split"} as the second argument
// (presumably the partition columns -- confirm against lance/testing/io.h), scans it
// back in full, and compares it with the input after sorting the rows by "value".
TEST_CASE("Scan partitioned dataset") {
  auto values = ToArray({1, 2, 3, 4, 5}).ValueOrDie();
  auto splits = ToArray({"train", "train", "eval", "test", "train"}).ValueOrDie();

  auto schema = ::arrow::schema(
      {::arrow::field("value", ::arrow::int32()), ::arrow::field("split", ::arrow::utf8())});
  auto t = ::arrow::Table::Make(schema, {values, splits});

  auto dataset = lance::testing::MakeDataset(t, {"split"}).ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  auto scanned = scanner->ToTable().ValueOrDie();
  auto actual = scanned->CombineChunks().ValueOrDie();

  // The scan does not have to return rows in input order, so reorder them by "value"
  // before comparing against the input table.
  auto sort_order = ::arrow::compute::SortIndices(*actual->GetColumnByName("value")).ValueOrDie();
  auto sorted_table = ::arrow::compute::Take(actual, sort_order).ValueOrDie().table();
  INFO("Expected table: " << t->ToString() << " \nActual table: " << sorted_table->ToString());
  CHECK(t->Equals(*sorted_table));
}

// Projecting a column that the dataset does not contain must be reported as an error.
TEST_CASE("Scan partitioned dataset with nonexistent column") {
  auto values = ToArray({1, 2, 3, 4, 5}).ValueOrDie();
  auto splits = ToArray({"train", "train", "eval", "test", "train"}).ValueOrDie();

  auto schema = ::arrow::schema(
      {::arrow::field("value", ::arrow::int32()), ::arrow::field("split", ::arrow::utf8())});
  auto t = ::arrow::Table::Make(schema, {values, splits});
  auto dataset = lance::testing::MakeDataset(t, {"split"}).ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();

  // "woo" is not a column of the dataset at all, while "split" is a dataset column
  // that is not stored inside the lance file itself.
  const auto project_status = scan_builder->Project({"value", "split", "woo"});
  CHECK(!project_status.ok());
}
Loading

0 comments on commit ab2918c

Please sign in to comment.