From 38aef7d2e4a487f6d0bc669b75381d88af959f6c Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Fri, 29 Nov 2024 13:36:24 +0300 Subject: [PATCH] Make it possible to change in-memory setting for tables (#12099) --- ydb/core/tablet_flat/flat_dbase_apply.cpp | 3 + ydb/core/tablet_flat/flat_dbase_scheme.h | 3 + ydb/core/tablet_flat/flat_executor.cpp | 22 +++-- ydb/core/tablet_flat/flat_executor.h | 2 +- ydb/core/tablet_flat/flat_executor_ut.cpp | 4 +- ydb/core/ydb_convert/column_families.h | 14 ++-- ydb/public/sdk/cpp/client/ydb_table/table.cpp | 5 ++ ydb/public/sdk/cpp/client/ydb_table/table.h | 11 +++ ydb/services/ydb/ydb_ut.cpp | 81 +++++++++++++++++++ 9 files changed, 130 insertions(+), 15 deletions(-) diff --git a/ydb/core/tablet_flat/flat_dbase_apply.cpp b/ydb/core/tablet_flat/flat_dbase_apply.cpp index 75b009db76d3..fa3e8b8cfde2 100644 --- a/ydb/core/tablet_flat/flat_dbase_apply.cpp +++ b/ydb/core/tablet_flat/flat_dbase_apply.cpp @@ -97,6 +97,9 @@ bool TSchemeModifier::Apply(const TAlterRecord &delta) ui32 large = delta.HasLarge() ? delta.GetLarge() : family.Large; Y_ABORT_UNLESS(ui32(cache) <= 2, "Invalid pages cache policy value"); + if (family.Cache != cache && cache == ECache::Ever) { + ChangeTableSetting(table, tableInfo.PendingCacheEnable, true); + } changes |= ChangeTableSetting(table, family.Cache, cache); changes |= ChangeTableSetting(table, family.Codec, codec); changes |= ChangeTableSetting(table, family.Small, small); diff --git a/ydb/core/tablet_flat/flat_dbase_scheme.h b/ydb/core/tablet_flat/flat_dbase_scheme.h index 9b51a9591466..ed2c911886fd 100644 --- a/ydb/core/tablet_flat/flat_dbase_scheme.h +++ b/ydb/core/tablet_flat/flat_dbase_scheme.h @@ -85,6 +85,9 @@ class TScheme { bool EraseCacheEnabled = false; ui32 EraseCacheMinRows = 0; // 0 means use default ui32 EraseCacheMaxBytes = 0; // 0 means use default + + // When true this table has an in-memory caching enabled that has not been processed yet + mutable bool PendingCacheEnable = false; }; struct TRedo { diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index c786fb29aa16..c2431e929362 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -408,6 +408,8 @@ void TExecutor::ActivateFollower(const TActorContext &ctx) { RecreatePageCollectionsCache(); ReflectSchemeSettings(); + RequestInMemPagesForDatabase(); + Become(&TThis::StateFollower); Stats->IsActive = true; Stats->FollowerId = FollowerId; @@ -647,16 +649,21 @@ void TExecutor::TranslateCacheTouchesToSharedCache() { Send(MakeSharedPageCacheId(), new NSharedCache::TEvTouch(std::move(touches))); } -void TExecutor::RequestInMemPagesForDatabase() { - const auto &scheme = Scheme(); - for (auto &sxpair : scheme.Tables) { - auto stickyColumns = GetStickyColumns(sxpair.first); +void TExecutor::RequestInMemPagesForDatabase(bool pendingOnly) { + const auto& scheme = Scheme(); + for (auto& pr : scheme.Tables) { + const ui32 tid = pr.first; + if (pendingOnly && !pr.second.PendingCacheEnable) { + continue; + } + auto stickyColumns = GetStickyColumns(tid); if (stickyColumns) { - auto subset = Database->Subset(sxpair.first, NTable::TEpoch::Max(), { } , { }); + auto subset = Database->Subset(tid, NTable::TEpoch::Max(), { } , { }); for (auto &partView: subset->Flatten) - RequestInMemPagesForPartStore(sxpair.first, partView, stickyColumns); + RequestInMemPagesForPartStore(tid, partView, stickyColumns); } + pr.second.PendingCacheEnable = false; } } @@ -968,6 +975,7 @@ void TExecutor::ApplyFollowerUpdate(THolder update) { if (schemeUpdate) { ReadResourceProfile(); ReflectSchemeSettings(); + RequestInMemPagesForDatabase(/* pendingOnly */ true); Owner->OnFollowerSchemaUpdated(); } @@ -1345,6 +1353,7 @@ void TExecutor::RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartV for (ui32 pageId : req->Pages) PrivatePageCache->MarkSticky(pageId, info); + // TODO: only request missing pages RequestFromSharedCache(req, NBlockIO::EPriority::Bkgr, EPageCollectionRequest::CacheSync); } } @@ -2065,6 +2074,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr seat, TPageCollectionTxEnv ReadResourceProfile(); ReflectSchemeSettings(); + RequestInMemPagesForDatabase(/* pendingOnly */ true); // For every table that changed strategy we need to generate a // special part switch that notifies bootlogic about new strategy diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index ceac02c6c2d2..0a9b0acae6e7 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -513,7 +513,7 @@ class TExecutor void DropSingleCache(const TLogoBlobID&) noexcept; void TranslateCacheTouchesToSharedCache(); - void RequestInMemPagesForDatabase(); + void RequestInMemPagesForDatabase(bool pendingOnly = false); void RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartView &partView, const THashSet &stickyColumns); THashSet GetStickyColumns(ui32 tableId); void RequestFromSharedCache(TAutoPtr fetch, diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp index 1f7187b42e25..426c25016dbb 100644 --- a/ydb/core/tablet_flat/flat_executor_ut.cpp +++ b/ydb/core/tablet_flat/flat_executor_ut.cpp @@ -6140,7 +6140,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorStickyPages) { int failedAttempts = 0; env.SendSync(new NFake::TEvExecute{ new TTxFullScan(failedAttempts) }); - UNIT_ASSERT_GE(failedAttempts, 20); // old parts aren't sticky before restart + UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 0); // parts become sticky soon after it's enabled // restart tablet env.SendSync(new TEvents::TEvPoison, false, true); @@ -6174,7 +6174,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorStickyPages) { int failedAttempts = 0; env.SendSync(new NFake::TEvExecute{ new TTxFullScan(failedAttempts) }); - UNIT_ASSERT_GE(failedAttempts, 20); // old parts aren't sticky before restart + UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 0); // parts become sticky soon after it's enabled // restart tablet env.SendSync(new TEvents::TEvPoison, false, true); diff --git a/ydb/core/ydb_convert/column_families.h b/ydb/core/ydb_convert/column_families.h index 5e3ffc6c6ea4..809399b5f2f0 100644 --- a/ydb/core/ydb_convert/column_families.h +++ b/ydb/core/ydb_convert/column_families.h @@ -183,13 +183,15 @@ namespace NKikimr { case Ydb::FeatureFlag::STATUS_UNSPECIFIED: break; case Ydb::FeatureFlag::ENABLED: - *code = Ydb::StatusIds::BAD_REQUEST; - *error = TStringBuilder() - << "Setting keep_in_memory to ENABLED is not supported in column family '" - << familySettings.name() << "'"; - return false; + if (!AppData()->FeatureFlags.GetEnablePublicApiKeepInMemory()) { + *code = Ydb::StatusIds::BAD_REQUEST; + *error = "Setting keep_in_memory to ENABLED is not allowed"; + return false; + } + family->SetColumnCache(NKikimrSchemeOp::ColumnCacheEver); + break; case Ydb::FeatureFlag::DISABLED: - family->ClearColumnCache(); + family->SetColumnCache(NKikimrSchemeOp::ColumnCacheNone); break; default: *code = Ydb::StatusIds::BAD_REQUEST; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 4f461f920607..ca4a031cb063 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -1115,6 +1115,11 @@ TColumnFamilyBuilder& TColumnFamilyBuilder::SetCompression(EColumnFamilyCompress return *this; } +TColumnFamilyBuilder& TColumnFamilyBuilder::SetKeepInMemory(bool enabled) { + Impl_->Proto.set_keep_in_memory(enabled ? Ydb::FeatureFlag::ENABLED : Ydb::FeatureFlag::DISABLED); + return *this; +} + TColumnFamilyDescription TColumnFamilyBuilder::Build() const { return TColumnFamilyDescription(Impl_->Proto); } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index b58b99a70f4c..f885240587ee 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -806,6 +806,7 @@ class TColumnFamilyBuilder { TColumnFamilyBuilder& SetData(const TString& media); TColumnFamilyBuilder& SetCompression(EColumnFamilyCompression compression); + TColumnFamilyBuilder& SetKeepInMemory(bool enabled); TColumnFamilyDescription Build() const; @@ -868,6 +869,11 @@ class TTableColumnFamilyBuilder { return *this; } + TTableColumnFamilyBuilder& SetKeepInMemory(bool enabled) { + Builder_.SetKeepInMemory(enabled); + return *this; + } + TTableBuilder& EndColumnFamily(); private: @@ -1486,6 +1492,11 @@ class TAlterColumnFamilyBuilder { return *this; } + TAlterColumnFamilyBuilder& SetKeepInMemory(bool enabled) { + Builder_.SetKeepInMemory(enabled); + return *this; + } + TAlterTableSettings& EndAddColumnFamily(); TAlterTableSettings& EndAlterColumnFamily(); diff --git a/ydb/services/ydb/ydb_ut.cpp b/ydb/services/ydb/ydb_ut.cpp index b0123b1affdd..ab61b98f4e04 100644 --- a/ydb/services/ydb/ydb_ut.cpp +++ b/ydb/services/ydb/ydb_ut.cpp @@ -824,6 +824,87 @@ Y_UNIT_TEST_SUITE(TGRpcNewClient) { client.CreateSession().Apply(createSessionHandler).Wait(); UNIT_ASSERT(done); } + + Y_UNIT_TEST(InMemoryTables) { + TKikimrWithGrpcAndRootSchemaNoSystemViews server; + server.Server_->GetRuntime()->GetAppData().FeatureFlags.SetEnablePublicApiKeepInMemory(true); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + + auto connection = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + auto client = NYdb::NTable::TTableClient(connection); + auto createSessionResult = client.CreateSession().ExtractValueSync(); + UNIT_ASSERT(!createSessionResult.IsTransportError()); + auto session = createSessionResult.GetSession(); + + auto createTableResult = session.CreateTable("/Root/Table", client.GetTableBuilder() + .AddNullableColumn("Key", EPrimitiveType::Int32) + .AddNullableColumn("Value", EPrimitiveType::String) + .SetPrimaryKeyColumn("Key") + // Note: only needed because this test doesn't initial table profiles + .BeginStorageSettings() + .SetTabletCommitLog0("ssd") + .SetTabletCommitLog1("ssd") + .EndStorageSettings() + .BeginColumnFamily("default") + .SetData("ssd") + .SetKeepInMemory(true) + .EndColumnFamily() + .Build()).ExtractValueSync(); + UNIT_ASSERT_C(createTableResult.IsSuccess(), (NYdb::TStatus&)createTableResult); + + { + auto describeTableResult = session.DescribeTable("/Root/Table").ExtractValueSync(); + UNIT_ASSERT_C(describeTableResult.IsSuccess(), (NYdb::TStatus&)describeTableResult); + auto desc = describeTableResult.GetTableDescription(); + auto families = desc.GetColumnFamilies(); + UNIT_ASSERT_VALUES_EQUAL(families.size(), 1u); + auto family = families.at(0); + UNIT_ASSERT_VALUES_EQUAL(family.GetKeepInMemory(), true); + } + + { + auto alterTableResult = session.AlterTable("/Root/Table", NYdb::NTable::TAlterTableSettings() + .BeginAlterColumnFamily("default") + .SetKeepInMemory(false) + .EndAlterColumnFamily()).ExtractValueSync(); + UNIT_ASSERT_C(alterTableResult.IsSuccess(), (NYdb::TStatus&)alterTableResult); + } + + { + auto describeTableResult = session.DescribeTable("/Root/Table").ExtractValueSync(); + UNIT_ASSERT_C(describeTableResult.IsSuccess(), (NYdb::TStatus&)describeTableResult); + auto desc = describeTableResult.GetTableDescription(); + auto families = desc.GetColumnFamilies(); + UNIT_ASSERT_VALUES_EQUAL(families.size(), 1u); + auto family = families.at(0); + // Note: server cannot currently distinguish between implicitly + // unset and explicitly disabled, so it returns the former. + UNIT_ASSERT_VALUES_EQUAL(family.GetKeepInMemory(), Nothing()); + } + + { + auto alterTableResult = session.AlterTable("/Root/Table", NYdb::NTable::TAlterTableSettings() + .BeginAlterColumnFamily("default") + .SetKeepInMemory(true) + .EndAlterColumnFamily()).ExtractValueSync(); + UNIT_ASSERT_C(alterTableResult.IsSuccess(), (NYdb::TStatus&)alterTableResult); + } + + { + auto describeTableResult = session.DescribeTable("/Root/Table").ExtractValueSync(); + UNIT_ASSERT_C(describeTableResult.IsSuccess(), (NYdb::TStatus&)describeTableResult); + auto desc = describeTableResult.GetTableDescription(); + auto families = desc.GetColumnFamilies(); + UNIT_ASSERT_VALUES_EQUAL(families.size(), 1u); + auto family = families.at(0); + UNIT_ASSERT_VALUES_EQUAL(family.GetKeepInMemory(), true); + } + } } static TString CreateSession(std::shared_ptr channel) {