diff --git a/cpp/src/lance/arrow/dataset.cc b/cpp/src/lance/arrow/dataset.cc index d436566a56..a3bb8e1010 100644 --- a/cpp/src/lance/arrow/dataset.cc +++ b/cpp/src/lance/arrow/dataset.cc @@ -14,8 +14,10 @@ #include "lance/arrow/dataset.h" +#include #include #include +#include #include #include #include @@ -31,6 +33,7 @@ #include "lance/arrow/fragment.h" #include "lance/format/manifest.h" #include "lance/format/schema.h" +#include "lance/io/reader.h" #include "lance/io/writer.h" namespace fs = std::filesystem; @@ -82,7 +85,56 @@ std::string GetBasenameTemplate() { ::arrow::Result> OpenManifest( const std::shared_ptr<::arrow::fs::FileSystem>& fs, const std::string& path) { ARROW_ASSIGN_OR_RAISE(auto in, fs->OpenInputFile(path)); - return lance::format::Manifest::Parse(in, 0); + return lance::io::FileReader::OpenManifest(in); +} + +::arrow::Status CollectDictionary(const std::shared_ptr& field, + const std::shared_ptr<::arrow::Array>& arr) { + assert(field && arr); + assert(field->type()->Equals(arr->type())); + auto data_type = field->type(); + if (::arrow::is_dictionary(data_type->id())) { + auto dict_arr = std::dynamic_pointer_cast<::arrow::DictionaryArray>(arr); + return field->set_dictionary(dict_arr->dictionary()); + } + + if (is_list(data_type)) { + auto list_arr = std::dynamic_pointer_cast<::arrow::ListArray>(arr); + ARROW_RETURN_NOT_OK(CollectDictionary(field->field(0), list_arr->values())); + } else if (is_struct(data_type)) { + auto struct_arr = std::dynamic_pointer_cast<::arrow::StructArray>(arr); + for (auto& child : field->fields()) { + auto child_arr = struct_arr->GetFieldByName(child->name()); + if (child_arr == nullptr) { + return ::arrow::Status::Invalid("CollectDictionary: schema mismatch: field ", + child->name(), + "does not exist in the table: ", + struct_arr->type()); + } + ARROW_RETURN_NOT_OK(CollectDictionary(child, child_arr)); + } + } + return ::arrow::Status::OK(); +} + +::arrow::Status CollectDictionary(const std::shared_ptr& schema, + const std::shared_ptr<::arrow::dataset::Scanner>& scanner) { + ARROW_ASSIGN_OR_RAISE(auto example, scanner->Head(1)); + if (example->num_rows() == 0) { + return ::arrow::Status::Invalid("CollectDictionary: empty dataset"); + } + for (auto& field : schema->fields()) { + auto chunked_arr = example->GetColumnByName(field->name()); + if (chunked_arr == nullptr) { + return ::arrow::Status::Invalid("CollectDictionary: schema mismatch: field ", + field->name(), + "does not exist in the table: ", + example->schema()); + } + assert(chunked_arr->num_chunks() > 0); + ARROW_RETURN_NOT_OK(CollectDictionary(field, chunked_arr->chunk(0))); + } + return ::arrow::Status::OK(); } } // namespace @@ -164,6 +216,7 @@ ::arrow::Status LanceDataset::Write(const ::arrow::dataset::FileSystemDatasetWri auto schema = std::make_shared(scanner->options()->dataset_schema); manifest = std::make_shared(schema); } + ARROW_RETURN_NOT_OK(CollectDictionary(manifest->schema(), scanner)); // Write manifest file auto lance_option = options; @@ -205,7 +258,7 @@ ::arrow::Status LanceDataset::Write(const ::arrow::dataset::FileSystemDatasetWri auto manifest_path = GetManifestPath(base_dir, manifest->version()); { ARROW_ASSIGN_OR_RAISE(auto out, fs->OpenOutputStream(manifest_path)); - ARROW_RETURN_NOT_OK(manifest->Write(out)); + ARROW_RETURN_NOT_OK(lance::io::FileWriter::WriteManifest(out, *manifest)); } auto latest_manifest_path = GetManifestPath(base_dir, std::nullopt); return fs->CopyFile(manifest_path, latest_manifest_path); @@ -263,7 +316,7 @@ ::arrow::Result<::arrow::dataset::FragmentIterator> LanceDataset::GetFragmentsIm std::vector> fragments = impl_->manifest->fragments() | views::transform([this](auto& data_fragment) { return std::make_shared( - impl_->fs, impl_->data_dir(), data_fragment, impl_->manifest->schema()); + impl_->fs, impl_->data_dir(), data_fragment, impl_->manifest); }) | ranges::to; diff --git a/cpp/src/lance/arrow/dataset_test.cc b/cpp/src/lance/arrow/dataset_test.cc index 13bb173166..1484b712c8 100644 --- a/cpp/src/lance/arrow/dataset_test.cc +++ b/cpp/src/lance/arrow/dataset_test.cc @@ -19,8 +19,8 @@ #include #include -#include #include +#include #include "lance/arrow/file_lance.h" #include "lance/arrow/stl.h" @@ -29,7 +29,6 @@ using lance::arrow::ToArray; using namespace ranges::views; - std::shared_ptr<::arrow::Table> ReadTable(const std::string& uri, std::optional version) { std::string path; auto fs = ::arrow::fs::FileSystemFromUriOrPath(uri, &path).ValueOrDie(); @@ -38,6 +37,24 @@ std::shared_ptr<::arrow::Table> ReadTable(const std::string& uri, std::optional< return actual_dataset->NewScan().ValueOrDie()->Finish().ValueOrDie()->ToTable().ValueOrDie(); } +// Write table as dataset. +std::string WriteTable(const std::shared_ptr<::arrow::Table>& table) { + auto base_uri = lance::testing::MakeTemporaryDir().ValueOrDie() + "/testdata"; + auto format = lance::arrow::LanceFileFormat::Make(); + ::arrow::dataset::FileSystemDatasetWriteOptions write_options; + std::string path; + auto fs = ::arrow::fs::FileSystemFromUriOrPath(base_uri, &path).ValueOrDie(); + write_options.filesystem = fs; + write_options.base_dir = path; + write_options.file_write_options = format->DefaultWriteOptions(); + + auto dataset = lance::testing::MakeDataset(table).ValueOrDie(); + CHECK(lance::arrow::LanceDataset::Write(write_options, + dataset->NewScan().ValueOrDie()->Finish().ValueOrDie()) + .ok()); + return base_uri; +} + TEST_CASE("Create new dataset") { auto ids = ToArray({1, 2, 3, 4, 5, 6, 8}).ValueOrDie(); auto values = ToArray({"a", "b", "c", "d", "e", "f", "g"}).ValueOrDie(); @@ -199,4 +216,22 @@ TEST_CASE("Dataset overwrite error cases") { lance::arrow::LanceDataset::kOverwrite); INFO("Status: " << status.message() << " is ok: " << status.ok()); CHECK(status.IsIOError()); +} + +TEST_CASE("Dataset write dictionary array") { + auto dict_values = ToArray({"a", "b", "c"}).ValueOrDie(); + auto dict_indices = ToArray({0, 1, 1, 2, 2, 0}).ValueOrDie(); + auto data_type = ::arrow::dictionary(::arrow::int32(), ::arrow::utf8()); + auto dict_arr = + ::arrow::DictionaryArray::FromArrays( + data_type, dict_indices, dict_values) + .ValueOrDie(); + auto table = + ::arrow::Table::Make(::arrow::schema({::arrow::field("dict", data_type)}), {dict_arr}); + + auto base_uri = WriteTable(table); + fmt::print("Base URI: {}\n", base_uri); + + auto actual = ReadTable(base_uri, 1); + CHECK(actual->Equals(*table)); } \ No newline at end of file diff --git a/cpp/src/lance/arrow/file_lance.cc b/cpp/src/lance/arrow/file_lance.cc index 2db0e9d5f7..78347be442 100644 --- a/cpp/src/lance/arrow/file_lance.cc +++ b/cpp/src/lance/arrow/file_lance.cc @@ -103,7 +103,7 @@ ::arrow::Future> LanceFileFormat::CountRows( ::arrow::Result<::arrow::RecordBatchGenerator> LanceFileFormat::ScanBatchesAsync( const std::shared_ptr<::arrow::dataset::ScanOptions>& options, const std::shared_ptr<::arrow::dataset::FileFragment>& file) const { - ARROW_ASSIGN_OR_RAISE(auto fragment, LanceFragment::Make(*file, impl_->manifest->schema())); + ARROW_ASSIGN_OR_RAISE(auto fragment, LanceFragment::Make(*file, impl_->manifest)); ARROW_ASSIGN_OR_RAISE(auto batch_reader, lance::io::RecordBatchReader::Make(*fragment, options)); return ::arrow::RecordBatchGenerator(std::move(batch_reader)); diff --git a/cpp/src/lance/arrow/fragment.cc b/cpp/src/lance/arrow/fragment.cc index 3b5c55a497..9069e11d5b 100644 --- a/cpp/src/lance/arrow/fragment.cc +++ b/cpp/src/lance/arrow/fragment.cc @@ -23,6 +23,7 @@ #include "lance/arrow/file_lance.h" #include "lance/arrow/utils.h" #include "lance/format/data_fragment.h" +#include "lance/format/manifest.h" #include "lance/format/schema.h" #include "lance/io/reader.h" #include "lance/io/record_batch_reader.h" @@ -33,22 +34,23 @@ namespace fs = std::filesystem; namespace lance::arrow { ::arrow::Result> LanceFragment::Make( - const ::arrow::dataset::FileFragment& file_fragment, std::shared_ptr schema) { - auto field_ids = schema->GetFieldIds(); + const ::arrow::dataset::FileFragment& file_fragment, + std::shared_ptr manifest) { + auto field_ids = manifest->schema()->GetFieldIds(); auto data_fragment = std::make_shared( format::DataFile(file_fragment.source().path(), field_ids)); return std::make_shared( - file_fragment.source().filesystem(), "", data_fragment, schema); + file_fragment.source().filesystem(), "", std::move(data_fragment), std::move(manifest)); } LanceFragment::LanceFragment(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string data_dir, std::shared_ptr fragment, - std::shared_ptr schema) + std::shared_ptr manifest) : fs_(std::move(fs)), data_uri_(std::move(data_dir)), fragment_(std::move(fragment)), - schema_(std::move(schema)) {} + manifest_(std::move(manifest)) {} ::arrow::Result<::arrow::RecordBatchGenerator> LanceFragment::ScanBatchesAsync( const std::shared_ptr<::arrow::dataset::ScanOptions>& options) { @@ -57,7 +59,7 @@ ::arrow::Result<::arrow::RecordBatchGenerator> LanceFragment::ScanBatchesAsync( } ::arrow::Result> LanceFragment::ReadPhysicalSchemaImpl() { - return schema_->ToArrow(); + return schema()->ToArrow(); } ::arrow::Result> LanceFragment::Open( @@ -71,14 +73,15 @@ ::arrow::Result> LanceFragment: executor->Submit( [this, &schema](auto idx) -> ::arrow::Result { auto& data_file = this->fragment_->data_files()[idx]; - ARROW_ASSIGN_OR_RAISE(auto data_file_schema, schema_->Project(data_file.fields())); + ARROW_ASSIGN_OR_RAISE(auto data_file_schema, + this->schema()->Project(data_file.fields())); ARROW_ASSIGN_OR_RAISE(auto intersection, schema.Intersection(*data_file_schema)); if (intersection->fields().empty()) { return std::make_tuple(nullptr, nullptr); } auto full_path = (fs::path(data_uri_) / data_file.path()).string(); ARROW_ASSIGN_OR_RAISE(auto infile, fs_->OpenInputFile(full_path)) - ARROW_ASSIGN_OR_RAISE(auto reader, lance::io::FileReader::Make(infile)); + ARROW_ASSIGN_OR_RAISE(auto reader, lance::io::FileReader::Make(infile, this->manifest_)); return std::make_tuple(std::move(reader), intersection); }, i)); @@ -96,6 +99,8 @@ ::arrow::Result> LanceFragment: return readers; } +const std::shared_ptr& LanceFragment::schema() const { return manifest_->schema(); } + ::arrow::Result LanceFragment::FastCountRow() const { assert(!fragment_->data_files().empty()); ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(0)); diff --git a/cpp/src/lance/arrow/fragment.h b/cpp/src/lance/arrow/fragment.h index 233c93e4fe..7dee2b391c 100644 --- a/cpp/src/lance/arrow/fragment.h +++ b/cpp/src/lance/arrow/fragment.h @@ -45,21 +45,22 @@ class LanceFragment : public ::arrow::dataset::Fragment { /// /// It creates a LanceFragment from `arrow.dataset.FileFragment`. /// \param file_fragment plain dataset file fragment - /// \param schema the schema of dataset. + /// \param manifest dataset manifest. /// \return LanceFragment static ::arrow::Result> Make( - const ::arrow::dataset::FileFragment& file_fragment, std::shared_ptr schema); + const ::arrow::dataset::FileFragment& file_fragment, + std::shared_ptr manifest); /// Constructor /// /// \param fs a file system instance to conduct IOs. /// \param data_dir the base directory to store data. /// \param fragment data fragment, the metadata of the fragment. - /// \param schema the schema of the Fragment. + /// \param manifest dataset manifest. LanceFragment(std::shared_ptr<::arrow::fs::FileSystem> fs, std::string data_dir, std::shared_ptr fragment, - std::shared_ptr schema); + std::shared_ptr manifest); /// Destructor. ~LanceFragment() override = default; @@ -79,7 +80,7 @@ class LanceFragment : public ::arrow::dataset::Fragment { ::arrow::internal::Executor* executor = ::arrow::internal::GetCpuThreadPool()) const; /// Dataset schema. - const std::shared_ptr& schema() const { return schema_; } + const std::shared_ptr& schema() const; protected: ::arrow::Result> ReadPhysicalSchemaImpl() override; @@ -98,7 +99,7 @@ class LanceFragment : public ::arrow::dataset::Fragment { std::shared_ptr<::arrow::fs::FileSystem> fs_; std::string data_uri_; std::shared_ptr fragment_; - std::shared_ptr schema_; + std::shared_ptr manifest_; }; } // namespace lance::arrow diff --git a/cpp/src/lance/format/manifest.cc b/cpp/src/lance/format/manifest.cc index f8bbf2c542..26c886313b 100644 --- a/cpp/src/lance/format/manifest.cc +++ b/cpp/src/lance/format/manifest.cc @@ -49,6 +49,12 @@ ::arrow::Result> Manifest::Parse( return std::shared_ptr(new Manifest(pb)); } +::arrow::Result> Manifest::Parse( + const std::shared_ptr<::arrow::Buffer>& buffer) { + ARROW_ASSIGN_OR_RAISE(auto pb, io::ParseProto(buffer)); + return std::shared_ptr(new Manifest(pb)); +} + ::arrow::Result Manifest::Write(std::shared_ptr<::arrow::io::OutputStream> out) const { lance::format::pb::Manifest pb; for (auto field : schema_->ToProto()) { diff --git a/cpp/src/lance/format/manifest.h b/cpp/src/lance/format/manifest.h index 0183c9459f..18a331cb65 100644 --- a/cpp/src/lance/format/manifest.h +++ b/cpp/src/lance/format/manifest.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -54,6 +55,10 @@ class Manifest final { static ::arrow::Result> Parse( std::shared_ptr<::arrow::io::RandomAccessFile> in, int64_t offset); + /// Parse a Manifest from a buffer. + static ::arrow::Result> Parse( + const std::shared_ptr<::arrow::Buffer>& buffer); + /// Write the Manifest to a file. /// /// \param out the output stream to write this Manifest to. @@ -87,7 +92,7 @@ class Manifest final { std::vector> fragments_; - Manifest(const lance::format::pb::Manifest& pb); + explicit Manifest(const lance::format::pb::Manifest& pb); }; } // namespace lance::format diff --git a/cpp/src/lance/format/schema.cc b/cpp/src/lance/format/schema.cc index f586170a4c..8d60698de8 100644 --- a/cpp/src/lance/format/schema.cc +++ b/cpp/src/lance/format/schema.cc @@ -64,12 +64,12 @@ void Field::Init(std::shared_ptr<::arrow::DataType> dtype) { if (::lance::arrow::is_struct(dtype)) { auto struct_type = std::static_pointer_cast<::arrow::StructType>(dtype); for (auto& arrow_field : struct_type->fields()) { - children_.push_back(std::shared_ptr(new Field(arrow_field))); + children_.push_back(std::make_shared(arrow_field)); } } else if (::lance::arrow::is_list(dtype)) { auto list_type = std::static_pointer_cast<::arrow::ListType>(dtype); children_.emplace_back( - std::shared_ptr(new Field(::arrow::field("item", list_type->value_type())))); + std::make_shared(::arrow::field("item", list_type->value_type()))); encoding_ = encodings::PLAIN; } else if (::arrow::is_binary_like(type_id) || ::arrow::is_large_binary_like(type_id)) { encoding_ = encodings::VAR_BINARY; @@ -87,9 +87,12 @@ Field::Field(const pb::Field& pb) name_(pb.name()), logical_type_(pb.logical_type()), extension_name_(pb.extension_name()), - encoding_(lance::encodings::FromProto(pb.encoding())), - dictionary_offset_(pb.dictionary_offset()), - dictionary_page_length_(pb.dictionary_page_length()) {} + encoding_(lance::encodings::FromProto(pb.encoding())) { + if (pb.has_dictionary()) { + dictionary_offset_ = pb.dictionary().offset(); + dictionary_page_length_ = pb.dictionary().length(); + } +} void Field::AddChild(std::shared_ptr child) { children_.emplace_back(child); } @@ -165,6 +168,9 @@ std::string Field::ToString() const { if (is_extension_type()) { str = fmt::format("{}, extension_name={}", str, extension_name_); } + if (dictionary_) { + str = fmt::format("{}, dict={}", str, dictionary_->ToString()); + } return str; } @@ -180,11 +186,8 @@ std::string Field::name() const { const std::shared_ptr<::arrow::Array>& Field::dictionary() const { return dictionary_; } ::arrow::Status Field::set_dictionary(std::shared_ptr<::arrow::Array> dict_arr) { - if (!dictionary_) { - dictionary_ = dict_arr; - return ::arrow::Status::OK(); - } - return ::arrow::Status::Invalid("Field::dictionary has already been set"); + dictionary_ = std::move(dict_arr); + return ::arrow::Status::OK(); } ::arrow::Status Field::LoadDictionary(std::shared_ptr<::arrow::io::RandomAccessFile> infile) { @@ -291,8 +294,12 @@ std::vector Field::ToProto() const { field.set_logical_type(logical_type_); field.set_extension_name(extension_name_); field.set_encoding(::lance::encodings::ToProto(encoding_)); - field.set_dictionary_offset(dictionary_offset_); - field.set_dictionary_page_length(dictionary_page_length_); + + if (dictionary_offset_ >= 0) { + field.mutable_dictionary()->set_offset(dictionary_offset_); + field.mutable_dictionary()->set_length(dictionary_page_length_); + } + field.set_type(GetNodeType()); pb_fields.emplace_back(field); diff --git a/cpp/src/lance/format/schema.h b/cpp/src/lance/format/schema.h index 4c13e48a0e..754e9b0a10 100644 --- a/cpp/src/lance/format/schema.h +++ b/cpp/src/lance/format/schema.h @@ -199,6 +199,10 @@ class Field final { const std::shared_ptr<::arrow::Array>& dictionary() const; + /// Set the directory values for a dictionary field. + /// + /// \param dict_arr dictionary values + /// \return `status::OK()` if success. Fails if the dictionary is already set. ::arrow::Status set_dictionary(std::shared_ptr<::arrow::Array> dict_arr); lance::encodings::Encoding encoding() const { return encoding_; }; diff --git a/cpp/src/lance/format/visitors.cc b/cpp/src/lance/format/visitors.cc index f69363f8a9..baf91294ae 100644 --- a/cpp/src/lance/format/visitors.cc +++ b/cpp/src/lance/format/visitors.cc @@ -48,7 +48,7 @@ ::arrow::Result<::std::shared_ptr<::arrow::Field>> ToArrowVisitor::DoVisit( } WriteDictionaryVisitor::WriteDictionaryVisitor(std::shared_ptr<::arrow::io::OutputStream> out) - : out_(out) {} + : out_(std::move(out)) {} ::arrow::Status WriteDictionaryVisitor::Visit(std::shared_ptr root) { if (::arrow::is_dictionary(root->storage_type()->id())) { diff --git a/cpp/src/lance/io/reader.cc b/cpp/src/lance/io/reader.cc index cc6ad465f8..25ea5ac03b 100644 --- a/cpp/src/lance/io/reader.cc +++ b/cpp/src/lance/io/reader.cc @@ -49,6 +49,8 @@ typedef ::arrow::Result> ScalarResult; namespace lance::io { +namespace { + ::arrow::Result ReadFooter(const std::shared_ptr<::arrow::Buffer>& buf) { assert(buf->size() >= 16); if (auto magic_buf = ::arrow::SliceBuffer(buf, buf->size() - 4); @@ -56,19 +58,52 @@ ::arrow::Result ReadFooter(const std::shared_ptr<::arrow::Buffer>& buf) return Status::IOError( fmt::format("Invalidate file format: MAGIC NUM is not {}", lance::format::kMagic)); } - // Metadata Offset return ReadInt(buf->data() + buf->size() - 16); } +} // namespace + ::arrow::Result> FileReader::Make( std::shared_ptr<::arrow::io::RandomAccessFile> in, std::shared_ptr<::lance::format::Manifest> manifest, ::arrow::MemoryPool* pool) { - auto reader = std::make_unique(in, manifest, pool); + auto reader = std::make_unique(std::move(in), std::move(manifest), pool); ARROW_RETURN_NOT_OK(reader->Open()); return reader; } +::arrow::Result> FileReader::OpenManifest( + const std::shared_ptr<::arrow::io::RandomAccessFile>& in) { + constexpr auto kBufReadBytes = 8 * 1024 * 1024; // Read 8 MB; + ::arrow::BufferVector buffers; + int64_t pos = 0; + while (true) { + ARROW_ASSIGN_OR_RAISE(auto buf, in->ReadAt(pos, kBufReadBytes)); + auto read_nbytes = buf->size(); + if (read_nbytes > 0) { + buffers.emplace_back(std::move(buf)); + } + if (read_nbytes < kBufReadBytes) { + break; + } + pos += read_nbytes; + } + assert(!buffers.empty()); + auto buf = buffers[0]; + if (buffers.size() > 1) { + // Unlikely to get here + ARROW_ASSIGN_OR_RAISE(buf, ::arrow::ConcatenateBuffers(buffers)); + } + ARROW_ASSIGN_OR_RAISE(auto manifest_pos, ReadFooter(buf)); + ARROW_ASSIGN_OR_RAISE(auto manifest, + lance::format::Manifest::Parse(SliceBuffer(buf, manifest_pos))); + + /// TODO: optimize ReadDictionaryVisitor to read from buffer. + auto visitor = format::ReadDictionaryVisitor(in); + ARROW_RETURN_NOT_OK(visitor.VisitSchema(*manifest->schema())); + return manifest; +} + FileReader::FileReader(std::shared_ptr<::arrow::io::RandomAccessFile> in, std::shared_ptr<::lance::format::Manifest> manifest, ::arrow::MemoryPool* pool) noexcept @@ -95,6 +130,7 @@ Status FileReader::Open() { metadata_, format::Metadata::Make(::arrow::SliceBuffer(cached_last_page_, inbuf_offset))); if (!manifest_) { + /// Multiple files can share the manifest. ARROW_ASSIGN_OR_RAISE(manifest_, metadata_->GetManifest(file_)); // We need read the dictionary from the same file. auto visitor = format::ReadDictionaryVisitor(file_); diff --git a/cpp/src/lance/io/reader.h b/cpp/src/lance/io/reader.h index 67218bf34f..23b7a9b7b1 100644 --- a/cpp/src/lance/io/reader.h +++ b/cpp/src/lance/io/reader.h @@ -41,6 +41,10 @@ class FileReader { std::shared_ptr<::lance::format::Manifest> manifest = nullptr, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + /// Open Manifest file + static ::arrow::Result> OpenManifest( + const std::shared_ptr<::arrow::io::RandomAccessFile>& in); + explicit FileReader(std::shared_ptr<::arrow::io::RandomAccessFile> in, std::shared_ptr<::lance::format::Manifest> manifest = nullptr, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) noexcept; diff --git a/cpp/src/lance/io/writer.cc b/cpp/src/lance/io/writer.cc index d506bb62b4..2078e53502 100644 --- a/cpp/src/lance/io/writer.cc +++ b/cpp/src/lance/io/writer.cc @@ -34,7 +34,7 @@ namespace lance::io { static constexpr int16_t kMajorVersion = 0; static constexpr int16_t kMinorVersion = 1; -namespace internal { +namespace { /** * Write the 16 bytes footer to the end of a file. @@ -81,6 +81,16 @@ ::arrow::Status FileWriter::Write(const std::shared_ptr<::arrow::RecordBatch>& b return ::arrow::Status::OK(); } +::arrow::Status FileWriter::WriteManifest(const std::shared_ptr<::arrow::io::OutputStream>& destination, + const lance::format::Manifest& manifest) { + // Write dictionary values first. + auto visitor = format::WriteDictionaryVisitor(destination); + ARROW_RETURN_NOT_OK(visitor.VisitSchema(*manifest.schema())); + + ARROW_ASSIGN_OR_RAISE(auto offset, manifest.Write(destination)); + return ::lance::io::WriteFooter(destination, offset); +} + ::arrow::Status FileWriter::WriteArray(const std::shared_ptr& field, const std::shared_ptr<::arrow::Array>& arr) { if (::lance::arrow::is_extension(arr->type())) { @@ -198,7 +208,7 @@ ::arrow::Status FileWriter::WriteFooter() { metadata_->SetManifestPosition(pos); ARROW_ASSIGN_OR_RAISE(pos, metadata_->Write(destination_)); - return internal::WriteFooter(destination_, pos); + return ::lance::io::WriteFooter(destination_, pos); } ::arrow::Future<> FileWriter::FinishInternal() { diff --git a/cpp/src/lance/io/writer.h b/cpp/src/lance/io/writer.h index 46171352f6..51b19fc940 100644 --- a/cpp/src/lance/io/writer.h +++ b/cpp/src/lance/io/writer.h @@ -41,11 +41,15 @@ class FileWriter final : public ::arrow::dataset::FileWriter { std::shared_ptr<::arrow::io::OutputStream> destination, ::arrow::fs::FileLocator destination_locator = {}); - ~FileWriter(); + ~FileWriter() override; /// Write an arrow RecordBatch to the file. ::arrow::Status Write(const std::shared_ptr<::arrow::RecordBatch>& batch) override; + /// Write Manifest to a file. + static ::arrow::Status WriteManifest(const std::shared_ptr<::arrow::io::OutputStream>& destination, + const lance::format::Manifest& manifest); + private: ::arrow::Future<> FinishInternal() override; diff --git a/cpp/src/lance/testing/io.cc b/cpp/src/lance/testing/io.cc index a765c318d2..b74a972762 100644 --- a/cpp/src/lance/testing/io.cc +++ b/cpp/src/lance/testing/io.cc @@ -14,12 +14,10 @@ #include "lance/testing/io.h" -#include #include #include #include #include -#include #include #include @@ -31,6 +29,7 @@ #include "lance/arrow/fragment.h" #include "lance/arrow/utils.h" #include "lance/arrow/writer.h" +#include "lance/format/manifest.h" #include "lance/io/reader.h" #include "lance/io/writer.h" @@ -74,6 +73,7 @@ ::arrow::Result> MakeDataset( auto tmp_dir = "file://" + MakeTemporaryDir().ValueOrDie(); std::string path; std::vector> partition_fields; + partition_fields.reserve(partitions.size()); for (auto& part_col : partitions) { partition_fields.emplace_back(table->schema()->GetFieldByName(part_col)); } @@ -145,10 +145,11 @@ ::arrow::Result> MakeFragment( } auto schema = std::make_shared(table->schema()); + auto manifest = std::make_shared(schema); auto data_file = lance::format::DataFile(filename, schema->GetFieldIds()); auto fragment = std::make_shared(data_file); return std::make_shared( - local_fs, data_dir, std::move(fragment), std::move(schema)); + local_fs, data_dir, std::move(fragment), std::move(manifest)); }; } // namespace lance::testing diff --git a/protos/format.proto b/protos/format.proto index 30c5635ccb..b351dc7a07 100644 --- a/protos/format.proto +++ b/protos/format.proto @@ -52,7 +52,7 @@ message Manifest { // Snapshot version uint64 version = 4; - + // Fragments of the dataset. repeated DataFragment fragments = 6; } @@ -64,7 +64,7 @@ message Manifest { message DataFragment { // Unique ID of each DataFragment uint64 id = 1; - + repeated DataFile files = 2; } @@ -106,6 +106,18 @@ enum Encoding { DICTIONARY = 3; } +/// Dictionary field metadata +message Dictionary { + /// The file offset for storing the dictionary value. + /// It is only valid if encoding is DICTIONARY. + /// + /// The logic type presents the value type of the column, i.e., string value. + int64 offset = 1; + + /// The length of dictionary values. + int64 length = 2; +} + /** * Field metadata for a column. */ @@ -135,9 +147,8 @@ message Field { /// It is only valid if encoding is DICTIONARY. /// /// The logic type presents the value type of the column, i.e., string value. - int64 dictionary_offset = 8; - int64 dictionary_page_length = 9; + Dictionary dictionary = 8; // optional extension type name - string extension_name = 10; + string extension_name = 9; } diff --git a/python/lance/data/convert/base.py b/python/lance/data/convert/base.py index 8c9aa14f11..6619b657f1 100644 --- a/python/lance/data/convert/base.py +++ b/python/lance/data/convert/base.py @@ -30,9 +30,11 @@ class DatasetConverter(ABC): """Base class for converting raw => pandas => Arrow => Lance""" - def __init__(self, name, uri_root, images_root=None): + def __init__(self, name, uri_root, images_root: str=None): self.name = name self.uri_root = uri_root + if images_root is None: + images_root = os.path.join(uri_root, "images") self.images_root = images_root @abstractmethod