From fdc17d5ef67ae6a077ede2d804a32fe98859fc72 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 20 Aug 2022 11:40:41 -0700 Subject: [PATCH 1/6] write full dataset --- cpp/src/lance/arrow/stl.h | 2 +- cpp/src/lance/arrow/writer_test.cc | 9 +++- cpp/src/lance/encodings/plain.cc | 74 +++++++++++++++++++++++++-- cpp/src/lance/encodings/plain_test.cc | 32 ++++++++++++ 4 files changed, 112 insertions(+), 5 deletions(-) diff --git a/cpp/src/lance/arrow/stl.h b/cpp/src/lance/arrow/stl.h index 1e184e93ff..125092cb70 100644 --- a/cpp/src/lance/arrow/stl.h +++ b/cpp/src/lance/arrow/stl.h @@ -37,7 +37,7 @@ ToArray(const std::vector& vec, ::arrow::MemoryPool* pool = ::arrow::default_ using ArrayType = typename ::arrow::TypeTraits::ArrayType; typename ::arrow::TypeTraits::BuilderType builder(pool); ARROW_RETURN_NOT_OK(builder.Reserve(vec.size())); - for (auto& v : vec) { + for (const auto& v : vec) { ARROW_RETURN_NOT_OK(builder.Append(v)); } auto result = builder.Finish(); diff --git a/cpp/src/lance/arrow/writer_test.cc b/cpp/src/lance/arrow/writer_test.cc index 263855a145..46065a127c 100644 --- a/cpp/src/lance/arrow/writer_test.cc +++ b/cpp/src/lance/arrow/writer_test.cc @@ -32,6 +32,7 @@ #include "lance/io/reader.h" using arrow::ArrayBuilder; +using arrow::BooleanBuilder; using arrow::Int32Builder; using arrow::LargeBinaryBuilder; using arrow::ListBuilder; @@ -58,11 +59,13 @@ auto CocoDataset() { auto schema = arrow::schema({arrow::field("filename", arrow::utf8()), arrow::field("split", arrow::utf8()), arrow::field("width", arrow::int32()), + arrow::field("valid", arrow::boolean()), arrow::field("image", image_type), arrow::field("annotations", arrow::list(annotationType))}); StringBuilder stringBuilder; Int32Builder intBuilder; + BooleanBuilder booleanBuilder; CHECK(stringBuilder.AppendValues({"1.jpg", "2.jpg", "3.jpg", "4.jpg"}).ok()); auto filenameArr = stringBuilder.Finish().ValueOrDie(); @@ -76,6 +79,10 @@ auto CocoDataset() { auto width = intBuilder.Finish().ValueOrDie(); intBuilder.Reset(); + CHECK(booleanBuilder.AppendValues(std::vector({true, true, false, true})).ok()); + auto valid = booleanBuilder.Finish().ValueOrDie(); + booleanBuilder.Reset(); + auto uriBuilder = std::make_shared(); auto imageBuilder = std::make_shared( image_type, arrow::default_memory_pool(), vector>({uriBuilder})); @@ -115,7 +122,7 @@ auto CocoDataset() { auto annotations = listBuilder.Finish().ValueOrDie(); listBuilder.Reset(); - auto table = arrow::Table::Make(schema, {filenameArr, split, width, images, annotations}); + auto table = arrow::Table::Make(schema, {filenameArr, split, width, valid, images, annotations}); return table; } diff --git a/cpp/src/lance/encodings/plain.cc b/cpp/src/lance/encodings/plain.cc index f16481fac4..41a19de066 100644 --- a/cpp/src/lance/encodings/plain.cc +++ b/cpp/src/lance/encodings/plain.cc @@ -32,18 +32,37 @@ namespace lance::encodings { PlainEncoder::PlainEncoder(std::shared_ptr<::arrow::io::OutputStream> out) : Encoder(out) {} +::arrow::Status WriteBooleanArray(const std::shared_ptr<::arrow::io::OutputStream>& out, + const std::shared_ptr<::arrow::BooleanArray>& arr) { + // TODO: Boolean array is not necessarily aligned with the byte boundary: + // See ::arrow::BooleanArray::Value(int64) + // Is there a faster way to write / shift the boolean array? + ::arrow::BooleanBuilder builder; + ARROW_RETURN_NOT_OK(builder.Reserve(arr->length())); + for (int i = 0; i < arr->length(); i++) { + ARROW_RETURN_NOT_OK(builder.Append(arr->Value(i))); + } + ARROW_ASSIGN_OR_RAISE(auto written_arr, builder.Finish()); + return out->Write(std::dynamic_pointer_cast<::arrow::BooleanArray>(written_arr)->values()); +} + ::arrow::Result PlainEncoder::Write(std::shared_ptr<::arrow::Array> arr) { auto data_type = arr->type(); ARROW_ASSIGN_OR_RAISE(auto value_offset, out_->Tell()); // TODO: support more types. switch (data_type->id()) { + case ::arrow::Type::BOOL: + ARROW_RETURN_NOT_OK( + WriteBooleanArray(out_, std::dynamic_pointer_cast<::arrow::BooleanArray>(arr))); + break; case ::arrow::Type::INT8: ARROW_RETURN_NOT_OK(out_->Write(std::static_pointer_cast<::arrow::Int8Array>(arr)->values())); break; case ::arrow::Type::UINT8: - ARROW_RETURN_NOT_OK( - out_->Write(std::static_pointer_cast<::arrow::UInt8Array>(arr)->values())); + ARROW_RETURN_NOT_OK(out_->Write( + std::static_pointer_cast<::arrow::UInt8Array>(arr)->raw_values() + arr->offset(), + arr->length())); break; case ::arrow::Type::INT16: ARROW_RETURN_NOT_OK( @@ -86,6 +105,55 @@ ::arrow::Result PlainEncoder::Write(std::shared_ptr<::arrow::Array> arr namespace { +class BooleanPlainDecoderImpl : public Decoder { + public: + using Decoder::Decoder; + + /// Get one single scalar value from the column. + ::arrow::Result> GetScalar(int64_t idx) const override { + int64_t offset = idx / 8; + uint8_t byte; + ARROW_RETURN_NOT_OK(infile_->ReadAt(position_ + offset, 1, &byte)); + return std::make_shared<::arrow::BooleanScalar>( + ::arrow::bit_util::GetBitFromByte(byte, idx % 8)); + } + + ::arrow::Result> ToArray( + int32_t start, std::optional length) const override { + if (!length.has_value()) { + length = length_ - start; + } + if (start + length.value() > length_ || start > length_) { + return ::arrow::Status::IndexError( + fmt::format("PlainDecoder::ToArray: out of range: start={}, length={}, page_length={}\n", + start, + length.value(), + length_)); + } + int64_t byte_length = length.value() / 8 + 1; + ARROW_ASSIGN_OR_RAISE(auto buf, infile_->ReadAt(position_ + start / 8, byte_length)); + return std::make_shared<::arrow::BooleanArray>(length.value(), buf); + } + + ::arrow::Result> Take( + std::shared_ptr<::arrow::Int32Array> indices) const override { + int32_t start = indices->Value(0); + int32_t length = indices->Value(indices->length() - 1) - start + 1; + if (indices->length() == 0 || start < 0 || start + length > length_) { + return ::arrow::Status::Invalid("PlainDecoder::Take: Indices array is not valid"); + } + ARROW_ASSIGN_OR_RAISE(auto raw_value_arr, ToArray(start, length)); + auto values = std::static_pointer_cast<::arrow::BooleanArray>(raw_value_arr); + ::arrow::BooleanBuilder builder; + ARROW_RETURN_NOT_OK(builder.Reserve(indices->length())); + for (int64_t i = 0; i < indices->length(); i++) { + auto index = indices->Value(i); + ARROW_RETURN_NOT_OK(builder.Append(values->Value(index - start))); + } + return builder.Finish(); + } +}; + template class PlainDecoderImpl : public Decoder { public: @@ -156,7 +224,7 @@ PlainDecoder::~PlainDecoder() {} ::arrow::Status PlainDecoder::Init() { switch (type_->id()) { case ::arrow::Type::BOOL: - impl_.reset(new PlainDecoderImpl<::arrow::BooleanType>(infile_, type_)); + impl_.reset(new BooleanPlainDecoderImpl(infile_, type_)); break; case ::arrow::Type::INT8: impl_.reset(new PlainDecoderImpl<::arrow::Int8Type>(infile_, type_)); diff --git a/cpp/src/lance/encodings/plain_test.cc b/cpp/src/lance/encodings/plain_test.cc index 57510a5557..980376b731 100644 --- a/cpp/src/lance/encodings/plain_test.cc +++ b/cpp/src/lance/encodings/plain_test.cc @@ -23,6 +23,7 @@ #include "lance/arrow/stl.h" using arrow::Int32Builder; +using lance::arrow::ToArray; TEST_CASE("Test Write Int32 array") { auto arr = lance::arrow::ToArray({1, 2, 3, 4, 5, 6, 7, 8}).ValueOrDie(); @@ -66,3 +67,34 @@ TEST_CASE("Test take plain values") { INFO("Indices " << indices->ToString() << " Actual " << actual->ToString()); CHECK(actual->Equals(indices)); } + +TEST_CASE("Write boolean array") { + arrow::BooleanBuilder builder; + for (int i = 0; i < 10; i++) { + CHECK(builder.Append(i % 3 == 0).ok()); + } + auto arr = builder.Finish().ValueOrDie(); + + auto sink = arrow::io::BufferOutputStream::Create().ValueOrDie(); + lance::encodings::PlainEncoder encoder(sink); + auto offset = encoder.Write(arr).ValueOrDie(); + + auto infile = make_shared(sink->Finish().ValueOrDie()); + lance::encodings::PlainDecoder decoder(infile, arr->type()); + CHECK(decoder.Init().ok()); + decoder.Reset(offset, arr->length()); + + auto actual = decoder.ToArray().ValueOrDie(); + CHECK(arr->Equals(actual)); + + for (int i = 0; i < arr->length(); i++) { + INFO("Expected: " << arr->GetScalar(i).ValueOrDie()->ToString() + << " Got: " << decoder.GetScalar(i).ValueOrDie()->ToString()); + CHECK(arr->GetScalar(i).ValueOrDie()->Equals(decoder.GetScalar(i).ValueOrDie())); + } + + auto indices = ToArray({0, 3, 6}).ValueOrDie(); + CHECK(lance::arrow::ToArray({true, true, true}) + .ValueOrDie() + ->Equals(decoder.Take(indices).ValueOrDie())); +} \ No newline at end of file From 397193f41a3db86dac8d0b65b10c1b8f0fabe20d Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 20 Aug 2022 11:47:16 -0700 Subject: [PATCH 2/6] simplify --- cpp/src/lance/encodings/plain.cc | 78 +++++++++++++------------------- 1 file changed, 31 insertions(+), 47 deletions(-) diff --git a/cpp/src/lance/encodings/plain.cc b/cpp/src/lance/encodings/plain.cc index 41a19de066..6e56f9b1a1 100644 --- a/cpp/src/lance/encodings/plain.cc +++ b/cpp/src/lance/encodings/plain.cc @@ -105,54 +105,7 @@ ::arrow::Result PlainEncoder::Write(std::shared_ptr<::arrow::Array> arr namespace { -class BooleanPlainDecoderImpl : public Decoder { - public: - using Decoder::Decoder; - /// Get one single scalar value from the column. - ::arrow::Result> GetScalar(int64_t idx) const override { - int64_t offset = idx / 8; - uint8_t byte; - ARROW_RETURN_NOT_OK(infile_->ReadAt(position_ + offset, 1, &byte)); - return std::make_shared<::arrow::BooleanScalar>( - ::arrow::bit_util::GetBitFromByte(byte, idx % 8)); - } - - ::arrow::Result> ToArray( - int32_t start, std::optional length) const override { - if (!length.has_value()) { - length = length_ - start; - } - if (start + length.value() > length_ || start > length_) { - return ::arrow::Status::IndexError( - fmt::format("PlainDecoder::ToArray: out of range: start={}, length={}, page_length={}\n", - start, - length.value(), - length_)); - } - int64_t byte_length = length.value() / 8 + 1; - ARROW_ASSIGN_OR_RAISE(auto buf, infile_->ReadAt(position_ + start / 8, byte_length)); - return std::make_shared<::arrow::BooleanArray>(length.value(), buf); - } - - ::arrow::Result> Take( - std::shared_ptr<::arrow::Int32Array> indices) const override { - int32_t start = indices->Value(0); - int32_t length = indices->Value(indices->length() - 1) - start + 1; - if (indices->length() == 0 || start < 0 || start + length > length_) { - return ::arrow::Status::Invalid("PlainDecoder::Take: Indices array is not valid"); - } - ARROW_ASSIGN_OR_RAISE(auto raw_value_arr, ToArray(start, length)); - auto values = std::static_pointer_cast<::arrow::BooleanArray>(raw_value_arr); - ::arrow::BooleanBuilder builder; - ARROW_RETURN_NOT_OK(builder.Reserve(indices->length())); - for (int64_t i = 0; i < indices->length(); i++) { - auto index = indices->Value(i); - ARROW_RETURN_NOT_OK(builder.Append(values->Value(index - start))); - } - return builder.Finish(); - } -}; template class PlainDecoderImpl : public Decoder { @@ -213,6 +166,37 @@ class PlainDecoderImpl : public Decoder { using BuilderType = typename ::arrow::TypeTraits::BuilderType; }; +class BooleanPlainDecoderImpl : public PlainDecoderImpl<::arrow::BooleanType> { + public: + using PlainDecoderImpl::PlainDecoderImpl; + + /// Get one single scalar value from the column. + ::arrow::Result> GetScalar(int64_t idx) const override { + int64_t offset = idx / 8; + uint8_t byte; + ARROW_RETURN_NOT_OK(infile_->ReadAt(position_ + offset, 1, &byte)); + return std::make_shared<::arrow::BooleanScalar>( + ::arrow::bit_util::GetBitFromByte(byte, idx % 8)); + } + + ::arrow::Result> ToArray( + int32_t start, std::optional length) const override { + if (!length.has_value()) { + length = length_ - start; + } + if (start + length.value() > length_ || start > length_) { + return ::arrow::Status::IndexError( + fmt::format("PlainDecoder::ToArray: out of range: start={}, length={}, page_length={}\n", + start, + length.value(), + length_)); + } + int64_t byte_length = length.value() / 8 + 1; + ARROW_ASSIGN_OR_RAISE(auto buf, infile_->ReadAt(position_ + start / 8, byte_length)); + return std::make_shared<::arrow::BooleanArray>(length.value(), buf); + } +}; + } // namespace PlainDecoder::PlainDecoder(std::shared_ptr<::arrow::io::RandomAccessFile> infile, From 2247495080d2564b3e92c9a8fc698562a6ec735b Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 20 Aug 2022 11:49:00 -0700 Subject: [PATCH 3/6] revert unnecesary change --- cpp/src/lance/encodings/plain.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/lance/encodings/plain.cc b/cpp/src/lance/encodings/plain.cc index 6e56f9b1a1..34935a72f4 100644 --- a/cpp/src/lance/encodings/plain.cc +++ b/cpp/src/lance/encodings/plain.cc @@ -60,9 +60,8 @@ ::arrow::Result PlainEncoder::Write(std::shared_ptr<::arrow::Array> arr ARROW_RETURN_NOT_OK(out_->Write(std::static_pointer_cast<::arrow::Int8Array>(arr)->values())); break; case ::arrow::Type::UINT8: - ARROW_RETURN_NOT_OK(out_->Write( - std::static_pointer_cast<::arrow::UInt8Array>(arr)->raw_values() + arr->offset(), - arr->length())); + ARROW_RETURN_NOT_OK( + out_->Write(std::static_pointer_cast<::arrow::UInt8Array>(arr)->values())); break; case ::arrow::Type::INT16: ARROW_RETURN_NOT_OK( From 5f8d5f8a312e53da5cbad79ec5d2eb4de56d6325 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 20 Aug 2022 11:49:29 -0700 Subject: [PATCH 4/6] reduce changes --- cpp/src/lance/encodings/plain.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/src/lance/encodings/plain.cc b/cpp/src/lance/encodings/plain.cc index 34935a72f4..8a56a7b9aa 100644 --- a/cpp/src/lance/encodings/plain.cc +++ b/cpp/src/lance/encodings/plain.cc @@ -104,8 +104,6 @@ ::arrow::Result PlainEncoder::Write(std::shared_ptr<::arrow::Array> arr namespace { - - template class PlainDecoderImpl : public Decoder { public: From ff0f50531d960e19a1d90b045ea6f74a41f1c41f Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 20 Aug 2022 19:21:50 -0700 Subject: [PATCH 5/6] do not write empty array --- cpp/src/lance/io/writer.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/lance/io/writer.cc b/cpp/src/lance/io/writer.cc index 960b0a0ab1..93004a4691 100644 --- a/cpp/src/lance/io/writer.cc +++ b/cpp/src/lance/io/writer.cc @@ -106,6 +106,10 @@ ::arrow::Status FileWriter::WriteArray(const std::shared_ptr& fie ::arrow::Status FileWriter::WritePrimitiveArray(const std::shared_ptr& field, const std::shared_ptr<::arrow::Array>& arr) { + if (arr->length() == 0) { + return ::arrow::Status::OK(); + } + auto field_id = field->id(); auto encoder = field->GetEncoder(destination_); auto type = field->type(); From 0a1bbece452b1116a08a70fadf31f9d43be5ee80 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 22 Aug 2022 08:56:58 -0700 Subject: [PATCH 6/6] use arrow bit utils --- cpp/src/lance/encodings/plain.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/lance/encodings/plain.cc b/cpp/src/lance/encodings/plain.cc index 8a56a7b9aa..76e708a24a 100644 --- a/cpp/src/lance/encodings/plain.cc +++ b/cpp/src/lance/encodings/plain.cc @@ -188,7 +188,7 @@ class BooleanPlainDecoderImpl : public PlainDecoderImpl<::arrow::BooleanType> { length.value(), length_)); } - int64_t byte_length = length.value() / 8 + 1; + int64_t byte_length = ::arrow::bit_util::BytesForBits(length.value()); ARROW_ASSIGN_OR_RAISE(auto buf, infile_->ReadAt(position_ + start / 8, byte_length)); return std::make_shared<::arrow::BooleanArray>(length.value(), buf); }