Skip to content

Commit

Permalink
Write schema metadata to Manifest (#222)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Oct 4, 2022
1 parent 293e556 commit ce5d9ff
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 10 deletions.
26 changes: 20 additions & 6 deletions cpp/src/lance/format/manifest.cc
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "lance/format/manifest.h"

#include <arrow/result.h>

#include <memory>

#include "lance/arrow/type.h"
#include "lance/format/schema.h"
#include "lance/io/pb.h"

Expand All @@ -13,16 +26,14 @@ using arrow::Status;

namespace lance::format {

Manifest::Manifest(std::shared_ptr<Schema> schema)
: schema_(std::move(schema)) {}
Manifest::Manifest(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}

Manifest::Manifest(Manifest&& other) noexcept
: schema_(std::move(other.schema_)) {}
Manifest::Manifest(Manifest&& other) noexcept : schema_(std::move(other.schema_)) {}

/// Read a Manifest from `in` at position `offset`.
///
/// Decodes a `pb::Manifest` with `io::ParseProto`, builds a `Schema` from the
/// decoded message and wraps it in a shared `Manifest`. If decoding fails,
/// `ARROW_ASSIGN_OR_RAISE` returns that error to the caller.
///
/// \param in the file to read from.
/// \param offset where the manifest starts within `in`
///        (presumably a byte offset -- TODO confirm against io::ParseProto).
/// \return the parsed Manifest, or the error produced while decoding.
::arrow::Result<std::shared_ptr<Manifest>> Manifest::Parse(
std::shared_ptr<::arrow::io::RandomAccessFile> in, int64_t offset) {
ARROW_ASSIGN_OR_RAISE(auto pb, io::ParseProto<pb::Manifest>(in, offset));
// NOTE(review): the two `auto schema` lines below look like the removed/added
// pair of this diff hunk rendered without +/- markers; presumably only the
// second one (fields + metadata) exists in the committed file -- confirm.
auto schema = std::make_unique<Schema>(pb.fields());
auto schema = std::make_unique<Schema>(pb.fields(), pb.metadata());
return std::make_shared<Manifest>(std::move(schema));
}

Expand All @@ -32,6 +43,9 @@ ::arrow::Result<int64_t> Manifest::Write(std::shared_ptr<::arrow::io::OutputStre
auto pb_field = pb.add_fields();
*pb_field = field;
}
for (const auto& [key, value] : schema_->metadata()) {
(*pb.mutable_metadata())[key] = value;
}
return io::WriteProto(out, pb);
}

Expand Down
16 changes: 14 additions & 2 deletions cpp/src/lance/format/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <arrow/status.h>
#include <arrow/type.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/util/string.h>
#include <fmt/format.h>
#include <fmt/ranges.h>
Expand Down Expand Up @@ -412,7 +413,9 @@ bool Field::operator==(const Field& other) const { return Equals(other, true); }

//------ Schema

Schema::Schema(const google::protobuf::RepeatedPtrField<::lance::format::pb::Field>& pb_fields) {
Schema::Schema(const google::protobuf::RepeatedPtrField<::lance::format::pb::Field>& pb_fields,
const google::protobuf::Map<std::string, std::string>& metadata)
: metadata_(std::begin(metadata), std::end(metadata)) {
for (auto& f : pb_fields) {
auto field = std::make_shared<Field>(f);
if (field->parent_id() < 0) {
Expand All @@ -429,6 +432,9 @@ Schema::Schema(const std::shared_ptr<::arrow::Schema>& schema) {
for (auto f : schema->fields()) {
fields_.emplace_back(make_shared<Field>(f));
}
if (schema->metadata()) {
schema->metadata()->ToUnorderedMap(&metadata_);
}
AssignIds();
}

Expand Down Expand Up @@ -677,7 +683,13 @@ std::shared_ptr<::arrow::Schema> Schema::ToArrow() const {
for (auto f : fields_) {
arrow_fields.emplace_back(f->ToArrow());
}
return ::arrow::schema(arrow_fields);

std::shared_ptr<::arrow::KeyValueMetadata> arrow_metadata;
if (!metadata_.empty()) {
arrow_metadata = std::make_shared<::arrow::KeyValueMetadata>(metadata_);
}

return ::arrow::schema(arrow_fields, arrow_metadata);
}

pb::Field::Type Field::GetNodeType() const {
Expand Down
17 changes: 15 additions & 2 deletions cpp/src/lance/format/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
#include <arrow/compute/api.h>
#include <arrow/type.h>

#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "lance/encodings/encoder.h"
Expand All @@ -40,7 +40,12 @@ class Schema final {
/// Construct Lance Schema from Arrow Schema.
Schema(const std::shared_ptr<::arrow::Schema>& schema);

Schema(const google::protobuf::RepeatedPtrField<::lance::format::pb::Field>& pb_fields);
/// Construct Lance Schema from Protobuf.
///
/// \param pb_fields the fields described in protobuf.
/// \param metadata the metadata pairs.
Schema(const google::protobuf::RepeatedPtrField<::lance::format::pb::Field>& pb_fields,
const google::protobuf::Map<std::string, std::string>& metadata = {});

~Schema() = default;

Expand Down Expand Up @@ -91,6 +96,11 @@ class Schema final {
/// \return the field if found. Return nullptr if not found.
std::shared_ptr<Field> GetField(const std::string& name) const;

/// Schema metadata, k/v pairs.
const std::unordered_map<std::string, std::string>& metadata() const {
return metadata_;
}

std::string ToString() const;

bool Equals(const std::shared_ptr<Schema>& other, bool check_id = true) const;
Expand All @@ -110,6 +120,9 @@ class Schema final {
bool RemoveField(int32_t id);

std::vector<std::shared_ptr<Field>> fields_;

/// Schema metadata
std::unordered_map<std::string, std::string> metadata_;
};

/// \brief Field is the metadata of a column on disk.
Expand Down
34 changes: 34 additions & 0 deletions cpp/src/lance/format/schema_test.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,33 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "lance/format/schema.h"

#include <arrow/type.h>
#include <arrow/util/key_value_metadata.h>
#include <fmt/format.h>

#include <catch2/catch_test_macros.hpp>

#include "lance/arrow/stl.h"
#include "lance/testing/extension_types.h"
#include "lance/testing/io.h"
#include "lance/testing/json.h"

using lance::arrow::ToArray;
using lance::testing::MakeDataset;
using lance::testing::TableFromJSON;

const auto arrow_schema = ::arrow::schema(
{::arrow::field("pk", ::arrow::utf8()),
Expand Down Expand Up @@ -183,4 +205,16 @@ TEST_CASE("Test nested storage type") {
::arrow::field("ymax", ::arrow::float64()),
})),
})));
}

// Key/value metadata attached to the Arrow schema of the input table must be
// present, with the same values, on the schema of the dataset that
// MakeDataset produces from that table.
TEST_CASE("Test schema metadata") {
  auto schema = ::arrow::schema({::arrow::field("val", ::arrow::int32())},
                                ::arrow::KeyValueMetadata::Make({"k1", "k2"}, {"v1", "v2"}));

  auto table = ::arrow::Table::Make(schema, {ToArray({1, 2, 3}).ValueOrDie()});

  auto dataset = MakeDataset(table).ValueOrDie();
  const auto metadata = dataset->schema()->metadata();
  // REQUIRE (not CHECK): the lookups below dereference the pointer, so a
  // missing metadata object must stop the test here instead of crashing.
  REQUIRE(metadata != nullptr);
  // Verify every pair that was written; the second lookup previously repeated
  // "k1", leaving "k2" untested.
  CHECK(metadata->Get("k1").ValueOrDie() == "v1");
  CHECK(metadata->Get("k2").ValueOrDie() == "v2");
}
3 changes: 3 additions & 0 deletions protos/format.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ Format:
message Manifest {
// All fields of the dataset, including the nested fields.
repeated Field fields = 1;

// Dataset metadata, stored as key/value pairs.
// Keys are strings; values are raw bytes.
map<string, bytes> metadata = 2;
}

// Metadata of one Lance file.
Expand Down
10 changes: 10 additions & 0 deletions python/lance/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,13 @@ def test_write_dataset(tmp_path: Path):
assert actual == pa.Table.from_pandas(
pd.DataFrame({"label": [123, 789], "values": [22, 2.24]})
)


def test_write_dataset_with_metadata(tmp_path: Path):
    """Schema metadata given at write time is readable from the stored dataset."""
    uri = tmp_path / "test.lance"
    expected = {"k1": "v1", "k2": "v2"}

    source = pa.Table.from_pylist([{"a": 1}, {"a": 2}], metadata=expected)
    ds.write_dataset(source, uri, format=LanceFileFormat())

    schema: pa.Schema = dataset(str(uri)).to_table().schema
    # pyarrow exposes schema metadata as bytes -> bytes, hence the encoding.
    for key, value in expected.items():
        assert schema.metadata[key.encode()] == value.encode()

0 comments on commit ce5d9ff

Please sign in to comment.