Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Refine local index for more index kinds #9774

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace DB
M(force_wait_index_timeout) \
M(force_local_index_task_memory_limit_exceeded) \
M(exception_build_local_index_for_file) \
M(force_not_support_vector_index) \
M(force_not_support_local_index) \
M(sync_schema_request_failure)

#define APPLY_FOR_FAILPOINTS(M) \
Expand Down
88 changes: 70 additions & 18 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,45 @@
namespace DB::DM
{

namespace details
{

/// Converts an old-format index record, i.e. one that carries `deprecated_vector_index`,
/// into the current layout where the index properties live under `index_props`.
///
/// The page id is copied over unchanged. The resulting record is tagged as VECTOR_INDEX
/// with `format_version` 0, and `file_size` is taken from the old `index_bytes` field.
///
/// The caller must only pass records for which `has_deprecated_vector_index()` is true;
/// this is enforced with RUNTIME_CHECK.
inline dtpb::ColumnFileIndexInfo migrateFromIndexInfoV1(const dtpb::ColumnFileIndexInfo & index_pb)
{
    RUNTIME_CHECK(index_pb.has_deprecated_vector_index());
    const auto & legacy = index_pb.deprecated_vector_index();

    dtpb::ColumnFileIndexInfo migrated;
    migrated.set_index_page_id(index_pb.index_page_id());

    // Properties shared by every index kind.
    auto * props = migrated.mutable_index_props();
    props->set_kind(dtpb::IndexFileKind::VECTOR_INDEX);
    props->set_index_id(legacy.index_id());
    props->set_file_size(legacy.index_bytes());

    // Vector-index specific properties.
    auto * vec_props = props->mutable_vector_index();
    vec_props->set_format_version(0);
    vec_props->set_dimensions(legacy.dimensions());
    vec_props->set_distance_metric(legacy.distance_metric());

    return migrated;
}

/// Sanity-checks a current-format index record.
///
/// Requires `index_page_id`, and inside `index_props` the `file_size`, `index_id` and `kind`
/// fields. For VECTOR_INDEX the `vector_index` sub-message must also be present. Any other
/// kind is rejected through RUNTIME_CHECK_MSG as unsupported.
inline void integrityCheckIndexInfoV2(const dtpb::ColumnFileIndexInfo & index_info)
{
    // Fields every index record must carry, whatever its kind.
    RUNTIME_CHECK(index_info.has_index_page_id());
    RUNTIME_CHECK(index_info.index_props().has_file_size());
    RUNTIME_CHECK(index_info.index_props().has_index_id());
    RUNTIME_CHECK(index_info.index_props().has_kind());

    const auto kind = index_info.index_props().kind();
    if (kind == dtpb::IndexFileKind::VECTOR_INDEX)
    {
        // Kind-specific payload must accompany the kind tag.
        RUNTIME_CHECK(index_info.index_props().has_vector_index());
        return;
    }

    // Only vector indexes are recognized here.
    RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(kind));
}

} // namespace details


ColumnFileReaderPtr ColumnFileTiny::getReader(
const DMContext &,
const IColumnFileDataProviderPtr & data_provider,
Expand Down Expand Up @@ -61,10 +100,12 @@ void ColumnFileTiny::serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool s

for (const auto & index_info : *index_infos)
{
// Just some integrity checks to ensure we are writing correct data.
// These data may come from deserialization, or generated in runtime.
details::integrityCheckIndexInfoV2(index_info);

auto * index_pb = tiny_pb->add_indexes();
index_pb->set_index_page_id(index_info.index_page_id);
if (index_info.vector_index.has_value())
index_pb->mutable_vector_index()->CopyFrom(*index_info.vector_index);
index_pb->CopyFrom(index_info);
}
}

Expand Down Expand Up @@ -121,10 +162,16 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(
index_infos->reserve(cf_pb.indexes().size());
for (const auto & index_pb : cf_pb.indexes())
{
if (index_pb.has_vector_index())
index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index());
else
index_infos->emplace_back(index_pb.index_page_id(), std::nullopt);
// Old format compatibility
if unlikely (index_pb.has_deprecated_vector_index())
{
auto idx_info = details::migrateFromIndexInfoV1(index_pb);
index_infos->emplace_back(std::move(idx_info));
continue;
}

details::integrityCheckIndexInfoV2(index_pb);
index_infos->emplace_back(index_pb);
}

return std::make_shared<ColumnFileTiny>(schema, rows, bytes, data_page_id, context, index_infos);
Expand Down Expand Up @@ -178,13 +225,13 @@ ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint(

// Write index data page to local ps
auto new_index_infos = std::make_shared<IndexInfos>();
for (const auto & index : *index_infos)
for (const auto & index_pb : *index_infos)
{
auto new_index_page_id = put_remote_page(index.index_page_id);
if (index.vector_index)
new_index_infos->emplace_back(new_index_page_id, index.vector_index);
else
new_index_infos->emplace_back(new_index_page_id, std::nullopt);
// Validate the source record before reading any of its fields.
details::integrityCheckIndexInfoV2(index_pb);
// Re-put the index page and obtain the page id the restored file must reference.
auto new_index_page_id = put_remote_page(index_pb.index_page_id());
// `index_pb` is a const reference into the source list, so work on a copy that points to the new page.
auto new_index_pb = index_pb;
new_index_pb.set_index_page_id(new_index_page_id);
// Store the rewritten copy. Emplacing `index_pb` here would silently copy the original record
// (std::move on a const reference degrades to a copy) and keep the old index page id.
new_index_infos->emplace_back(std::move(new_index_pb));
}
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context, new_index_infos);
}
Expand Down Expand Up @@ -235,10 +282,15 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin
index_infos->reserve(cf_pb.indexes().size());
for (const auto & index_pb : cf_pb.indexes())
{
if (index_pb.has_vector_index())
index_infos->emplace_back(index_pb.index_page_id(), index_pb.vector_index());
else
index_infos->emplace_back(index_pb.index_page_id(), std::nullopt);
// Old format compatibility
if unlikely (index_pb.has_deprecated_vector_index())
{
auto idx_info = details::migrateFromIndexInfoV1(index_pb);
index_infos->emplace_back(std::move(idx_info));
continue;
}

index_infos->emplace_back(index_pb);
}

return {
Expand Down Expand Up @@ -343,7 +395,7 @@ void ColumnFileTiny::removeData(WriteBatches & wbs) const
if (index_infos)
{
for (const auto & index_info : *index_infos)
wbs.removed_log.delPage(index_info.index_page_id);
wbs.removed_log.delPage(index_info.index_page_id());
}
}

Expand Down
19 changes: 3 additions & 16 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>
#include <Storages/DeltaMerge/dtpb/column_file.pb.h>
#include <Storages/DeltaMerge/dtpb/vector_index.pb.h>
#include <Storages/Page/PageStorage_fwd.h>

namespace DB::DM
Expand All @@ -37,21 +36,11 @@ class ColumnFileTiny : public ColumnFilePersisted
{
public:
friend class ColumnFileTinyReader;
friend class ColumnFileTinyVectorIndexWriter;
friend class ColumnFileTinyLocalIndexWriter;
friend class ColumnFileTinyVectorIndexReader;
friend struct Remote::Serializer;

struct IndexInfo
{
IndexInfo(PageIdU64 page_id, std::optional<dtpb::VectorIndexFileProps> vec_index)
: index_page_id(page_id)
, vector_index(vec_index)
{}

PageIdU64 index_page_id{};
std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;
};
using IndexInfos = std::vector<IndexInfo>;
using IndexInfos = std::vector<dtpb::ColumnFileIndexInfo>;
using IndexInfosPtr = std::shared_ptr<IndexInfos>;

private:
Expand Down Expand Up @@ -99,9 +88,7 @@ class ColumnFileTiny : public ColumnFilePersisted
if (!index_infos)
return false;
return std::any_of(index_infos->cbegin(), index_infos->cend(), [index_id](const auto & info) {
if (!info.vector_index)
return false;
return info.vector_index->index_id() == index_id;
return info.index_props().index_id() == index_id;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

#include <IO/Compression/CompressedWriteBuffer.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyLocalIndexWriter.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexWriter.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>

Expand All @@ -28,7 +28,7 @@ extern const int ABORTED;
namespace DB::DM
{

ColumnFileTinyVectorIndexWriter::LocalIndexBuildInfo ColumnFileTinyVectorIndexWriter::getLocalIndexBuildInfo(
ColumnFileTinyLocalIndexWriter::LocalIndexBuildInfo ColumnFileTinyLocalIndexWriter::getLocalIndexBuildInfo(
const LocalIndexInfosSnapshot & index_infos,
const ColumnFilePersistedSetPtr & file_set)
{
Expand Down Expand Up @@ -78,7 +78,7 @@ ColumnFileTinyVectorIndexWriter::LocalIndexBuildInfo ColumnFileTinyVectorIndexWr
return build;
}

ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
const ColumnDefines & column_defines,
const ColumnDefine & del_cd,
const ColumnFileTiny * file,
Expand All @@ -91,17 +91,27 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
read_columns->reserve(options.index_infos->size() + 1);
read_columns->push_back(del_cd);

std::unordered_map<ColId, std::vector<VectorIndexBuilderPtr>> index_builders;
struct IndexToBuild
{
LocalIndexInfo info;
VectorIndexBuilderPtr builder_vector;
};

std::unordered_map<ColId, std::vector<IndexToBuild>> index_builders;

std::unordered_map<ColId, std::vector<LocalIndexInfo>> col_indexes;
for (const auto & index_info : *options.index_infos)
{
if (index_info.type != IndexType::Vector)
// Just skip if the index is already built
if (file->hasIndex(index_info.index_id))
continue;
col_indexes[index_info.column_id].emplace_back(index_info);
RUNTIME_CHECK(index_info.def_vector_index != nullptr);
index_builders[index_info.column_id].emplace_back(IndexToBuild{
.info = index_info,
.builder_vector = {},
});
}

for (const auto & [col_id, index_infos] : col_indexes)
for (auto & [col_id, indexes] : index_builders)
{
// Make sure the column_id is in the schema.
const auto cd_iter = std::find_if( //
Expand All @@ -114,14 +124,17 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
col_id,
file->getDataPageId());

for (const auto & idx_info : index_infos)
for (auto & index : indexes)
{
// Just skip if the index is already built
if (file->hasIndex(idx_info.index_id))
continue;

index_builders[col_id].emplace_back(
VectorIndexBuilder::create(idx_info.index_id, idx_info.index_definition));
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
index.builder_vector = VectorIndexBuilder::create(index.info.def_vector_index);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
}
read_columns->push_back(*cd_iter);
}
Expand Down Expand Up @@ -156,9 +169,18 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
const auto & col_with_type_and_name = block.safeGetByPosition(col_idx);
RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns->at(col_idx).id);
const auto & col = col_with_type_and_name.column;
for (const auto & index_builder : index_builders[read_columns->at(col_idx).id])
for (const auto & index : index_builders[read_columns->at(col_idx).id])
{
index_builder->addBlock(*col, del_mark, should_proceed);
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
RUNTIME_CHECK(index.builder_vector);
index.builder_vector->addBlock(*col, del_mark, should_proceed);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
}
}
}
Expand All @@ -168,26 +190,42 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
for (size_t col_idx = 1; col_idx < num_cols; ++col_idx)
{
const auto & cd = read_columns->at(col_idx);
for (const auto & index_builder : index_builders[cd.id])
for (const auto & index : index_builders[cd.id])
{
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
index_builder->saveToBuffer(compressed);
compressed.next();
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
// ColumnFileDataProviderRNLocalPageCache currently does not support read data with fields
options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size});

dtpb::VectorIndexFileProps vector_index;
vector_index.set_index_id(index_builder->index_id);
vector_index.set_index_bytes(data_size);
vector_index.set_index_kind(tipb::VectorIndexKind_Name(index_builder->definition->kind));
vector_index.set_distance_metric(
tipb::VectorDistanceMetric_Name(index_builder->definition->distance_metric));
vector_index.set_dimensions(index_builder->definition->dimension);
index_infos->emplace_back(index_page_id, vector_index);
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
{
RUNTIME_CHECK(index.builder_vector);
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
index.builder_vector->saveToBuffer(compressed);
compressed.next();
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
// ColumnFileDataProviderRNLocalPageCache currently does not support read data with fields
options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size});

auto idx_info = dtpb::ColumnFileIndexInfo{};
idx_info.set_index_page_id(index_page_id);
auto * idx_props = idx_info.mutable_index_props();
idx_props->set_kind(dtpb::IndexFileKind::VECTOR_INDEX);
idx_props->set_index_id(index.info.index_id);
idx_props->set_file_size(data_size);
auto * vector_index = idx_props->mutable_vector_index();
vector_index->set_format_version(0);
vector_index->set_dimensions(index.info.def_vector_index->dimension);
vector_index->set_distance_metric(
tipb::VectorDistanceMetric_Name(index.info.def_vector_index->distance_metric));
index_infos->emplace_back(std::move(idx_info));

break;
}
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
}
}

Expand All @@ -200,7 +238,7 @@ ColumnFileTinyPtr ColumnFileTinyVectorIndexWriter::buildIndexForFile(
return file->cloneWith(file->getDataPageId(), index_infos);
}

ColumnFileTinys ColumnFileTinyVectorIndexWriter::build(ProceedCheckFn should_proceed) const
ColumnFileTinys ColumnFileTinyLocalIndexWriter::build(ProceedCheckFn should_proceed) const
{
ColumnFileTinys new_files;
new_files.reserve(options.files.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace DB::DM

using ColumnFileTinys = std::vector<ColumnFileTinyPtr>;

// ColumnFileTinyVectorIndexWriter write vector index store in PageStorage for ColumnFileTiny.
class ColumnFileTinyVectorIndexWriter
// ColumnFileTinyLocalIndexWriter builds local indexes (currently only vector indexes) for ColumnFileTiny and writes them into PageStorage.
class ColumnFileTinyLocalIndexWriter
{
public:
struct LocalIndexBuildInfo
Expand All @@ -54,7 +54,7 @@ class ColumnFileTinyVectorIndexWriter
WriteBatches & wbs; // Write index and modify meta in the same batch.
};

explicit ColumnFileTinyVectorIndexWriter(const Options & options)
explicit ColumnFileTinyLocalIndexWriter(const Options & options)
: logger(Logger::get())
, options(options)
{}
Expand Down
Loading