Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add encoding for FixedSizeBinary and FixedSizeList #129

Merged
merged 14 commits into from
Aug 26, 2022
36 changes: 36 additions & 0 deletions cpp/src/lance/arrow/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "lance/arrow/type.h"

#include <arrow/builder.h>
#include <arrow/result.h>
#include <arrow/type.h>
#include <arrow/type_traits.h>
Expand Down Expand Up @@ -181,6 +182,41 @@ ::arrow::Result<std::shared_ptr<::arrow::DataType>> FromLogicalType(
"FromLogicalType: logical_type \"{}\" is not supported yet", logical_type.to_string()));
}

::arrow::Result<std::shared_ptr<::arrow::ArrayBuilder>> GetArrayBuilder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only for numeric types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to support more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sophon is going to need some datetime fields. Should those be added here as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and also boolean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do

::arrow::Type::type type_id) {
switch (type_id) {
case ::arrow::Type::UINT8:
return std::make_shared<::arrow::UInt8Builder>();
case ::arrow::Type::INT8:
return std::make_shared<::arrow::Int8Builder>();
case ::arrow::Type::UINT16:
return std::make_shared<::arrow::UInt16Builder>();
case ::arrow::Type::INT16:
return std::make_shared<::arrow::Int16Builder>();
case ::arrow::Type::UINT32:
return std::make_shared<::arrow::UInt32Builder>();
case ::arrow::Type::INT32:
return std::make_shared<::arrow::Int32Builder>();
case ::arrow::Type::UINT64:
return std::make_shared<::arrow::UInt64Builder>();
case ::arrow::Type::INT64:
return std::make_shared<::arrow::Int64Builder>();
case ::arrow::Type::HALF_FLOAT:
return std::make_shared<::arrow::HalfFloatBuilder>();
case ::arrow::Type::FLOAT:
return std::make_shared<::arrow::FloatBuilder>();
case ::arrow::Type::DOUBLE:
return std::make_shared<::arrow::DoubleBuilder>();
default:
return ::arrow::Status::Invalid(
fmt::format("Unsupported GetArrayBuilder type: type_id={}", type_id));
}
}

/// Create an array builder for an Arrow data type.
///
/// Only the type id of `dtype` is consulted; construction is delegated to the
/// `GetArrayBuilder(::arrow::Type::type)` overload.
::arrow::Result<std::shared_ptr<::arrow::ArrayBuilder>> GetArrayBuilder(
    const std::shared_ptr<::arrow::DataType>& dtype) {
  const ::arrow::Type::type type_id = dtype->id();
  return GetArrayBuilder(type_id);
}

std::optional<std::string> GetExtensionName(std::shared_ptr<::arrow::DataType> dtype) {
if (dtype->id() == ::arrow::Type::EXTENSION) {
Expand Down
20 changes: 14 additions & 6 deletions cpp/src/lance/arrow/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <string>
#include <vector>


template <typename T>
concept HasToString = requires(T t) {
{ t.ToString() } -> std::same_as<std::string>;
Expand All @@ -42,8 +41,7 @@ struct fmt::formatter<T> : fmt::formatter<std::string_view> {
template <HasToString T>
struct fmt::formatter<std::shared_ptr<T>> : fmt::formatter<std::string_view> {
template <typename FormatContext>
auto format(const std::shared_ptr<T>& v, FormatContext& ctx)
-> decltype(ctx.out()) {
auto format(const std::shared_ptr<T>& v, FormatContext& ctx) -> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{}", v->ToString());
}
};
Expand All @@ -61,19 +59,29 @@ inline bool is_struct(const std::shared_ptr<::arrow::DataType>& dtype) {
}

/// Returns True if the data type is a map.
inline bool is_map(std::shared_ptr<::arrow::DataType> dtype) {
inline bool is_map(const std::shared_ptr<::arrow::DataType>& dtype) {
return dtype->id() == ::arrow::Type::MAP;
}

/// Returns True if the data type is timestamp type.
inline bool is_timestamp(std::shared_ptr<::arrow::DataType> dtype) {
inline bool is_timestamp(const std::shared_ptr<::arrow::DataType>& dtype) {
return dtype->id() == ::arrow::TimestampType::type_id;
}

inline bool is_extension(std::shared_ptr<::arrow::DataType> dtype) {
inline bool is_extension(const std::shared_ptr<::arrow::DataType>& dtype) {
return dtype->id() == ::arrow::Type::EXTENSION;
}

/// Returns True if the data type is a fixed-size list.
inline bool is_fixed_size_list(const std::shared_ptr<::arrow::DataType>& dtype) {
  return ::arrow::FixedSizeListType::type_id == dtype->id();
}

/// Get an array builder for the given data type.
///
/// Dispatches on `dtype->id()` only; see the `type_id` overload for the
/// supported types.
::arrow::Result<std::shared_ptr<::arrow::ArrayBuilder>> GetArrayBuilder(
const std::shared_ptr<::arrow::DataType>& dtype);

/// Get an array builder for the given Arrow type id.
///
/// Supported type ids are the signed and unsigned 8/16/32/64-bit integers,
/// HALF_FLOAT, FLOAT and DOUBLE. Any other type id yields `Status::Invalid`.
::arrow::Result<std::shared_ptr<::arrow::ArrayBuilder>> GetArrayBuilder(
::arrow::Type::type type_id);

/// Convert arrow DataType to a string representation.
::arrow::Result<std::string> ToLogicalType(std::shared_ptr<::arrow::DataType> dtype);

Expand Down
3 changes: 3 additions & 0 deletions cpp/src/lance/encodings/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ add_library(
dictionary.cc
dictionary.h
encoder.h
fixed_size_binary.cc
fixed_size_binary.h
plain.cc
plain.h
)

target_include_directories(encodings SYSTEM PRIVATE ${Protobuf_INCLUDE_DIR})

add_lance_test(binary_test)
add_lance_test(fixed_size_binary_test)
add_lance_test(plain_test)
13 changes: 8 additions & 5 deletions cpp/src/lance/encodings/encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Encoder {
public:
Encoder(std::shared_ptr<::arrow::io::OutputStream> out) : out_(out) {}

virtual ~Encoder() = default;

/// Write an Arrow Array and returns the start offset of the column metadata.
///
/// \param arr an array to write with the encoding.
Expand All @@ -56,15 +58,14 @@ class Encoder {
class Decoder {
public:
inline Decoder(std::shared_ptr<::arrow::io::RandomAccessFile> infile,
std::shared_ptr<::arrow::DataType> type) noexcept
: infile_(infile), type_(type) {}
std::shared_ptr<::arrow::DataType> type,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) noexcept
: infile_(infile), type_(type), pool_(pool) {}

virtual ~Decoder() = default;

/// Initialize the decoder.
virtual ::arrow::Status Init() {
return ::arrow::Status::OK();
};
virtual ::arrow::Status Init() { return ::arrow::Status::OK(); };

virtual void Reset(int64_t position, int32_t length) {
position_ = position;
Expand Down Expand Up @@ -94,6 +95,8 @@ class Decoder {
std::shared_ptr<::arrow::DataType> type_;
int64_t position_ = -1;
int32_t length_ = -1;

::arrow::MemoryPool* pool_;
};

} // namespace lance::encodings
124 changes: 124 additions & 0 deletions cpp/src/lance/encodings/fixed_size_binary.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "lance/encodings/fixed_size_binary.h"

#include <arrow/builder.h>

#include "lance/arrow/type.h"
#include "lance/encodings/encoder.h"
#include "lance/encodings/plain.h"

namespace lance::encodings {

FixedSizeBinaryEncoder::FixedSizeBinaryEncoder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called FixedSizeBinaryEncoder/Decoder but it handles both FixedSizeBinary and FixedSizeList. Would it be better to call it FixedSizeEncoder/Decoder or something like that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. So this encoding is in contrast to VarBinary (variable-sized binary).
Maybe this is another argument for merging it with PlainEncoding.

const std::shared_ptr<::arrow::io::OutputStream>& out) noexcept
: Encoder(out) {}

// The encoder adds no state of its own, so the compiler-generated destructor suffices.
FixedSizeBinaryEncoder::~FixedSizeBinaryEncoder() = default;

::arrow::Result<int64_t> FixedSizeBinaryEncoder::Write(const std::shared_ptr<::arrow::Array>& arr) {
assert(::arrow::is_fixed_size_binary(arr->type_id()) ||
lance::arrow::is_fixed_size_list(arr->type()));

ARROW_ASSIGN_OR_RAISE(auto values_position, out_->Tell());
if (::arrow::is_fixed_size_binary(arr->type_id())) {
auto a = std::static_pointer_cast<::arrow::FixedSizeBinaryArray>(arr);
ARROW_RETURN_NOT_OK(out_->Write(a->raw_values(), a->length() * a->byte_width()));
} else if (lance::arrow::is_fixed_size_list(arr->type())) {
auto list_arr = std::dynamic_pointer_cast<::arrow::FixedSizeListArray>(arr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why use a dynamic cast here? i thought we already first check that it's a FixedSizeListArray?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I was thinking of being consistent and using a dynamic cast whenever casting a base class to a child class. dynamic_cast is safer w.r.t. virtual tables, etc.

I should use dynamic_cast on line 37 as well, though.

assert(::arrow::is_primitive(list_arr->values()->type_id()));

auto plain_encoder = PlainEncoder(out_);
return plain_encoder.Write(list_arr->values());
}

return values_position;
}

/// Name of this encoder, for diagnostics.
std::string FixedSizeBinaryEncoder::ToString() const {
  return "FixedSizeBinaryEncoder";
}

::arrow::Result<std::shared_ptr<::arrow::Array>> FixedSizedBinaryDecoder::ToFixedSizeBinaryArray(
int32_t start, int32_t length) const {
auto bytes = type_->byte_width();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd probably call this nbytes or byte_width?

ARROW_ASSIGN_OR_RAISE(auto buf, infile_->ReadAt(position_ + start * bytes, length * bytes));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So for fixed size binary it's just a plain encoder/decoder using fixed width data (e.g., any of the numeric dtypes) right? How come we don't just call plain_decoder like you do with the fixed size list handling?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can definitely do it.

return std::make_shared<::arrow::FixedSizeBinaryArray>(type_, length, buf);
}

/// Decode `length` fixed-size lists, beginning at list index `start`.
///
/// The flattened child values are read through a `PlainDecoder` positioned at
/// this page and then wrapped in a `FixedSizeListArray` of this decoder's type.
::arrow::Result<std::shared_ptr<::arrow::Array>> FixedSizedBinaryDecoder::ToFixedSizeListArray(
    int32_t start, int32_t length) const {
  const auto list_type = std::dynamic_pointer_cast<::arrow::FixedSizeListType>(type_);
  const auto item_field = list_type->value_field();
  assert(::arrow::is_primitive(item_field->type()->id()));

  // Every list holds `width` child values, so child-level offsets and counts
  // are the list-level ones scaled by `width`.
  const auto width = list_type->list_size();

  auto child_decoder = PlainDecoder(infile_, item_field->type());
  ARROW_RETURN_NOT_OK(child_decoder.Init());
  child_decoder.Reset(position_, length_ * width);

  ARROW_ASSIGN_OR_RAISE(auto values, child_decoder.ToArray(start * width, length * width));
  return std::make_shared<::arrow::FixedSizeListArray>(type_, length, values);
}

/// Decode a slice of this page into an Arrow array.
///
/// \param start index of the first value to read, relative to the page.
/// \param length number of values to read; when unset, reads through the end
///        of the page.
/// \return the decoded array; `IndexError` if `start` or `length` is negative
///         or the requested range extends past the page; `Invalid` if the
///         decoder's type is neither a fixed-size list nor a fixed-size binary.
::arrow::Result<std::shared_ptr<::arrow::Array>> FixedSizedBinaryDecoder::ToArray(
    int32_t start, std::optional<int32_t> length) const {
  // Do the range arithmetic in 64 bits so that a large start/length pair
  // cannot wrap around and slip past the bounds check.
  const int64_t page_length = length_;
  const int64_t count =
      length.has_value() ? static_cast<int64_t>(length.value()) : page_length - start;
  if (start < 0 || count < 0 || start + count > page_length) {
    return ::arrow::Status::IndexError(
        fmt::format("{}::ToArray: out of range: start={}, length={}, page_length={}\n",
                    ToString(),
                    start,
                    count,
                    length_));
  }

  // `count` is within [0, length_] here, so narrowing back to 32 bits is lossless.
  const auto num_values = static_cast<int32_t>(count);
  if (lance::arrow::is_fixed_size_list(type_)) {
    return ToFixedSizeListArray(start, num_values);
  } else if (::arrow::is_fixed_size_binary(type_->id())) {
    return ToFixedSizeBinaryArray(start, num_values);
  }
  // Status::Invalid concatenates its arguments and does not interpret "{}",
  // so the message has to be formatted before it is handed over.
  return ::arrow::Status::Invalid(fmt::format("Invalid data type: {}\n", type_->ToString()));
}

::arrow::Result<std::shared_ptr<::arrow::Array>> FixedSizedBinaryDecoder::Take(
std::shared_ptr<::arrow::Int32Array> indices) const {
std::shared_ptr<::arrow::ArrayBuilder> builder;
if (lance::arrow::is_fixed_size_list(type_)) {
auto list_type = std::dynamic_pointer_cast<::arrow::FixedSizeListType>(type_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question here wrt dynamic cast

ARROW_ASSIGN_OR_RAISE(auto value_builder,
::lance::arrow::GetArrayBuilder(list_type->value_type()));
builder = std::make_shared<::arrow::FixedSizeListBuilder>(pool_, value_builder, type_);
} else if (::arrow::is_fixed_size_binary(type_->id())) {
builder = std::make_shared<::arrow::FixedSizeBinaryBuilder>(type_);
} else {
return ::arrow::Status::Invalid("FixedSizeBuilderDecoder::Take: Invalid data type: ", type_);
}

// TODO: Use thread pool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be done for all decoders? Or just ones where we may end up reading large amounts of data for each scalar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done if we choose to pick each "scalar" individually. Usually it works better for the case where the scalar is big, e.g., large binary, tensors, images, etc.

In other cases, e.g., for numeric values, it is faster to read a block and then filter it (i.e., 64KB = 8000 int64 values in one read).

for (int i = 0; i < indices->length(); i++) {
ARROW_ASSIGN_OR_RAISE(auto scalar, GetScalar(indices->Value(i)));
ARROW_RETURN_NOT_OK(builder->AppendScalar(*scalar));
}
return builder->Finish();
}

/// Read the single value at `idx` as an Arrow scalar.
///
/// \param idx index of the value, relative to the page.
/// \return the scalar, or `IndexError` if `idx` is outside `[0, length_)`.
::arrow::Result<std::shared_ptr<::arrow::Scalar>> FixedSizedBinaryDecoder::GetScalar(
    int64_t idx) const {
  // `ToArray` takes a 32-bit start. Validate the 64-bit index here so that an
  // oversized value is reported as out of range instead of being silently
  // truncated to some other, possibly valid, index.
  if (idx < 0 || idx >= length_) {
    return ::arrow::Status::IndexError(fmt::format(
        "{}::GetScalar: out of range: idx={}, page_length={}\n", ToString(), idx, length_));
  }
  ARROW_ASSIGN_OR_RAISE(auto arr, ToArray(static_cast<int32_t>(idx), 1));
  return arr->GetScalar(0);
}

/// Name of this decoder, used as the prefix of its error messages.
std::string FixedSizedBinaryDecoder::ToString() const {
  return "FixedSizeBinaryDecoder";
}

} // namespace lance::encodings
64 changes: 64 additions & 0 deletions cpp/src/lance/encodings/fixed_size_binary.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <arrow/array.h>
#include <arrow/io/api.h>

#include <memory>

#include "lance/encodings/encoder.h"

namespace lance::encodings {

/// Fixed length binary encoder.
///
/// Writes arrays whose elements all have the same width: fixed-size binary
/// arrays and fixed-size list arrays.
class FixedSizeBinaryEncoder : public Encoder {
public:
/// Construct an encoder that writes to `out`.
FixedSizeBinaryEncoder(const std::shared_ptr<::arrow::io::OutputStream>& out) noexcept;

~FixedSizeBinaryEncoder() override;

/// Write a fixed-size binary or fixed-size list array.
///
/// For a fixed-size binary array the raw values are written directly and the
/// stream position at which they start is returned. For a fixed-size list
/// array the child values are written with a `PlainEncoder` and its result is
/// returned.
::arrow::Result<int64_t> Write(const std::shared_ptr<::arrow::Array>& arr) override;

/// Name of this encoder.
[[nodiscard]] std::string ToString() const override;
};

/// Fixed size binary decoder.
///
/// Reads back fixed-size binary and fixed-size list arrays; any other data
/// type is reported as `Status::Invalid` by `ToArray` and `Take`.
class FixedSizedBinaryDecoder : public Decoder {
public:
// Reuse the base-class constructors (input file, data type, memory pool).
using Decoder::Decoder;

virtual ~FixedSizedBinaryDecoder() = default;

/// Read the value at `idx` as a scalar, by decoding a one-element array.
::arrow::Result<std::shared_ptr<::arrow::Scalar>> GetScalar(int64_t idx) const override;

/// Decode `length` values starting at `start`; when `length` is not given,
/// decode through the end of the page. Returns `IndexError` if the range
/// extends past the page.
::arrow::Result<std::shared_ptr<::arrow::Array>> ToArray(
int32_t start = 0, std::optional<int32_t> length = std::nullopt) const override;

/// Build an array from the values at `indices`, fetched one scalar at a time.
::arrow::Result<std::shared_ptr<::arrow::Array>> Take(
std::shared_ptr<::arrow::Int32Array> indices) const override;

/// Name of this decoder.
[[nodiscard]] std::string ToString() const;

private:
/// Decode a range of the page as a `FixedSizeBinaryArray`.
::arrow::Result<std::shared_ptr<::arrow::Array>> ToFixedSizeBinaryArray(int32_t start,
int32_t length) const;

/// Decode a range of the page as a `FixedSizeListArray`; the child values are
/// read with a `PlainDecoder`.
::arrow::Result<std::shared_ptr<::arrow::Array>> ToFixedSizeListArray(int32_t start,
int32_t length) const;
};

} // namespace lance::encodings
Loading