From 0d0b3c98b36abe14ef8ac4ffab50916cd7005caf Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 12 Nov 2022 03:19:54 -0800 Subject: [PATCH 01/10] print --- cpp/src/lance/io/reader.cc | 2 ++ python/lance/data/convert/base.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/lance/io/reader.cc b/cpp/src/lance/io/reader.cc index cc6ad465f8..d6c7b4a540 100644 --- a/cpp/src/lance/io/reader.cc +++ b/cpp/src/lance/io/reader.cc @@ -95,6 +95,7 @@ Status FileReader::Open() { metadata_, format::Metadata::Make(::arrow::SliceBuffer(cached_last_page_, inbuf_offset))); if (!manifest_) { + /// Multiple files can share the manifest. ARROW_ASSIGN_OR_RAISE(manifest_, metadata_->GetManifest(file_)); // We need read the dictionary from the same file. auto visitor = format::ReadDictionaryVisitor(file_); @@ -106,6 +107,7 @@ Status FileReader::Open() { auto num_batches = metadata_->num_batches(); auto num_columns = manifest_->schema()->GetFieldsCount(); + fmt::print("num_columns: {}\n", num_columns); ARROW_ASSIGN_OR_RAISE( page_table_, format::PageTable::Make(file_, metadata_->page_table_position(), num_columns, num_batches)); diff --git a/python/lance/data/convert/base.py b/python/lance/data/convert/base.py index 8c9aa14f11..6619b657f1 100644 --- a/python/lance/data/convert/base.py +++ b/python/lance/data/convert/base.py @@ -30,9 +30,11 @@ class DatasetConverter(ABC): """Base class for converting raw => pandas => Arrow => Lance""" - def __init__(self, name, uri_root, images_root=None): + def __init__(self, name, uri_root, images_root: str=None): self.name = name self.uri_root = uri_root + if images_root is None: + images_root = os.path.join(uri_root, "images") self.images_root = images_root @abstractmethod From b8e91cdca071c69e096111fb7d4c1f5db6639cda Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 15 Nov 2022 10:48:30 -0800 Subject: [PATCH 02/10] write manifest with dictionary values --- cpp/src/lance/arrow/dataset.cc | 4 +-- cpp/src/lance/arrow/dataset_test.cc | 38 +++++++++++++++++++++++++-- cpp/src/lance/arrow/file_lance.cc | 2 +- cpp/src/lance/arrow/fragment.cc | 21 +++++++++------ cpp/src/lance/arrow/fragment.h | 13 ++++----- cpp/src/lance/encodings/binary.cc | 3 ++- cpp/src/lance/encodings/binary.h | 4 +-- cpp/src/lance/encodings/dictionary.cc | 7 +++-- cpp/src/lance/encodings/dictionary.h | 4 +-- cpp/src/lance/encodings/encoder.cc | 1 + cpp/src/lance/format/schema.cc | 8 ++++-- cpp/src/lance/format/schema.h | 4 +++ cpp/src/lance/format/visitors.cc | 2 +- cpp/src/lance/io/exec/filter.h | 2 +- cpp/src/lance/io/writer.cc | 14 ++++++++-- cpp/src/lance/io/writer.h | 6 ++++- cpp/src/lance/testing/io.cc | 4 ++- 17 files changed, 101 insertions(+), 36 deletions(-) diff --git a/cpp/src/lance/arrow/dataset.cc b/cpp/src/lance/arrow/dataset.cc index d436566a56..7a650cdf08 100644 --- a/cpp/src/lance/arrow/dataset.cc +++ b/cpp/src/lance/arrow/dataset.cc @@ -205,7 +205,7 @@ ::arrow::Status LanceDataset::Write(const ::arrow::dataset::FileSystemDatasetWri auto manifest_path = GetManifestPath(base_dir, manifest->version()); { ARROW_ASSIGN_OR_RAISE(auto out, fs->OpenOutputStream(manifest_path)); - ARROW_RETURN_NOT_OK(manifest->Write(out)); + ARROW_RETURN_NOT_OK(lance::io::FileWriter::WriteManifest(out, *manifest)); } auto latest_manifest_path = GetManifestPath(base_dir, std::nullopt); return fs->CopyFile(manifest_path, latest_manifest_path); @@ -263,7 +263,7 @@ ::arrow::Result<::arrow::dataset::FragmentIterator> LanceDataset::GetFragmentsIm std::vector> fragments = impl_->manifest->fragments() | views::transform([this](auto& data_fragment) { return std::make_shared( - impl_->fs, impl_->data_dir(), data_fragment, impl_->manifest->schema()); + impl_->fs, impl_->data_dir(), data_fragment, impl_->manifest); }) | ranges::to; diff --git a/cpp/src/lance/arrow/dataset_test.cc b/cpp/src/lance/arrow/dataset_test.cc index 13bb173166..a2a5c845de 100644 --- a/cpp/src/lance/arrow/dataset_test.cc +++ b/cpp/src/lance/arrow/dataset_test.cc @@ -19,8 +19,8 @@ #include #include -#include #include +#include #include "lance/arrow/file_lance.h" #include "lance/arrow/stl.h" @@ -29,7 +29,6 @@ using lance::arrow::ToArray; using namespace ranges::views; - std::shared_ptr<::arrow::Table> ReadTable(const std::string& uri, std::optional version) { std::string path; auto fs = ::arrow::fs::FileSystemFromUriOrPath(uri, &path).ValueOrDie(); @@ -38,6 +37,24 @@ std::shared_ptr<::arrow::Table> ReadTable(const std::string& uri, std::optional< return actual_dataset->NewScan().ValueOrDie()->Finish().ValueOrDie()->ToTable().ValueOrDie(); } +// Write table as dataset. +std::string WriteTable(const std::shared_ptr<::arrow::Table>& table) { + auto base_uri = lance::testing::MakeTemporaryDir().ValueOrDie() + "/testdata"; + auto format = lance::arrow::LanceFileFormat::Make(); + ::arrow::dataset::FileSystemDatasetWriteOptions write_options; + std::string path; + auto fs = ::arrow::fs::FileSystemFromUriOrPath(base_uri, &path).ValueOrDie(); + write_options.filesystem = fs; + write_options.base_dir = path; + write_options.file_write_options = format->DefaultWriteOptions(); + + auto dataset = lance::testing::MakeDataset(table).ValueOrDie(); + CHECK(lance::arrow::LanceDataset::Write(write_options, + dataset->NewScan().ValueOrDie()->Finish().ValueOrDie()) + .ok()); + return base_uri; +} + TEST_CASE("Create new dataset") { auto ids = ToArray({1, 2, 3, 4, 5, 6, 8}).ValueOrDie(); auto values = ToArray({"a", "b", "c", "d", "e", "f", "g"}).ValueOrDie(); @@ -199,4 +216,21 @@ TEST_CASE("Dataset overwrite error cases") { lance::arrow::LanceDataset::kOverwrite); INFO("Status: " << status.message() << " is ok: " << status.ok()); CHECK(status.IsIOError()); +} + +TEST_CASE("Dataset write dictionary array") { + auto dict_values = ToArray({"a", "b", "c"}).ValueOrDie(); + auto dict_indices = ToArray({0, 1, 1, 2, 2, 0}).ValueOrDie(); + auto data_type = ::arrow::dictionary(::arrow::int32(), ::arrow::utf8()); + auto dict_arr = + ::arrow::DictionaryArray::FromArrays( + data_type, dict_indices, dict_values) + .ValueOrDie(); + auto table = + ::arrow::Table::Make(::arrow::schema({::arrow::field("dict", data_type)}), {dict_arr}); + + auto base_uri = WriteTable(table); + fmt::print("Base URI: {}\n", base_uri); + + auto actual = ReadTable(base_uri, 1); } \ No newline at end of file diff --git a/cpp/src/lance/arrow/file_lance.cc b/cpp/src/lance/arrow/file_lance.cc index 2db0e9d5f7..78347be442 100644 --- a/cpp/src/lance/arrow/file_lance.cc +++ b/cpp/src/lance/arrow/file_lance.cc @@ -103,7 +103,7 @@ ::arrow::Future> LanceFileFormat::CountRows( ::arrow::Result<::arrow::RecordBatchGenerator> LanceFileFormat::ScanBatchesAsync( const std::shared_ptr<::arrow::dataset::ScanOptions>& options, const std::shared_ptr<::arrow::dataset::FileFragment>& file) const { - ARROW_ASSIGN_OR_RAISE(auto fragment, LanceFragment::Make(*file, impl_->manifest->schema())); + ARROW_ASSIGN_OR_RAISE(auto fragment, LanceFragment::Make(*file, impl_->manifest)); ARROW_ASSIGN_OR_RAISE(auto batch_reader, lance::io::RecordBatchReader::Make(*fragment, options)); return ::arrow::RecordBatchGenerator(std::move(batch_reader)); diff --git a/cpp/src/lance/arrow/fragment.cc b/cpp/src/lance/arrow/fragment.cc index 3b5c55a497..9069e11d5b 100644 --- a/cpp/src/lance/arrow/fragment.cc +++ b/cpp/src/lance/arrow/fragment.cc @@ -23,6 +23,7 @@ #include "lance/arrow/file_lance.h" #include "lance/arrow/utils.h" #include "lance/format/data_fragment.h" +#include "lance/format/manifest.h" #include "lance/format/schema.h" #include "lance/io/reader.h" #include "lance/io/record_batch_reader.h" @@ -33,22 +34,23 @@ namespace fs = std::filesystem; namespace lance::arrow { ::arrow::Result> LanceFragment::Make( - const ::arrow::dataset::FileFragment& file_fragment, std::shared_ptr schema) { - auto field_ids = schema->GetFieldIds(); + const ::arrow::dataset::FileFragment& file_fragment, + std::shared_ptr manifest) { + auto field_ids = manifest->schema()->GetFieldIds(); auto data_fragment = std::make_shared( format::DataFile(file_fragment.source().path(), field_ids)); return std::make_shared( - file_fragment.source().filesystem(), "", data_fragment, schema); + file_fragment.source().filesystem(), "", std::move(data_fragment), std::move(manifest)); } LanceFragment::LanceFragment(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string data_dir, std::shared_ptr fragment, - std::shared_ptr schema) + std::shared_ptr manifest) : fs_(std::move(fs)), data_uri_(std::move(data_dir)), fragment_(std::move(fragment)), - schema_(std::move(schema)) {} + manifest_(std::move(manifest)) {} ::arrow::Result<::arrow::RecordBatchGenerator> LanceFragment::ScanBatchesAsync( const std::shared_ptr<::arrow::dataset::ScanOptions>& options) { @@ -57,7 +59,7 @@ ::arrow::Result<::arrow::RecordBatchGenerator> LanceFragment::ScanBatchesAsync( } ::arrow::Result> LanceFragment::ReadPhysicalSchemaImpl() { - return schema_->ToArrow(); + return schema()->ToArrow(); } ::arrow::Result> LanceFragment::Open( @@ -71,14 +73,15 @@ ::arrow::Result> LanceFragment: executor->Submit( [this, &schema](auto idx) -> ::arrow::Result { auto& data_file = this->fragment_->data_files()[idx]; - ARROW_ASSIGN_OR_RAISE(auto data_file_schema, schema_->Project(data_file.fields())); + ARROW_ASSIGN_OR_RAISE(auto data_file_schema, + this->schema()->Project(data_file.fields())); ARROW_ASSIGN_OR_RAISE(auto intersection, schema.Intersection(*data_file_schema)); if (intersection->fields().empty()) { return std::make_tuple(nullptr, nullptr); } auto full_path = (fs::path(data_uri_) / data_file.path()).string(); ARROW_ASSIGN_OR_RAISE(auto infile, fs_->OpenInputFile(full_path)) - ARROW_ASSIGN_OR_RAISE(auto reader, lance::io::FileReader::Make(infile)); + ARROW_ASSIGN_OR_RAISE(auto reader, lance::io::FileReader::Make(infile, this->manifest_)); return std::make_tuple(std::move(reader), intersection); }, i)); @@ -96,6 +99,8 @@ ::arrow::Result> LanceFragment: return readers; } +const std::shared_ptr& LanceFragment::schema() const { return manifest_->schema(); } + ::arrow::Result LanceFragment::FastCountRow() const { assert(!fragment_->data_files().empty()); ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(0)); diff --git a/cpp/src/lance/arrow/fragment.h b/cpp/src/lance/arrow/fragment.h index 233c93e4fe..7dee2b391c 100644 --- a/cpp/src/lance/arrow/fragment.h +++ b/cpp/src/lance/arrow/fragment.h @@ -45,21 +45,22 @@ class LanceFragment : public ::arrow::dataset::Fragment { /// /// It creates a LanceFragment from `arrow.dataset.FileFragment`. /// \param file_fragment plain dataset file fragment - /// \param schema the schema of dataset. + /// \param manifest dataset manifest. /// \return LanceFragment static ::arrow::Result> Make( - const ::arrow::dataset::FileFragment& file_fragment, std::shared_ptr schema); + const ::arrow::dataset::FileFragment& file_fragment, + std::shared_ptr manifest); /// Constructor /// /// \param fs a file system instance to conduct IOs. /// \param data_dir the base directory to store data. /// \param fragment data fragment, the metadata of the fragment. - /// \param schema the schema of the Fragment. + /// \param manifest dataset manifest. LanceFragment(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string data_dir, std::shared_ptr fragment, - std::shared_ptr schema); + std::shared_ptr manifest); /// Destructor. ~LanceFragment() override = default; @@ -79,7 +80,7 @@ class LanceFragment : public ::arrow::dataset::Fragment { ::arrow::internal::Executor* executor = ::arrow::internal::GetCpuThreadPool()) const; /// Dataset schema. - const std::shared_ptr& schema() const { return schema_; } + const std::shared_ptr& schema() const; protected: ::arrow::Result> ReadPhysicalSchemaImpl() override; @@ -98,7 +99,7 @@ class LanceFragment : public ::arrow::dataset::Fragment { std::shared_ptr<::arrow::fs::FileSystem> fs_; std::string data_uri_; std::shared_ptr fragment_; - std::shared_ptr schema_; + std::shared_ptr manifest_; }; } // namespace lance::arrow diff --git a/cpp/src/lance/encodings/binary.cc b/cpp/src/lance/encodings/binary.cc index 06f332a795..0cad41f5b1 100644 --- a/cpp/src/lance/encodings/binary.cc +++ b/cpp/src/lance/encodings/binary.cc @@ -20,6 +20,7 @@ #include #include +#include #include using arrow::Result; @@ -30,7 +31,7 @@ using std::vector; namespace lance::encodings { VarBinaryEncoder::VarBinaryEncoder(std::shared_ptr<::arrow::io::OutputStream> out) noexcept - : Encoder(out) {} + : Encoder(std::move(out)) {} Result VarBinaryEncoder::Write(const std::shared_ptr<::arrow::Array>& data) { ARROW_ASSIGN_OR_RAISE(auto values_position, out_->Tell()); diff --git a/cpp/src/lance/encodings/binary.h b/cpp/src/lance/encodings/binary.h index c5734977e3..10437443be 100644 --- a/cpp/src/lance/encodings/binary.h +++ b/cpp/src/lance/encodings/binary.h @@ -49,7 +49,7 @@ class VarBinaryEncoder : public Encoder { explicit VarBinaryEncoder(std::shared_ptr<::arrow::io::OutputStream> out) noexcept; - virtual ~VarBinaryEncoder() = default; + ~VarBinaryEncoder() override = default; /// Write an Array, and returns the offsets to the index block. ::arrow::Result Write(const std::shared_ptr<::arrow::Array>& arr) override; @@ -68,7 +68,7 @@ class VarBinaryDecoder : public Decoder { public: using Decoder::Decoder; - virtual ~VarBinaryDecoder() = default; + ~VarBinaryDecoder() override = default; /** Get a Value without scanning the full row group. */ ::arrow::Result> GetScalar(int64_t idx) const override; diff --git a/cpp/src/lance/encodings/dictionary.cc b/cpp/src/lance/encodings/dictionary.cc index c8ec72c64c..4bdab8b280 100644 --- a/cpp/src/lance/encodings/dictionary.cc +++ b/cpp/src/lance/encodings/dictionary.cc @@ -37,7 +37,8 @@ ::arrow::Result DictionaryEncoder::Write(const std::shared_ptr<::arrow: return plain_encoder_->Write(dict_arr->indices()); } -::arrow::Result DictionaryEncoder::WriteValueArray(std::shared_ptr<::arrow::Array> arr) { +::arrow::Result DictionaryEncoder::WriteValueArray( + const std::shared_ptr<::arrow::Array>& arr) { if (::arrow::is_primitive(arr->type_id())) { return PlainEncoder(out_).Write(arr); } else if (arr->type_id() == ::arrow::Type::STRING) { @@ -58,9 +59,7 @@ DictionaryDecoder::DictionaryDecoder(std::shared_ptr<::arrow::io::RandomAccessFi assert(dict); } -::arrow::Status DictionaryDecoder::Init() { - return plain_decoder_->Init(); -} +::arrow::Status DictionaryDecoder::Init() { return plain_decoder_->Init(); } void DictionaryDecoder::Reset(int64_t position, int32_t length) { Decoder::Reset(position, length); diff --git a/cpp/src/lance/encodings/dictionary.h b/cpp/src/lance/encodings/dictionary.h index e34fcd8b97..08cd6884ab 100644 --- a/cpp/src/lance/encodings/dictionary.h +++ b/cpp/src/lance/encodings/dictionary.h @@ -30,14 +30,14 @@ class DictionaryEncoder : public Encoder { public: DictionaryEncoder(std::shared_ptr<::arrow::io::OutputStream> out); - virtual ~DictionaryEncoder() = default; + ~DictionaryEncoder() override = default; ::arrow::Result Write(const std::shared_ptr<::arrow::Array>& arr) override; /// Write value array. /// /// It should be only called once per dataset / file. - ::arrow::Result WriteValueArray(std::shared_ptr<::arrow::Array> arr); + ::arrow::Result WriteValueArray(const std::shared_ptr<::arrow::Array>& arr); std::string ToString() const override; diff --git a/cpp/src/lance/encodings/encoder.cc b/cpp/src/lance/encodings/encoder.cc index 134cb714d0..699fc8549f 100644 --- a/cpp/src/lance/encodings/encoder.cc +++ b/cpp/src/lance/encodings/encoder.cc @@ -76,6 +76,7 @@ Decoder::Decoder(std::shared_ptr<::arrow::io::RandomAccessFile> infile, ::arrow::Status Decoder::Init() { return ::arrow::Status::OK(); } void Decoder::Reset(int64_t position, int32_t length) { + fmt::print("Reset to position={}, length={}\n", position, length); position_ = position; length_ = length; } diff --git a/cpp/src/lance/format/schema.cc b/cpp/src/lance/format/schema.cc index f586170a4c..6be8902548 100644 --- a/cpp/src/lance/format/schema.cc +++ b/cpp/src/lance/format/schema.cc @@ -64,12 +64,12 @@ void Field::Init(std::shared_ptr<::arrow::DataType> dtype) { if (::lance::arrow::is_struct(dtype)) { auto struct_type = std::static_pointer_cast<::arrow::StructType>(dtype); for (auto& arrow_field : struct_type->fields()) { - children_.push_back(std::shared_ptr(new Field(arrow_field))); + children_.push_back(std::make_shared(arrow_field)); } } else if (::lance::arrow::is_list(dtype)) { auto list_type = std::static_pointer_cast<::arrow::ListType>(dtype); children_.emplace_back( - std::shared_ptr(new Field(::arrow::field("item", list_type->value_type())))); + std::make_shared(::arrow::field("item", list_type->value_type()))); encoding_ = encodings::PLAIN; } else if (::arrow::is_binary_like(type_id) || ::arrow::is_large_binary_like(type_id)) { encoding_ = encodings::VAR_BINARY; @@ -195,6 +195,10 @@ ::arrow::Status Field::LoadDictionary(std::shared_ptr<::arrow::io::RandomAccessF auto decoder = lance::encodings::VarBinaryDecoder<::arrow::StringType>(std::move(infile), ::arrow::utf8()); + fmt::print("Load dictionary: name={} offset={} length={}\n", + name_, + dictionary_offset_, + dictionary_page_length_); decoder.Reset(dictionary_offset_, dictionary_page_length_); ARROW_ASSIGN_OR_RAISE(auto dict_arr, decoder.ToArray()); diff --git a/cpp/src/lance/format/schema.h b/cpp/src/lance/format/schema.h index 4c13e48a0e..50474ada91 100644 --- a/cpp/src/lance/format/schema.h +++ b/cpp/src/lance/format/schema.h @@ -199,6 +199,10 @@ class Field final { const std::shared_ptr<::arrow::Array>& dictionary() const; + /// Set the directory values for a dictionary field. + /// + /// \param dict_arr + /// \return `status::OK()` if success. ::arrow::Status set_dictionary(std::shared_ptr<::arrow::Array> dict_arr); lance::encodings::Encoding encoding() const { return encoding_; }; diff --git a/cpp/src/lance/format/visitors.cc b/cpp/src/lance/format/visitors.cc index f69363f8a9..baf91294ae 100644 --- a/cpp/src/lance/format/visitors.cc +++ b/cpp/src/lance/format/visitors.cc @@ -48,7 +48,7 @@ ::arrow::Result<::std::shared_ptr<::arrow::Field>> ToArrowVisitor::DoVisit( } WriteDictionaryVisitor::WriteDictionaryVisitor(std::shared_ptr<::arrow::io::OutputStream> out) - : out_(out) {} + : out_(std::move(out)) {} ::arrow::Status WriteDictionaryVisitor::Visit(std::shared_ptr root) { if (::arrow::is_dictionary(root->storage_type()->id())) { diff --git a/cpp/src/lance/io/exec/filter.h b/cpp/src/lance/io/exec/filter.h index 16053a7232..824656e8f1 100644 --- a/cpp/src/lance/io/exec/filter.h +++ b/cpp/src/lance/io/exec/filter.h @@ -41,7 +41,7 @@ class Filter : public ExecNode { ::arrow::Result Next() override; constexpr Type type() const override { return Type::kFilter; } - + std::string ToString() const override; private: diff --git a/cpp/src/lance/io/writer.cc b/cpp/src/lance/io/writer.cc index d506bb62b4..2078e53502 100644 --- a/cpp/src/lance/io/writer.cc +++ b/cpp/src/lance/io/writer.cc @@ -34,7 +34,7 @@ namespace lance::io { static constexpr int16_t kMajorVersion = 0; static constexpr int16_t kMinorVersion = 1; -namespace internal { +namespace { /** * Write the 16 bytes footer to the end of a file. @@ -81,6 +81,16 @@ ::arrow::Status FileWriter::Write(const std::shared_ptr<::arrow::RecordBatch>& b return ::arrow::Status::OK(); } +::arrow::Status FileWriter::WriteManifest(const std::shared_ptr<::arrow::io::OutputStream>& destination, + const lance::format::Manifest& manifest) { + // Write dictionary values first. + auto visitor = format::WriteDictionaryVisitor(destination); + ARROW_RETURN_NOT_OK(visitor.VisitSchema(*manifest.schema())); + + ARROW_ASSIGN_OR_RAISE(auto offset, manifest.Write(destination)); + return ::lance::io::WriteFooter(destination, offset); +} + ::arrow::Status FileWriter::WriteArray(const std::shared_ptr& field, const std::shared_ptr<::arrow::Array>& arr) { if (::lance::arrow::is_extension(arr->type())) { @@ -198,7 +208,7 @@ ::arrow::Status FileWriter::WriteFooter() { metadata_->SetManifestPosition(pos); ARROW_ASSIGN_OR_RAISE(pos, metadata_->Write(destination_)); - return internal::WriteFooter(destination_, pos); + return ::lance::io::WriteFooter(destination_, pos); } ::arrow::Future<> FileWriter::FinishInternal() { diff --git a/cpp/src/lance/io/writer.h b/cpp/src/lance/io/writer.h index 46171352f6..51b19fc940 100644 --- a/cpp/src/lance/io/writer.h +++ b/cpp/src/lance/io/writer.h @@ -41,11 +41,15 @@ class FileWriter final : public ::arrow::dataset::FileWriter { std::shared_ptr<::arrow::io::OutputStream> destination, ::arrow::fs::FileLocator destination_locator = {}); - ~FileWriter(); + ~FileWriter() override; /// Write an arrow RecordBatch to the file. ::arrow::Status Write(const std::shared_ptr<::arrow::RecordBatch>& batch) override; + /// Write Manifest to a file. + static ::arrow::Status WriteManifest(const std::shared_ptr<::arrow::io::OutputStream>& destination, + const lance::format::Manifest& manifest); + private: ::arrow::Future<> FinishInternal() override; diff --git a/cpp/src/lance/testing/io.cc b/cpp/src/lance/testing/io.cc index a765c318d2..8c8cae9d90 100644 --- a/cpp/src/lance/testing/io.cc +++ b/cpp/src/lance/testing/io.cc @@ -31,6 +31,7 @@ #include "lance/arrow/fragment.h" #include "lance/arrow/utils.h" #include "lance/arrow/writer.h" +#include "lance/format/manifest.h" #include "lance/io/reader.h" #include "lance/io/writer.h" @@ -145,10 +146,11 @@ ::arrow::Result> MakeFragment( } auto schema = std::make_shared(table->schema()); + auto manifest = std::make_shared(schema); auto data_file = lance::format::DataFile(filename, schema->GetFieldIds()); auto fragment = std::make_shared(data_file); return std::make_shared( - local_fs, data_dir, std::move(fragment), std::move(schema)); + local_fs, data_dir, std::move(fragment), std::move(manifest)); }; } // namespace lance::testing From 763cd716dd84b12fe5998e24edd5c4ab255c5dde Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 15 Nov 2022 11:51:52 -0800 Subject: [PATCH 03/10] put read manifest to FileReader --- cpp/src/lance/arrow/dataset.cc | 4 +++- cpp/src/lance/format/manifest.cc | 6 ++++++ cpp/src/lance/format/manifest.h | 7 ++++++- cpp/src/lance/format/schema.h | 4 ++-- cpp/src/lance/io/reader.cc | 35 ++++++++++++++++++++++++++++++-- cpp/src/lance/io/reader.h | 4 ++++ 6 files changed, 54 insertions(+), 6 deletions(-) diff --git a/cpp/src/lance/arrow/dataset.cc b/cpp/src/lance/arrow/dataset.cc index 7a650cdf08..61b18a88e3 100644 --- a/cpp/src/lance/arrow/dataset.cc +++ b/cpp/src/lance/arrow/dataset.cc @@ -31,6 +31,7 @@ #include "lance/arrow/fragment.h" #include "lance/format/manifest.h" #include "lance/format/schema.h" +#include "lance/io/reader.h" #include "lance/io/writer.h" namespace fs = std::filesystem; @@ -82,7 +83,7 @@ std::string GetBasenameTemplate() { ::arrow::Result> OpenManifest( const std::shared_ptr<::arrow::fs::FileSystem>& fs, const std::string& path) { ARROW_ASSIGN_OR_RAISE(auto in, fs->OpenInputFile(path)); - return lance::format::Manifest::Parse(in, 0); + return lance::io::FileReader::OpenManifest(in); } } // namespace @@ -202,6 +203,7 @@ ::arrow::Status LanceDataset::Write(const ::arrow::dataset::FileSystemDatasetWri // It only supports single writer at the moment. auto version_dir = (fs::path(base_dir) / kVersionsDir).string(); ARROW_RETURN_NOT_OK(fs->CreateDir(version_dir)); + fmt::print("Manifest schema ptr: {}\n", fmt::ptr(manifest->schema().get())); auto manifest_path = GetManifestPath(base_dir, manifest->version()); { ARROW_ASSIGN_OR_RAISE(auto out, fs->OpenOutputStream(manifest_path)); diff --git a/cpp/src/lance/format/manifest.cc b/cpp/src/lance/format/manifest.cc index f8bbf2c542..26c886313b 100644 --- a/cpp/src/lance/format/manifest.cc +++ b/cpp/src/lance/format/manifest.cc @@ -49,6 +49,12 @@ ::arrow::Result> Manifest::Parse( return std::shared_ptr(new Manifest(pb)); } +::arrow::Result> Manifest::Parse( + const std::shared_ptr<::arrow::Buffer>& buffer) { + ARROW_ASSIGN_OR_RAISE(auto pb, io::ParseProto(buffer)); + return std::shared_ptr(new Manifest(pb)); +} + ::arrow::Result Manifest::Write(std::shared_ptr<::arrow::io::OutputStream> out) const { lance::format::pb::Manifest pb; for (auto field : schema_->ToProto()) { diff --git a/cpp/src/lance/format/manifest.h b/cpp/src/lance/format/manifest.h index 0183c9459f..18a331cb65 100644 --- a/cpp/src/lance/format/manifest.h +++ b/cpp/src/lance/format/manifest.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -54,6 +55,10 @@ class Manifest final { static ::arrow::Result> Parse( std::shared_ptr<::arrow::io::RandomAccessFile> in, int64_t offset); + /// Parse a Manifest from a buffer. + static ::arrow::Result> Parse( + const std::shared_ptr<::arrow::Buffer>& buffer); + /// Write the Manifest to a file. /// /// \param out the output stream to write this Manifest to. @@ -87,7 +92,7 @@ class Manifest final { std::vector> fragments_; - Manifest(const lance::format::pb::Manifest& pb); + explicit Manifest(const lance::format::pb::Manifest& pb); }; } // namespace lance::format diff --git a/cpp/src/lance/format/schema.h b/cpp/src/lance/format/schema.h index 50474ada91..754e9b0a10 100644 --- a/cpp/src/lance/format/schema.h +++ b/cpp/src/lance/format/schema.h @@ -201,8 +201,8 @@ class Field final { /// Set the directory values for a dictionary field. /// - /// \param dict_arr - /// \return `status::OK()` if success. + /// \param dict_arr dictionary values + /// \return `status::OK()` if success. Fails if the dictionary is already set. ::arrow::Status set_dictionary(std::shared_ptr<::arrow::Array> dict_arr); lance::encodings::Encoding encoding() const { return encoding_; }; diff --git a/cpp/src/lance/io/reader.cc b/cpp/src/lance/io/reader.cc index d6c7b4a540..cbe87fa1be 100644 --- a/cpp/src/lance/io/reader.cc +++ b/cpp/src/lance/io/reader.cc @@ -49,6 +49,8 @@ typedef ::arrow::Result> ScalarResult; namespace lance::io { +namespace { + ::arrow::Result ReadFooter(const std::shared_ptr<::arrow::Buffer>& buf) { assert(buf->size() >= 16); if (auto magic_buf = ::arrow::SliceBuffer(buf, buf->size() - 4); @@ -56,19 +58,48 @@ ::arrow::Result ReadFooter(const std::shared_ptr<::arrow::Buffer>& buf) return Status::IOError( fmt::format("Invalidate file format: MAGIC NUM is not {}", lance::format::kMagic)); } - // Metadata Offset return ReadInt(buf->data() + buf->size() - 16); } +} // namespace + ::arrow::Result> FileReader::Make( std::shared_ptr<::arrow::io::RandomAccessFile> in, std::shared_ptr<::lance::format::Manifest> manifest, ::arrow::MemoryPool* pool) { - auto reader = std::make_unique(in, manifest, pool); + auto reader = std::make_unique(std::move(in), std::move(manifest), pool); ARROW_RETURN_NOT_OK(reader->Open()); return reader; } +::arrow::Result> FileReader::OpenManifest( + const std::shared_ptr<::arrow::io::RandomAccessFile>& in) { + constexpr auto kBufReadBytes = 8 * 1024 * 1024; // Read 8 MB; + ::arrow::BufferVector buffers; + int64_t pos = 0; + while (true) { + ARROW_ASSIGN_OR_RAISE(auto buf, in->ReadAt(pos, kBufReadBytes)); + auto read_nbytes = buf->size(); + if (read_nbytes > 0) { + buffers.emplace_back(std::move(buf)); + } + if (read_nbytes < kBufReadBytes) { + break; + } + pos += read_nbytes; + } + assert(!buffers.empty()); + auto buf = buffers[0]; + if (buffers.size() > 1) { + // Unlikely to get here + ARROW_ASSIGN_OR_RAISE(buf, ::arrow::ConcatenateBuffers(buffers)); + } + ARROW_ASSIGN_OR_RAISE(auto manifest_pos, ReadFooter(buf)); + ARROW_ASSIGN_OR_RAISE(auto manifest, + lance::format::Manifest::Parse(SliceBuffer(buf, manifest_pos))); + return manifest; +} + FileReader::FileReader(std::shared_ptr<::arrow::io::RandomAccessFile> in, std::shared_ptr<::lance::format::Manifest> manifest, ::arrow::MemoryPool* pool) noexcept diff --git a/cpp/src/lance/io/reader.h b/cpp/src/lance/io/reader.h index 67218bf34f..23b7a9b7b1 100644 --- a/cpp/src/lance/io/reader.h +++ b/cpp/src/lance/io/reader.h @@ -41,6 +41,10 @@ class FileReader { std::shared_ptr<::lance::format::Manifest> manifest = nullptr, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + /// Open Manifest file + static ::arrow::Result> OpenManifest( + const std::shared_ptr<::arrow::io::RandomAccessFile>& in); + explicit FileReader(std::shared_ptr<::arrow::io::RandomAccessFile> in, std::shared_ptr<::lance::format::Manifest> manifest = nullptr, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) noexcept; From c3e2f010ce9d4747a811d7644209b68566a556d8 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 15 Nov 2022 12:33:47 -0800 Subject: [PATCH 04/10] pass all tests --- cpp/src/lance/arrow/dataset.cc | 47 ++++++++++++++++++++++++++++++++++ cpp/src/lance/format/schema.cc | 5 ++++ cpp/src/lance/io/reader.cc | 4 +++ 3 files changed, 56 insertions(+) diff --git a/cpp/src/lance/arrow/dataset.cc b/cpp/src/lance/arrow/dataset.cc index 61b18a88e3..6099f1b59f 100644 --- a/cpp/src/lance/arrow/dataset.cc +++ b/cpp/src/lance/arrow/dataset.cc @@ -14,8 +14,10 @@ #include "lance/arrow/dataset.h" +#include #include #include +#include #include #include #include @@ -86,6 +88,49 @@ ::arrow::Result> OpenManifest( return lance::io::FileReader::OpenManifest(in); } +::arrow::Status CollectDictionary(const std::shared_ptr& field, + const std::shared_ptr<::arrow::Array>& arr) { + assert(field && arr); + assert(field->type()->Equals(arr->type())); + auto data_type = field->type(); + if (::arrow::is_dictionary(data_type->id())) { + auto dict_arr = std::dynamic_pointer_cast<::arrow::DictionaryArray>(arr); + return field->set_dictionary(dict_arr->dictionary()); + } + + if (::arrow::is_list_like(data_type->id())) { + auto list_arr = std::dynamic_pointer_cast<::arrow::ListArray>(arr); + ARROW_RETURN_NOT_OK(CollectDictionary(field->field(0), list_arr->values())); + } else if (is_struct(data_type)) { + auto struct_arr = std::dynamic_pointer_cast<::arrow::StructArray>(arr); + for (auto& child : field->fields()) { + auto child_arr = struct_arr->GetFieldByName(child->name()); + ARROW_RETURN_NOT_OK(CollectDictionary(child, child_arr)); + } + } + return ::arrow::Status::OK(); +} + +::arrow::Status CollectDictionary(const std::shared_ptr& schema, + const std::shared_ptr<::arrow::dataset::Scanner>& scanner) { + ARROW_ASSIGN_OR_RAISE(auto example, scanner->Head(1)); + if (example->num_rows() == 0) { + return ::arrow::Status::Invalid("CollectDictionary: empty dataset"); + } + for (auto& field : schema->fields()) { + auto chunked_arr = example->GetColumnByName(field->name()); + if (chunked_arr == nullptr) { + return ::arrow::Status::Invalid("CollectDictionary: schema mismatch: field ", + field->name(), + "does not exist in the table: ", + example->schema()); + } + assert(chunked_arr->num_chunks() > 0); + ARROW_RETURN_NOT_OK(CollectDictionary(field, chunked_arr->chunk(0))); + } + return ::arrow::Status::OK(); +} + } // namespace DatasetVersion::DatasetVersion(uint64_t version) : version_(version) {} @@ -165,6 +210,8 @@ ::arrow::Status LanceDataset::Write(const ::arrow::dataset::FileSystemDatasetWri auto schema = std::make_shared(scanner->options()->dataset_schema); manifest = std::make_shared(schema); } + ARROW_RETURN_NOT_OK(CollectDictionary(manifest->schema(), scanner)); + fmt::print("Schema: {}\n", manifest->schema()); // Write manifest file auto lance_option = options; diff --git a/cpp/src/lance/format/schema.cc b/cpp/src/lance/format/schema.cc index 6be8902548..40991b4c41 100644 --- a/cpp/src/lance/format/schema.cc +++ b/cpp/src/lance/format/schema.cc @@ -165,6 +165,9 @@ std::string Field::ToString() const { if (is_extension_type()) { str = fmt::format("{}, extension_name={}", str, extension_name_); } + if (dictionary_) { + str = fmt::format("{}, dict={}", str, dictionary_->ToString()); + } return str; } @@ -301,6 +304,8 @@ std::vector Field::ToProto() const { pb_fields.emplace_back(field); + fmt::print("Field::ToProto: name={} {} {}\n", name_, dictionary_offset_, dictionary_page_length_); + for (auto& child : children_) { auto protos = child->ToProto(); pb_fields.insert(pb_fields.end(), protos.begin(), protos.end()); diff --git a/cpp/src/lance/io/reader.cc b/cpp/src/lance/io/reader.cc index cbe87fa1be..f8908aabff 100644 --- a/cpp/src/lance/io/reader.cc +++ b/cpp/src/lance/io/reader.cc @@ -97,6 +97,10 @@ ::arrow::Result> FileReader::OpenMani ARROW_ASSIGN_OR_RAISE(auto manifest_pos, ReadFooter(buf)); ARROW_ASSIGN_OR_RAISE(auto manifest, lance::format::Manifest::Parse(SliceBuffer(buf, manifest_pos))); + + /// TODO: optimize ReadDictionaryVisitor to read from buffer. + auto visitor = format::ReadDictionaryVisitor(in); + ARROW_RETURN_NOT_OK(visitor.VisitSchema(*manifest->schema())); return manifest; } From 49afb507415ab6f4a593f4e0916a469f11ab1192 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 15 Nov 2022 12:36:49 -0800 Subject: [PATCH 05/10] clean up --- cpp/src/lance/encodings/binary.cc | 3 +-- cpp/src/lance/encodings/binary.h | 4 ++-- cpp/src/lance/encodings/dictionary.cc | 7 ++++--- cpp/src/lance/encodings/dictionary.h | 4 ++-- cpp/src/lance/encodings/encoder.cc | 1 - 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/cpp/src/lance/encodings/binary.cc b/cpp/src/lance/encodings/binary.cc index 0cad41f5b1..06f332a795 100644 --- a/cpp/src/lance/encodings/binary.cc +++ b/cpp/src/lance/encodings/binary.cc @@ -20,7 +20,6 @@ #include #include -#include #include using arrow::Result; @@ -31,7 +30,7 @@ using std::vector; namespace lance::encodings { VarBinaryEncoder::VarBinaryEncoder(std::shared_ptr<::arrow::io::OutputStream> out) noexcept - : Encoder(std::move(out)) {} + : Encoder(out) {} Result VarBinaryEncoder::Write(const std::shared_ptr<::arrow::Array>& data) { ARROW_ASSIGN_OR_RAISE(auto values_position, out_->Tell()); diff --git a/cpp/src/lance/encodings/binary.h b/cpp/src/lance/encodings/binary.h index 10437443be..c5734977e3 100644 --- a/cpp/src/lance/encodings/binary.h +++ b/cpp/src/lance/encodings/binary.h @@ -49,7 +49,7 @@ class VarBinaryEncoder : public Encoder { explicit VarBinaryEncoder(std::shared_ptr<::arrow::io::OutputStream> out) noexcept; - ~VarBinaryEncoder() override = default; + virtual ~VarBinaryEncoder() = default; /// Write an Array, and returns the offsets to the index block. ::arrow::Result Write(const std::shared_ptr<::arrow::Array>& arr) override; @@ -68,7 +68,7 @@ class VarBinaryDecoder : public Decoder { public: using Decoder::Decoder; - ~VarBinaryDecoder() override = default; + virtual ~VarBinaryDecoder() = default; /** Get a Value without scanning the full row group. */ ::arrow::Result> GetScalar(int64_t idx) const override; diff --git a/cpp/src/lance/encodings/dictionary.cc b/cpp/src/lance/encodings/dictionary.cc index 4bdab8b280..c8ec72c64c 100644 --- a/cpp/src/lance/encodings/dictionary.cc +++ b/cpp/src/lance/encodings/dictionary.cc @@ -37,8 +37,7 @@ ::arrow::Result DictionaryEncoder::Write(const std::shared_ptr<::arrow: return plain_encoder_->Write(dict_arr->indices()); } -::arrow::Result DictionaryEncoder::WriteValueArray( - const std::shared_ptr<::arrow::Array>& arr) { +::arrow::Result DictionaryEncoder::WriteValueArray(std::shared_ptr<::arrow::Array> arr) { if (::arrow::is_primitive(arr->type_id())) { return PlainEncoder(out_).Write(arr); } else if (arr->type_id() == ::arrow::Type::STRING) { @@ -59,7 +58,9 @@ DictionaryDecoder::DictionaryDecoder(std::shared_ptr<::arrow::io::RandomAccessFi assert(dict); } -::arrow::Status DictionaryDecoder::Init() { return plain_decoder_->Init(); } +::arrow::Status DictionaryDecoder::Init() { + return plain_decoder_->Init(); +} void DictionaryDecoder::Reset(int64_t position, int32_t length) { Decoder::Reset(position, length); diff --git a/cpp/src/lance/encodings/dictionary.h b/cpp/src/lance/encodings/dictionary.h index 08cd6884ab..e34fcd8b97 100644 --- a/cpp/src/lance/encodings/dictionary.h +++ b/cpp/src/lance/encodings/dictionary.h @@ -30,14 +30,14 @@ class DictionaryEncoder : public Encoder { public: DictionaryEncoder(std::shared_ptr<::arrow::io::OutputStream> out); - ~DictionaryEncoder() override = default; + virtual ~DictionaryEncoder() = default; ::arrow::Result Write(const std::shared_ptr<::arrow::Array>& arr) override; /// Write value array. /// /// It should be only called once per dataset / file. - ::arrow::Result WriteValueArray(const std::shared_ptr<::arrow::Array>& arr); + ::arrow::Result WriteValueArray(std::shared_ptr<::arrow::Array> arr); std::string ToString() const override; diff --git a/cpp/src/lance/encodings/encoder.cc b/cpp/src/lance/encodings/encoder.cc index 699fc8549f..134cb714d0 100644 --- a/cpp/src/lance/encodings/encoder.cc +++ b/cpp/src/lance/encodings/encoder.cc @@ -76,7 +76,6 @@ Decoder::Decoder(std::shared_ptr<::arrow::io::RandomAccessFile> infile, ::arrow::Status Decoder::Init() { return ::arrow::Status::OK(); } void Decoder::Reset(int64_t position, int32_t length) { - fmt::print("Reset to position={}, length={}\n", position, length); position_ = position; length_ = length; } From a35626668201e790f41e203a5f827e960074cd4c Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 15 Nov 2022 13:31:04 -0800 Subject: [PATCH 06/10] clean up --- cpp/src/lance/arrow/dataset.cc | 10 +++++++--- cpp/src/lance/arrow/dataset_test.cc | 1 + cpp/src/lance/format/schema.cc | 6 ------ cpp/src/lance/io/exec/filter.h | 2 +- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/cpp/src/lance/arrow/dataset.cc b/cpp/src/lance/arrow/dataset.cc index 6099f1b59f..a3bb8e1010 100644 --- a/cpp/src/lance/arrow/dataset.cc +++ b/cpp/src/lance/arrow/dataset.cc @@ -98,13 +98,19 @@ ::arrow::Status CollectDictionary(const std::shared_ptr& f return field->set_dictionary(dict_arr->dictionary()); } - if (::arrow::is_list_like(data_type->id())) { + if (is_list(data_type)) { auto list_arr = std::dynamic_pointer_cast<::arrow::ListArray>(arr); ARROW_RETURN_NOT_OK(CollectDictionary(field->field(0), list_arr->values())); } else if (is_struct(data_type)) { auto struct_arr = std::dynamic_pointer_cast<::arrow::StructArray>(arr); for (auto& child : field->fields()) { auto child_arr = struct_arr->GetFieldByName(child->name()); + if (child_arr == nullptr) { + return ::arrow::Status::Invalid("CollectDictionary: schema mismatch: field ", + child->name(), + "does not exist in the table: ", + struct_arr->type()); + } ARROW_RETURN_NOT_OK(CollectDictionary(child, child_arr)); } } @@ -211,7 +217,6 @@ ::arrow::Status LanceDataset::Write(const ::arrow::dataset::FileSystemDatasetWri manifest = std::make_shared(schema); } ARROW_RETURN_NOT_OK(CollectDictionary(manifest->schema(), scanner)); - fmt::print("Schema: {}\n", manifest->schema()); // Write manifest file auto lance_option = options; @@ -250,7 +255,6 @@ ::arrow::Status LanceDataset::Write(const ::arrow::dataset::FileSystemDatasetWri // It only supports single writer at the moment. auto version_dir = (fs::path(base_dir) / kVersionsDir).string(); ARROW_RETURN_NOT_OK(fs->CreateDir(version_dir)); - fmt::print("Manifest schema ptr: {}\n", fmt::ptr(manifest->schema().get())); auto manifest_path = GetManifestPath(base_dir, manifest->version()); { ARROW_ASSIGN_OR_RAISE(auto out, fs->OpenOutputStream(manifest_path)); diff --git a/cpp/src/lance/arrow/dataset_test.cc b/cpp/src/lance/arrow/dataset_test.cc index a2a5c845de..1484b712c8 100644 --- a/cpp/src/lance/arrow/dataset_test.cc +++ b/cpp/src/lance/arrow/dataset_test.cc @@ -233,4 +233,5 @@ TEST_CASE("Dataset write dictionary array") { fmt::print("Base URI: {}\n", base_uri); auto actual = ReadTable(base_uri, 1); + CHECK(actual->Equals(*table)); } \ No newline at end of file diff --git a/cpp/src/lance/format/schema.cc b/cpp/src/lance/format/schema.cc index 40991b4c41..9c65fa14c5 100644 --- a/cpp/src/lance/format/schema.cc +++ b/cpp/src/lance/format/schema.cc @@ -198,10 +198,6 @@ ::arrow::Status Field::LoadDictionary(std::shared_ptr<::arrow::io::RandomAccessF auto decoder = lance::encodings::VarBinaryDecoder<::arrow::StringType>(std::move(infile), ::arrow::utf8()); - fmt::print("Load dictionary: name={} offset={} length={}\n", - name_, - dictionary_offset_, - dictionary_page_length_); decoder.Reset(dictionary_offset_, dictionary_page_length_); ARROW_ASSIGN_OR_RAISE(auto dict_arr, decoder.ToArray()); @@ -304,8 +300,6 @@ std::vector Field::ToProto() const { pb_fields.emplace_back(field); - fmt::print("Field::ToProto: name={} {} {}\n", name_, dictionary_offset_, dictionary_page_length_); - for (auto& child : children_) { auto protos = child->ToProto(); pb_fields.insert(pb_fields.end(), protos.begin(), protos.end()); diff --git a/cpp/src/lance/io/exec/filter.h b/cpp/src/lance/io/exec/filter.h index 824656e8f1..16053a7232 100644 --- a/cpp/src/lance/io/exec/filter.h +++ b/cpp/src/lance/io/exec/filter.h @@ -41,7 +41,7 @@ class Filter : public ExecNode { ::arrow::Result Next() override; constexpr Type type() const override { return Type::kFilter; } - + std::string ToString() const override; private: From 5f0aed1ec6c0753d4ad371756bf339a4a592e67c Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 15 Nov 2022 13:44:45 -0800 Subject: [PATCH 07/10] extract dictionary pb as an object --- cpp/src/lance/format/schema.cc | 17 ++++++++++++----- protos/format.proto | 21 ++++++++++++++++----- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/cpp/src/lance/format/schema.cc b/cpp/src/lance/format/schema.cc index 9c65fa14c5..d0692e131b 100644 --- a/cpp/src/lance/format/schema.cc +++ b/cpp/src/lance/format/schema.cc @@ -87,9 +87,12 @@ Field::Field(const pb::Field& pb) name_(pb.name()), logical_type_(pb.logical_type()), extension_name_(pb.extension_name()), - encoding_(lance::encodings::FromProto(pb.encoding())), - dictionary_offset_(pb.dictionary_offset()), - dictionary_page_length_(pb.dictionary_page_length()) {} + encoding_(lance::encodings::FromProto(pb.encoding())) { + if (pb.has_dictionary()) { + dictionary_offset_ = pb.dictionary().offset(); + dictionary_page_length_ = pb.dictionary().length(); + } +} void Field::AddChild(std::shared_ptr child) { children_.emplace_back(child); } @@ -294,8 +297,12 @@ std::vector Field::ToProto() const { field.set_logical_type(logical_type_); field.set_extension_name(extension_name_); field.set_encoding(::lance::encodings::ToProto(encoding_)); - field.set_dictionary_offset(dictionary_offset_); - field.set_dictionary_page_length(dictionary_page_length_); + + if (dictionary_offset_ >= 0) { + field.mutable_dictionary()->set_offset(dictionary_offset_); + field.mutable_dictionary()->set_length(dictionary_page_length_); + } + field.set_type(GetNodeType()); pb_fields.emplace_back(field); diff --git a/protos/format.proto b/protos/format.proto index 30c5635ccb..b351dc7a07 100644 --- a/protos/format.proto +++ b/protos/format.proto @@ -52,7 +52,7 @@ message Manifest { // Snapshot version uint64 version = 4; - + // Fragments of the dataset. repeated DataFragment fragments = 6; } @@ -64,7 +64,7 @@ message Manifest { message DataFragment { // Unique ID of each DataFragment uint64 id = 1; - + repeated DataFile files = 2; } @@ -106,6 +106,18 @@ enum Encoding { DICTIONARY = 3; } +/// Dictionary field metadata +message Dictionary { + /// The file offset for storing the dictionary value. + /// It is only valid if encoding is DICTIONARY. + /// + /// The logic type presents the value type of the column, i.e., string value. + int64 offset = 1; + + /// The length of dictionary values. + int64 length = 2; +} + /** * Field metadata for a column. */ @@ -135,9 +147,8 @@ message Field { /// It is only valid if encoding is DICTIONARY. /// /// The logic type presents the value type of the column, i.e., string value. - int64 dictionary_offset = 8; - int64 dictionary_page_length = 9; + Dictionary dictionary = 8; // optional extension type name - string extension_name = 10; + string extension_name = 9; } From fc5bbbbd8c22e154d78ae8cd326115e7d34e6c5b Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 15 Nov 2022 13:59:24 -0800 Subject: [PATCH 08/10] fix clang tidy --- cpp/src/lance/testing/io.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/lance/testing/io.cc b/cpp/src/lance/testing/io.cc index 8c8cae9d90..b74a972762 100644 --- a/cpp/src/lance/testing/io.cc +++ b/cpp/src/lance/testing/io.cc @@ -14,12 +14,10 @@ #include "lance/testing/io.h" -#include #include #include #include #include -#include #include #include @@ -75,6 +73,7 @@ ::arrow::Result> MakeDataset( auto tmp_dir = "file://" + MakeTemporaryDir().ValueOrDie(); std::string path; std::vector> partition_fields; + partition_fields.reserve(partitions.size()); for (auto& part_col : partitions) { partition_fields.emplace_back(table->schema()->GetFieldByName(part_col)); } From ba7205e040519832bb222b436fac5aefb9a09224 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 15 Nov 2022 15:09:34 -0800 Subject: [PATCH 09/10] remove prints --- cpp/src/lance/io/reader.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/lance/io/reader.cc b/cpp/src/lance/io/reader.cc index f8908aabff..25ea5ac03b 100644 --- a/cpp/src/lance/io/reader.cc +++ b/cpp/src/lance/io/reader.cc @@ -142,7 +142,6 @@ Status FileReader::Open() { auto num_batches = metadata_->num_batches(); auto num_columns = manifest_->schema()->GetFieldsCount(); - fmt::print("num_columns: {}\n", num_columns); ARROW_ASSIGN_OR_RAISE( page_table_, format::PageTable::Make(file_, metadata_->page_table_position(), num_columns, num_batches)); From 622d5a4a6c7554af6dbf9057353691b7683d6bce Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Tue, 15 Nov 2022 15:21:48 -0800 Subject: [PATCH 10/10] allow set dictioanry --- cpp/src/lance/format/schema.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cpp/src/lance/format/schema.cc b/cpp/src/lance/format/schema.cc index d0692e131b..8d60698de8 100644 --- a/cpp/src/lance/format/schema.cc +++ b/cpp/src/lance/format/schema.cc @@ -186,11 +186,8 @@ std::string Field::name() const { const std::shared_ptr<::arrow::Array>& Field::dictionary() const { return dictionary_; } ::arrow::Status Field::set_dictionary(std::shared_ptr<::arrow::Array> dict_arr) { - if (!dictionary_) { - dictionary_ = dict_arr; - return ::arrow::Status::OK(); - } - return ::arrow::Status::Invalid("Field::dictionary has already been set"); + dictionary_ = std::move(dict_arr); + return ::arrow::Status::OK(); } ::arrow::Status Field::LoadDictionary(std::shared_ptr<::arrow::io::RandomAccessFile> infile) {