diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp index c4ac1eafd02..bae793807df 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp @@ -173,6 +173,17 @@ void ColumnFileTiny::serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool s tiny_pb->set_id(data_page_id); tiny_pb->set_rows(rows); tiny_pb->set_bytes(bytes); + + if (!index_infos) + return; + + for (const auto & index_info : *index_infos) + { + auto * index_pb = tiny_pb->add_indexes(); + index_pb->set_index_page_id(index_info.index_page_id); + if (index_info.vector_index.has_value()) + index_pb->mutable_vector_index()->CopyFrom(*index_info.vector_index); + } } ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata( @@ -224,8 +235,17 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata( PageIdU64 data_page_id = cf_pb.id(); size_t rows = cf_pb.rows(); size_t bytes = cf_pb.bytes(); + auto index_infos = std::make_shared(); + index_infos->reserve(cf_pb.indexes().size()); + for (const auto & index_pb : cf_pb.indexes()) + { + if (index_pb.has_vector_index()) + index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index()); + else + index_infos->emplace_back(index_pb.index_page_id(), std::nullopt); + } - return std::make_shared(schema, rows, bytes, data_page_id, context); + return std::make_shared(schema, rows, bytes, data_page_id, context, index_infos); } ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint( @@ -236,36 +256,55 @@ ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint( BlockPtr schema, PageIdU64 data_page_id, size_t rows, - size_t bytes) + size_t bytes, + IndexInfosPtr index_infos) { - auto new_cf_id = context.storage_pool->newLogPageId(); - /// Generate a new RemotePage with an entry with data location on S3 - auto remote_page_id = UniversalPageIdFormat::toFullPageId( - UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Log, context.physical_table_id), - data_page_id); - // The `data_file_id` in temp_ps is lock key, we need convert it to data key before write to local ps - auto remote_data_location = temp_ps->getCheckpointLocation(remote_page_id); - RUNTIME_CHECK(remote_data_location.has_value()); - auto remote_data_file_lock_key_view = S3::S3FilenameView::fromKey(*remote_data_location->data_file_id); - RUNTIME_CHECK(remote_data_file_lock_key_view.isLockFile()); - auto remote_data_file_key = remote_data_file_lock_key_view.asDataFile().toFullKey(); - PS::V3::CheckpointLocation new_remote_data_location{ - .data_file_id = std::make_shared(remote_data_file_key), - .offset_in_file = remote_data_location->offset_in_file, - .size_in_file = remote_data_location->size_in_file, + auto put_remote_page = [&](PageIdU64 page_id) { + auto new_cf_id = context.storage_pool->newLogPageId(); + /// Generate a new RemotePage with an entry with data location on S3 + auto remote_page_id = UniversalPageIdFormat::toFullPageId( + UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Log, context.physical_table_id), + page_id); + // The `data_file_id` in temp_ps is lock key, we need convert it to data key before write to local ps + auto remote_data_location = temp_ps->getCheckpointLocation(remote_page_id); + RUNTIME_CHECK(remote_data_location.has_value()); + auto remote_data_file_lock_key_view = S3::S3FilenameView::fromKey(*remote_data_location->data_file_id); + RUNTIME_CHECK(remote_data_file_lock_key_view.isLockFile()); + auto remote_data_file_key = remote_data_file_lock_key_view.asDataFile().toFullKey(); + PS::V3::CheckpointLocation new_remote_data_location{ + .data_file_id = std::make_shared(remote_data_file_key), + .offset_in_file = remote_data_location->offset_in_file, + .size_in_file = remote_data_location->size_in_file, + }; + // TODO: merge the `getEntry` and `getCheckpointLocation` + auto entry = temp_ps->getEntry(remote_page_id); + LOG_DEBUG( + parent_log, + "Write remote page to local, page_id={} remote_location={} remote_page_id={}", + new_cf_id, + new_remote_data_location.toDebugString(), + remote_page_id); + wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets)); + return new_cf_id; }; - // TODO: merge the `getEntry` and `getCheckpointLocation` - auto entry = temp_ps->getEntry(remote_page_id); - LOG_DEBUG( - parent_log, - "Write remote page to local, page_id={} remote_location={} remote_page_id={}", - new_cf_id, - new_remote_data_location.toDebugString(), - remote_page_id); - wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets)); + // Write column data page to local ps + auto new_cf_id = put_remote_page(data_page_id); auto column_file_schema = std::make_shared(*schema); - return std::make_shared(column_file_schema, rows, bytes, new_cf_id, context); + if (!index_infos) + return std::make_shared(column_file_schema, rows, bytes, new_cf_id, context); + + // Write index data page to local ps + auto new_index_infos = std::make_shared(); + for (const auto & index : *index_infos) + { + auto new_index_page_id = put_remote_page(index.index_page_id); + if (index.vector_index) + new_index_infos->emplace_back(new_index_page_id, index.vector_index); + else + new_index_infos->emplace_back(new_index_page_id, std::nullopt); + } + return std::make_shared(column_file_schema, rows, bytes, new_cf_id, context, new_index_infos); } std::tuple ColumnFileTiny::createFromCheckpoint( @@ -289,7 +328,7 @@ std::tuple ColumnFileTiny::createFromCheckpoin readIntBinary(bytes, buf); return { - restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes), + restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes, nullptr), schema, }; } @@ -310,9 +349,18 @@ std::tuple ColumnFileTiny::createFromCheckpoin PageIdU64 data_page_id = cf_pb.id(); size_t rows = cf_pb.rows(); size_t bytes = cf_pb.bytes(); + auto index_infos = std::make_shared(); + index_infos->reserve(cf_pb.indexes().size()); + for (const auto & index_pb : cf_pb.indexes()) + { + if (index_pb.has_vector_index()) + index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index()); + else + index_infos->emplace_back(index_pb.index_page_id(), std::nullopt); + } return { - restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes), + restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes, index_infos), schema, }; } @@ -363,7 +411,7 @@ ColumnFileTinyPtr ColumnFileTiny::writeColumnFile( auto schema = getSharedBlockSchemas(context)->getOrCreate(block); auto bytes = block.bytes(offset, limit); - return std::make_shared(schema, limit, bytes, page_id, context, cache); + return std::make_shared(schema, limit, bytes, page_id, context, nullptr, cache); } PageIdU64 ColumnFileTiny::writeColumnFileData( @@ -424,6 +472,11 @@ PageIdU64 ColumnFileTiny::writeColumnFileData( void ColumnFileTiny::removeData(WriteBatches & wbs) const { wbs.removed_log.delPage(data_page_id); + if (index_infos) + { + for (const auto & index_info : *index_infos) + wbs.removed_log.delPage(index_info.index_page_id); + } } ColumnFileTiny::ColumnFileTiny( @@ -432,11 +485,13 @@ ColumnFileTiny::ColumnFileTiny( UInt64 bytes_, PageIdU64 data_page_id_, const DMContext & dm_context, + const IndexInfosPtr & index_infos_, const CachePtr & cache_) : schema(schema_) , rows(rows_) , bytes(bytes_) , data_page_id(data_page_id_) + , index_infos(index_infos_) , keyspace_id(dm_context.keyspace_id) , file_provider(dm_context.global_context.getFileProvider()) , cache(cache_) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h index 317ddd51ad8..2dd59ee7ba8 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h @@ -20,6 +20,7 @@ #include #include #include +#include #include namespace DB::DM @@ -34,9 +35,18 @@ using ColumnFileTinyPtr = std::shared_ptr; /// 2. created when flushed `ColumnFileInMemory` to disk class ColumnFileTiny : public ColumnFilePersisted { +public: friend class ColumnFileTinyReader; friend struct Remote::Serializer; + struct IndexInfo + { + PageIdU64 index_page_id{}; + std::optional vector_index = std::nullopt; + }; + using IndexInfos = std::vector; + using IndexInfosPtr = std::shared_ptr; + private: ColumnFileSchemaPtr schema; @@ -51,6 +61,9 @@ class ColumnFileTiny : public ColumnFilePersisted /// Maybe we should just drop this field, and store the data_page_size in somewhere else. UInt64 data_page_size = 0; + /// The index information of this file. + IndexInfosPtr index_infos; + /// The id of the keyspace which this ColumnFileTiny belongs to. KeyspaceID keyspace_id; /// The global file_provider @@ -87,6 +100,7 @@ class ColumnFileTiny : public ColumnFilePersisted UInt64 bytes_, PageIdU64 data_page_id_, const DMContext & dm_context, + const IndexInfosPtr & index_infos_ = nullptr, const CachePtr & cache_ = nullptr); Type getType() const override { return Type::TINY_FILE; } @@ -94,10 +108,21 @@ class ColumnFileTiny : public ColumnFilePersisted size_t getRows() const override { return rows; } size_t getBytes() const override { return bytes; } + IndexInfosPtr getIndexInfos() const { return index_infos; } + bool hasIndex(Int64 index_id) const + { + if (!index_infos) + return false; + return std::any_of(index_infos->cbegin(), index_infos->cend(), [index_id](const auto & info) { + if (!info.vector_index) + return false; + return info.vector_index->index_id() == index_id; + }); + } + auto getCache() const { return cache; } void clearCache() { cache = {}; } - /// The schema of this pack. Could be empty, i.e. a DeleteRange does not have a schema. ColumnFileSchemaPtr getSchema() const { return schema; } ColumnFileTinyPtr cloneWith(PageIdU64 new_data_page_id) @@ -107,6 +132,14 @@ class ColumnFileTiny : public ColumnFilePersisted return new_tiny_file; } + ColumnFileTinyPtr cloneWith(PageIdU64 new_data_page_id, const IndexInfosPtr & index_infos_) const + { + auto new_tiny_file = std::make_shared(*this); + new_tiny_file->data_page_id = new_data_page_id; + new_tiny_file->index_infos = index_infos_; + return new_tiny_file; + } + ColumnFileReaderPtr getReader( const DMContext &, const IColumnFileDataProviderPtr & data_provider, @@ -165,7 +198,8 @@ class ColumnFileTiny : public ColumnFilePersisted BlockPtr schema, PageIdU64 data_page_id, size_t rows, - size_t bytes); + size_t bytes, + IndexInfosPtr index_infos); static std::tuple createFromCheckpoint( const LoggerPtr & parent_log, const DMContext & context, diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 584181c5852..cfaffe2faa5 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -28,10 +28,9 @@ #include -namespace DB -{ -namespace DM +namespace DB::DM { + // ================================================ // Public methods // ================================================ @@ -108,16 +107,6 @@ std::string DeltaValueSpace::serializeMeta() const return wb.releaseStr(); } -template -struct CloneColumnFilesHelper -{ - static std::vector clone( - DMContext & dm_context, - const std::vector & src, - const RowKeyRange & target_range, - WriteBatches & wbs); -}; - template std::vector CloneColumnFilesHelper::clone( DMContext & dm_context, @@ -158,6 +147,21 @@ std::vector CloneColumnFilesHelper::clone( // Use a newly created page_id to reference the data page_id of current column file. PageIdU64 new_data_page_id = dm_context.storage_pool->newLogPageId(); wbs.log.putRefPage(new_data_page_id, t->getDataPageId()); + if (auto index_infos = t->getIndexInfos(); index_infos) + { + auto new_index_infos = std::make_shared(); + new_index_infos->reserve(index_infos->size()); + // Use a newly created page_id to reference the index page_id of current column file. + for (auto & index_info : *index_infos) + { + auto new_index_page_id = dm_context.storage_pool->newLogPageId(); + wbs.log.putRefPage(new_index_page_id, index_info.index_page_id); + new_index_infos->emplace_back(new_index_page_id, index_info.vector_index); + } + auto new_column_file = t->cloneWith(new_data_page_id, new_index_infos); + cloned.push_back(new_column_file); + continue; + } auto new_column_file = t->cloneWith(new_data_page_id); cloned.push_back(new_column_file); } @@ -503,5 +507,5 @@ bool DeltaValueSpace::compact(DMContext & context) return true; } -} // namespace DM -} // namespace DB + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 6902f1c8df7..3432b747701 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -338,6 +338,16 @@ class DeltaValueSpace DeltaSnapshotPtr createSnapshot(const DMContext & context, bool for_update, CurrentMetrics::Metric type); }; +template +struct CloneColumnFilesHelper +{ + static std::vector clone( + DMContext & dm_context, + const std::vector & src, + const RowKeyRange & target_range, + WriteBatches & wbs); +}; + class DeltaValueSnapshot : public std::enable_shared_from_this , private boost::noncopyable diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp index b77855d497b..432bab7c9d6 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -231,42 +231,69 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats() { UNUSED(handle); - // Currently Delta is always not indexed. - index_stats.rows_delta_not_indexed - += segment->getDelta()->getRows(); // TODO: More precisely count column bytes instead. - - const auto & stable = segment->getStable(); - bool is_stable_indexed = true; - for (const auto & dmfile : stable->getDMFiles()) + // Delta { - const auto [state, bytes] = dmfile->getLocalIndexState(index_info.column_id, index_info.index_id); - UNUSED(bytes); - switch (state) + const auto & delta = segment->getDelta(); + auto lock = delta->getLock(); + if (!lock) + continue; + const auto & mem_table = delta->getMemTableSet(); + index_stats.rows_delta_not_indexed += mem_table->getRows(); + const auto & persisted = delta->getPersistedFileSet(); + for (const auto & file : persisted->getFiles()) { - case DMFileMeta::LocalIndexState::NoNeed: // Regard as indexed, because column does not need any index - case DMFileMeta::LocalIndexState::IndexBuilt: - break; - case DMFileMeta::LocalIndexState::IndexPending: - is_stable_indexed = false; - break; + // TODO: this is not efficient, we can maintain the indexed_rows in ColumnFilePersisted + const auto * tiny_file = file->tryToTinyFile(); + if (tiny_file) + { + if (tiny_file->hasIndex(index_stats.index_id)) + index_stats.rows_delta_indexed += tiny_file->getRows(); + else + index_stats.rows_delta_not_indexed += tiny_file->getRows(); + } + else + { + index_stats.rows_delta_not_indexed += file->getRows(); + } } } - if (is_stable_indexed) - { - index_stats.rows_stable_indexed += stable->getRows(); - } - else + // Stable { - index_stats.rows_stable_not_indexed += stable->getRows(); - } + const auto & stable = segment->getStable(); + bool is_stable_indexed = true; + for (const auto & dmfile : stable->getDMFiles()) + { + const auto [state, bytes] = dmfile->getLocalIndexState(index_info.column_id, index_info.index_id); + UNUSED(bytes); + switch (state) + { + case DMFileMeta::LocalIndexState::NoNeed: + // Regard as indexed, because column does not need any index + case DMFileMeta::LocalIndexState::IndexBuilt: + break; + case DMFileMeta::LocalIndexState::IndexPending: + is_stable_indexed = false; + break; + } + } - const auto index_build_error = segment->getIndexBuildError(); - // Set error_message to the first error_message we meet among all segments - if (auto err_iter = index_build_error.find(index_info.index_id); - err_iter != index_build_error.end() && index_stats.error_message.empty()) - { - index_stats.error_message = err_iter->second; + if (is_stable_indexed) + { + index_stats.rows_stable_indexed += stable->getRows(); + } + else + { + index_stats.rows_stable_not_indexed += stable->getRows(); + } + + const auto index_build_error = segment->getIndexBuildError(); + // Set error_message to the first error_message we meet among all segments + if (auto err_iter = index_build_error.find(index_info.index_id); + err_iter != index_build_error.end() && index_stats.error_message.empty()) + { + index_stats.error_message = err_iter->second; + } } } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h index 8566f144980..fc783492444 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h @@ -201,7 +201,7 @@ class DMFileMeta public: - enum LocalIndexState + enum class LocalIndexState { NoNeed, IndexPending, diff --git a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto index 62b4022dc76..64d904404e4 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto +++ b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto @@ -74,6 +74,25 @@ message ColumnFileInMemory { uint64 rows = 3; } +// Note: This message is something different to VectorIndexDefinition. +// VectorIndexDefinition defines an index, comes from table DDL. +// It includes information about how index should be constructed, +// for example, it contains HNSW's 'efConstruct' parameter. +// However, VectorIndexFileProps provides information for read out the index, +// for example, very basic information about what the index is, and how it is stored. +message VectorIndexFileProps { + string index_kind = 1; // The value is tipb.VectorIndexKind + string distance_metric = 2; // The value is tipb.VectorDistanceMetric + uint64 dimensions = 3; + int64 index_id = 4; + uint64 index_bytes = 5; +} + +message IndexInfo { + uint64 index_page_id = 1; // which page it stores + VectorIndexFileProps vector_index = 2; // vector index file properties, needed when read +} + message ColumnFileTiny { uint64 page_id = 1; uint64 page_size = 2; @@ -84,6 +103,8 @@ message ColumnFileTiny { uint64 rows = 6; uint64 bytes = 7; + + repeated IndexInfo indexes = 8; } message ColumnFileBig { diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index e51b7f5ea2e..f260c54f012 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -356,6 +356,25 @@ RemotePb::ColumnFileRemote Serializer::serializeCFTiny( remote_tiny->set_rows(cf_tiny.rows); remote_tiny->set_bytes(cf_tiny.bytes); + if (!cf_tiny.index_infos) + return ret; + + for (const auto & index_info : *cf_tiny.index_infos) + { + auto * index_pb = remote_tiny->add_indexes(); + index_pb->set_index_page_id(index_info.index_page_id); + if (index_info.vector_index.has_value()) + { + RemotePb::VectorIndexFileProps index_props; + index_props.set_index_kind(index_info.vector_index->index_kind()); + index_props.set_distance_metric(index_info.vector_index->distance_metric()); + index_props.set_dimensions(index_info.vector_index->dimensions()); + index_props.set_index_id(index_info.vector_index->index_id()); + index_props.set_index_bytes(index_info.vector_index->index_bytes()); + index_pb->mutable_vector_index()->Swap(&index_props); + } + } + // TODO: read the checkpoint info from data_provider and send it to the compute node return ret; @@ -371,9 +390,32 @@ ColumnFileTinyPtr Serializer::deserializeCFTiny(const DMContext & dm_context, co // We do not try to reuse the CFSchema from `SharedBlockSchemas`, because the ColumnFile will be freed immediately after the request. auto schema = std::make_shared(*block_schema); - auto cf = std::make_shared(schema, proto.rows(), proto.bytes(), proto.page_id(), dm_context); - cf->data_page_size = proto.page_size(); + auto index_infos = std::make_shared(); + index_infos->reserve(proto.indexes().size()); + for (const auto & index_pb : proto.indexes()) + { + if (index_pb.has_vector_index()) + { + dtpb::VectorIndexFileProps index_props; + index_props.set_index_kind(index_pb.vector_index().index_kind()); + index_props.set_distance_metric(index_pb.vector_index().distance_metric()); + index_props.set_dimensions(index_pb.vector_index().dimensions()); + index_props.set_index_id(index_pb.vector_index().index_id()); + index_props.set_index_bytes(index_pb.vector_index().index_bytes()); + index_infos->emplace_back(index_pb.index_page_id(), index_props); + } + else + index_infos->emplace_back(index_pb.index_page_id(), std::nullopt); + } + auto cf = std::make_shared( + schema, + proto.rows(), + proto.bytes(), + proto.page_id(), + dm_context, + index_infos); + cf->data_page_size = proto.page_size(); return cf; } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index 0acbb1c8d53..8fe1dea8e50 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -112,6 +112,7 @@ SegmentReadTask::SegmentReadTask( ranges.push_back(RowKeyRange::deserialize(rb)); } + // The index page of ColumnFileTiny is also included. std::vector remote_page_ids; std::vector remote_page_sizes; { @@ -130,6 +131,19 @@ SegmentReadTask::SegmentReadTask( remote_page_ids.emplace_back(tiny->getDataPageId()); remote_page_sizes.emplace_back(tiny->getDataPageSize()); ++count; + // Add vector index pages. + if (auto index_infos = tiny->getIndexInfos(); index_infos) + { + for (const auto & index_info : *index_infos) + { + if (index_info.vector_index) + { + remote_page_ids.emplace_back(index_info.index_page_id); + remote_page_sizes.emplace_back(index_info.vector_index->index_bytes()); + ++count; + } + } + } } } return count; diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h index be28df7dc8e..a52d88f2840 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h @@ -50,6 +50,7 @@ struct ExtraRemoteSegmentInfo String store_address; // DisaggTaskId is corresponding to a storage snapshot in write node. // Returned by EstablishDisaggTask and used by FetchDisaggPages. + // The index pages of ColumnFileTiny are also included. DisaggTaskId snapshot_id; std::vector remote_page_ids; std::vector remote_page_sizes; diff --git a/dbms/src/Storages/DeltaMerge/dtpb/CMakeLists.txt b/dbms/src/Storages/DeltaMerge/dtpb/CMakeLists.txt index 1f571f6e209..bcc869e3828 100644 --- a/dbms/src/Storages/DeltaMerge/dtpb/CMakeLists.txt +++ b/dbms/src/Storages/DeltaMerge/dtpb/CMakeLists.txt @@ -12,12 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -protobuf_generate_cpp( - dtpb_srcs dtpb_hdrs - dmfile.proto - column_file.proto - segment.proto -) +file(GLOB PROTO_FILES CONFIGURE_DEPENDS *.proto) +protobuf_generate_cpp(dtpb_srcs dtpb_hdrs ${PROTO_FILES}) add_library(dtpb ${dtpb_srcs}) target_include_directories(dtpb PUBLIC ${Protobuf_INCLUDE_DIR}) diff --git a/dbms/src/Storages/DeltaMerge/dtpb/column_file.proto b/dbms/src/Storages/DeltaMerge/dtpb/column_file.proto index 6ec97db9507..34b63372a8f 100644 --- a/dbms/src/Storages/DeltaMerge/dtpb/column_file.proto +++ b/dbms/src/Storages/DeltaMerge/dtpb/column_file.proto @@ -16,6 +16,8 @@ syntax = "proto2"; package dtpb; +import "vector_index.proto"; // for VectorIndexFileProps + message ColumnFileBig { required uint64 id = 1; required uint64 valid_rows = 2; @@ -40,11 +42,17 @@ message ColumnSchema { required string column_type = 3; } +message IndexInfo { + required uint64 index_page_id = 1; // which page it stores + optional VectorIndexFileProps vector_index = 2; // vector index file properties, needed when read +} + message ColumnFileTiny { repeated ColumnSchema columns = 1; required uint64 id = 2; required uint64 rows = 3; required uint64 bytes = 4; + repeated IndexInfo indexes = 5; } enum ColumnFileType { diff --git a/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto b/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto index 59aff511a2b..1161ee6c7b4 100644 --- a/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto +++ b/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto @@ -16,6 +16,8 @@ syntax = "proto2"; package dtpb; +import "vector_index.proto"; // for VectorIndexFileProps + message PackProperty { // when gc_safe_point exceed this version, there must be some data obsolete in this pack required uint64 gc_hint_version = 1; @@ -82,17 +84,3 @@ message StableLayerMeta { optional uint64 valid_bytes = 2; repeated StableFile files = 3; } - -// Note: This message is something different to VectorIndexDefinition. -// VectorIndexDefinition defines an index, comes from table DDL. -// It includes information about how index should be constructed, -// for example, it contains HNSW's 'efConstruct' parameter. -// However, VectorIndexFileProps provides information for read out the index, -// for example, very basic information about what the index is, and how it is stored. -message VectorIndexFileProps { - optional string index_kind = 1; // The value is tipb.VectorIndexKind - optional string distance_metric = 2; // The value is tipb.VectorDistanceMetric - optional uint64 dimensions = 3; - optional int64 index_id = 4; - optional uint64 index_bytes = 5; -} diff --git a/dbms/src/Storages/DeltaMerge/dtpb/vector_index.proto b/dbms/src/Storages/DeltaMerge/dtpb/vector_index.proto new file mode 100644 index 00000000000..34060b99592 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/dtpb/vector_index.proto @@ -0,0 +1,31 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto2"; + +package dtpb; + +// Note: This message is something different to VectorIndexDefinition. +// VectorIndexDefinition defines an index, comes from table DDL. +// It includes information about how index should be constructed, +// for example, it contains HNSW's 'efConstruct' parameter. +// However, VectorIndexFileProps provides information for read out the index, +// for example, very basic information about what the index is, and how it is stored. +message VectorIndexFileProps { + optional string index_kind = 1; // The value is tipb.VectorIndexKind + optional string distance_metric = 2; // The value is tipb.VectorDistanceMetric + optional uint64 dimensions = 3; + optional int64 index_id = 4; + optional uint64 index_bytes = 5; +} diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_column_file_clone.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_column_file_clone.cpp new file mode 100644 index 00000000000..b29abae5878 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_column_file_clone.cpp @@ -0,0 +1,94 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + + +namespace DB::DM::tests +{ + +class ColumnFileCloneTest : public SegmentTestBasic +{ +}; + +TEST_F(ColumnFileCloneTest, CloneColumnFileTinyWithVectorIndex) +{ + WriteBatches wbs(*storage_pool, dm_context->getWriteLimiter()); + const PageIdU64 index_page_id = dm_context->storage_pool->newLogPageId(); + const PageIdU64 data_page_id = dm_context->storage_pool->newLogPageId(); + String mock_data_page_content = "mock_data_page_content"; + wbs.log.putPage(data_page_id, 0, std::string_view{mock_data_page_content.data(), mock_data_page_content.size()}); + String mock_index_page_content = "mock_index_page_content"; + wbs.log.putPage(index_page_id, 0, std::string_view{mock_index_page_content.data(), mock_index_page_content.size()}); + + dtpb::VectorIndexFileProps index_props; + index_props.set_index_kind(tipb::VectorIndexKind_Name(tipb::VectorIndexKind::HNSW)); + index_props.set_distance_metric(tipb::VectorDistanceMetric_Name(tipb::VectorDistanceMetric::L2)); + index_props.set_dimensions(1); + index_props.set_index_id(1); + index_props.set_index_bytes(10); + std::vector persisted_files; + { + auto index_infos = std::make_shared(); + index_infos->emplace_back(data_page_id, index_props); + auto file = std::make_shared(nullptr, 1, 10, data_page_id, *dm_context, index_infos); + persisted_files.emplace_back(std::move(file)); + } + auto range = RowKeyRange::newAll(dm_context->is_common_handle, dm_context->rowkey_column_size); + auto new_persisted_files + = CloneColumnFilesHelper::clone(*dm_context, persisted_files, range, wbs); + wbs.writeAll(); + ASSERT_EQ(new_persisted_files.size(), 1); + auto new_persisted_file = new_persisted_files[0]; + auto new_tiny_file = std::dynamic_pointer_cast(new_persisted_file); + ASSERT_NE(new_tiny_file, nullptr); + // Different data page id + ASSERT_NE(new_tiny_file->getDataPageId(), data_page_id); + ASSERT_EQ(new_tiny_file->getIndexInfos()->size(), 1); + auto new_index_info = new_tiny_file->getIndexInfos()->at(0); + // Different index page id + ASSERT_NE(new_index_info.index_page_id, index_page_id); + // Same index properties + ASSERT_EQ(new_index_info.vector_index->index_bytes(), 10); + ASSERT_EQ(new_index_info.vector_index->index_id(), 1); + ASSERT_EQ(new_index_info.vector_index->index_kind(), tipb::VectorIndexKind_Name(tipb::VectorIndexKind::HNSW)); + ASSERT_EQ( + new_index_info.vector_index->distance_metric(), + tipb::VectorDistanceMetric_Name(tipb::VectorDistanceMetric::L2)); + ASSERT_EQ(new_index_info.vector_index->dimensions(), 1); + + // Check the data page and index page content + auto storage_snap = std::make_shared( + *dm_context->storage_pool, + dm_context->getReadLimiter(), + dm_context->tracing_id, + /*snapshot_read*/ true); + auto data_from_storage_snap = ColumnFileDataProviderLocalStoragePool::create(storage_snap); + { + Page page = data_from_storage_snap->readTinyData(data_page_id, {}); + ASSERT_EQ(page.data.size(), mock_data_page_content.size()); + ASSERT_EQ(String(page.data.data(), page.data.size()), mock_data_page_content); + } + { + Page page = data_from_storage_snap->readTinyData(index_page_id, {}); + ASSERT_EQ(page.data.size(), mock_index_page_content.size()); + ASSERT_EQ(String(page.data.data(), page.data.size()), mock_index_page_content.data()); + } +} + +} // namespace DB::DM::tests