Skip to content

Commit

Permalink
Make it possible to change in-memory setting for tables (#12099)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Nov 29, 2024
1 parent b03bee3 commit 38aef7d
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 15 deletions.
3 changes: 3 additions & 0 deletions ydb/core/tablet_flat/flat_dbase_apply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ bool TSchemeModifier::Apply(const TAlterRecord &delta)
ui32 large = delta.HasLarge() ? delta.GetLarge() : family.Large;

Y_ABORT_UNLESS(ui32(cache) <= 2, "Invalid pages cache policy value");
if (family.Cache != cache && cache == ECache::Ever) {
ChangeTableSetting(table, tableInfo.PendingCacheEnable, true);
}
changes |= ChangeTableSetting(table, family.Cache, cache);
changes |= ChangeTableSetting(table, family.Codec, codec);
changes |= ChangeTableSetting(table, family.Small, small);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tablet_flat/flat_dbase_scheme.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class TScheme {
bool EraseCacheEnabled = false;
ui32 EraseCacheMinRows = 0; // 0 means use default
ui32 EraseCacheMaxBytes = 0; // 0 means use default

// When true this table has an in-memory caching enabled that has not been processed yet
mutable bool PendingCacheEnable = false;
};

struct TRedo {
Expand Down
22 changes: 16 additions & 6 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ void TExecutor::ActivateFollower(const TActorContext &ctx) {
RecreatePageCollectionsCache();
ReflectSchemeSettings();

RequestInMemPagesForDatabase();

Become(&TThis::StateFollower);
Stats->IsActive = true;
Stats->FollowerId = FollowerId;
Expand Down Expand Up @@ -647,16 +649,21 @@ void TExecutor::TranslateCacheTouchesToSharedCache() {
Send(MakeSharedPageCacheId(), new NSharedCache::TEvTouch(std::move(touches)));
}

void TExecutor::RequestInMemPagesForDatabase() {
const auto &scheme = Scheme();
for (auto &sxpair : scheme.Tables) {
auto stickyColumns = GetStickyColumns(sxpair.first);
void TExecutor::RequestInMemPagesForDatabase(bool pendingOnly) {
const auto& scheme = Scheme();
for (auto& pr : scheme.Tables) {
const ui32 tid = pr.first;
if (pendingOnly && !pr.second.PendingCacheEnable) {
continue;
}
auto stickyColumns = GetStickyColumns(tid);
if (stickyColumns) {
auto subset = Database->Subset(sxpair.first, NTable::TEpoch::Max(), { } , { });
auto subset = Database->Subset(tid, NTable::TEpoch::Max(), { } , { });

for (auto &partView: subset->Flatten)
RequestInMemPagesForPartStore(sxpair.first, partView, stickyColumns);
RequestInMemPagesForPartStore(tid, partView, stickyColumns);
}
pr.second.PendingCacheEnable = false;
}
}

Expand Down Expand Up @@ -968,6 +975,7 @@ void TExecutor::ApplyFollowerUpdate(THolder<TEvTablet::TFUpdateBody> update) {
if (schemeUpdate) {
ReadResourceProfile();
ReflectSchemeSettings();
RequestInMemPagesForDatabase(/* pendingOnly */ true);
Owner->OnFollowerSchemaUpdated();
}

Expand Down Expand Up @@ -1345,6 +1353,7 @@ void TExecutor::RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartV
for (ui32 pageId : req->Pages)
PrivatePageCache->MarkSticky(pageId, info);

// TODO: only request missing pages
RequestFromSharedCache(req, NBlockIO::EPriority::Bkgr, EPageCollectionRequest::CacheSync);
}
}
Expand Down Expand Up @@ -2065,6 +2074,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv

ReadResourceProfile();
ReflectSchemeSettings();
RequestInMemPagesForDatabase(/* pendingOnly */ true);

// For every table that changed strategy we need to generate a
// special part switch that notifies bootlogic about new strategy
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ class TExecutor
void DropSingleCache(const TLogoBlobID&) noexcept;

void TranslateCacheTouchesToSharedCache();
void RequestInMemPagesForDatabase();
void RequestInMemPagesForDatabase(bool pendingOnly = false);
void RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartView &partView, const THashSet<NTable::TTag> &stickyColumns);
THashSet<NTable::TTag> GetStickyColumns(ui32 tableId);
void RequestFromSharedCache(TAutoPtr<NPageCollection::TFetch> fetch,
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tablet_flat/flat_executor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6140,7 +6140,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorStickyPages) {

int failedAttempts = 0;
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(failedAttempts) });
UNIT_ASSERT_GE(failedAttempts, 20); // old parts aren't sticky before restart
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 0); // parts become sticky soon after it's enabled

// restart tablet
env.SendSync(new TEvents::TEvPoison, false, true);
Expand Down Expand Up @@ -6174,7 +6174,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorStickyPages) {

int failedAttempts = 0;
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(failedAttempts) });
UNIT_ASSERT_GE(failedAttempts, 20); // old parts aren't sticky before restart
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 0); // parts become sticky soon after it's enabled

// restart tablet
env.SendSync(new TEvents::TEvPoison, false, true);
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/ydb_convert/column_families.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,15 @@ namespace NKikimr {
case Ydb::FeatureFlag::STATUS_UNSPECIFIED:
break;
case Ydb::FeatureFlag::ENABLED:
*code = Ydb::StatusIds::BAD_REQUEST;
*error = TStringBuilder()
<< "Setting keep_in_memory to ENABLED is not supported in column family '"
<< familySettings.name() << "'";
return false;
if (!AppData()->FeatureFlags.GetEnablePublicApiKeepInMemory()) {
*code = Ydb::StatusIds::BAD_REQUEST;
*error = "Setting keep_in_memory to ENABLED is not allowed";
return false;
}
family->SetColumnCache(NKikimrSchemeOp::ColumnCacheEver);
break;
case Ydb::FeatureFlag::DISABLED:
family->ClearColumnCache();
family->SetColumnCache(NKikimrSchemeOp::ColumnCacheNone);
break;
default:
*code = Ydb::StatusIds::BAD_REQUEST;
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_table/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,11 @@ TColumnFamilyBuilder& TColumnFamilyBuilder::SetCompression(EColumnFamilyCompress
return *this;
}

TColumnFamilyBuilder& TColumnFamilyBuilder::SetKeepInMemory(bool enabled) {
Impl_->Proto.set_keep_in_memory(enabled ? Ydb::FeatureFlag::ENABLED : Ydb::FeatureFlag::DISABLED);
return *this;
}

TColumnFamilyDescription TColumnFamilyBuilder::Build() const {
return TColumnFamilyDescription(Impl_->Proto);
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ class TColumnFamilyBuilder {

TColumnFamilyBuilder& SetData(const TString& media);
TColumnFamilyBuilder& SetCompression(EColumnFamilyCompression compression);
TColumnFamilyBuilder& SetKeepInMemory(bool enabled);

TColumnFamilyDescription Build() const;

Expand Down Expand Up @@ -868,6 +869,11 @@ class TTableColumnFamilyBuilder {
return *this;
}

TTableColumnFamilyBuilder& SetKeepInMemory(bool enabled) {
Builder_.SetKeepInMemory(enabled);
return *this;
}

TTableBuilder& EndColumnFamily();

private:
Expand Down Expand Up @@ -1486,6 +1492,11 @@ class TAlterColumnFamilyBuilder {
return *this;
}

TAlterColumnFamilyBuilder& SetKeepInMemory(bool enabled) {
Builder_.SetKeepInMemory(enabled);
return *this;
}

TAlterTableSettings& EndAddColumnFamily();
TAlterTableSettings& EndAlterColumnFamily();

Expand Down
81 changes: 81 additions & 0 deletions ydb/services/ydb/ydb_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,87 @@ Y_UNIT_TEST_SUITE(TGRpcNewClient) {
client.CreateSession().Apply(createSessionHandler).Wait();
UNIT_ASSERT(done);
}

Y_UNIT_TEST(InMemoryTables) {
TKikimrWithGrpcAndRootSchemaNoSystemViews server;
server.Server_->GetRuntime()->GetAppData().FeatureFlags.SetEnablePublicApiKeepInMemory(true);

ui16 grpc = server.GetPort();
TString location = TStringBuilder() << "localhost:" << grpc;

auto connection = NYdb::TDriver(
TDriverConfig()
.SetEndpoint(location));

auto client = NYdb::NTable::TTableClient(connection);
auto createSessionResult = client.CreateSession().ExtractValueSync();
UNIT_ASSERT(!createSessionResult.IsTransportError());
auto session = createSessionResult.GetSession();

auto createTableResult = session.CreateTable("/Root/Table", client.GetTableBuilder()
.AddNullableColumn("Key", EPrimitiveType::Int32)
.AddNullableColumn("Value", EPrimitiveType::String)
.SetPrimaryKeyColumn("Key")
// Note: only needed because this test doesn't initial table profiles
.BeginStorageSettings()
.SetTabletCommitLog0("ssd")
.SetTabletCommitLog1("ssd")
.EndStorageSettings()
.BeginColumnFamily("default")
.SetData("ssd")
.SetKeepInMemory(true)
.EndColumnFamily()
.Build()).ExtractValueSync();
UNIT_ASSERT_C(createTableResult.IsSuccess(), (NYdb::TStatus&)createTableResult);

{
auto describeTableResult = session.DescribeTable("/Root/Table").ExtractValueSync();
UNIT_ASSERT_C(describeTableResult.IsSuccess(), (NYdb::TStatus&)describeTableResult);
auto desc = describeTableResult.GetTableDescription();
auto families = desc.GetColumnFamilies();
UNIT_ASSERT_VALUES_EQUAL(families.size(), 1u);
auto family = families.at(0);
UNIT_ASSERT_VALUES_EQUAL(family.GetKeepInMemory(), true);
}

{
auto alterTableResult = session.AlterTable("/Root/Table", NYdb::NTable::TAlterTableSettings()
.BeginAlterColumnFamily("default")
.SetKeepInMemory(false)
.EndAlterColumnFamily()).ExtractValueSync();
UNIT_ASSERT_C(alterTableResult.IsSuccess(), (NYdb::TStatus&)alterTableResult);
}

{
auto describeTableResult = session.DescribeTable("/Root/Table").ExtractValueSync();
UNIT_ASSERT_C(describeTableResult.IsSuccess(), (NYdb::TStatus&)describeTableResult);
auto desc = describeTableResult.GetTableDescription();
auto families = desc.GetColumnFamilies();
UNIT_ASSERT_VALUES_EQUAL(families.size(), 1u);
auto family = families.at(0);
// Note: server cannot currently distinguish between implicitly
// unset and explicitly disabled, so it returns the former.
UNIT_ASSERT_VALUES_EQUAL(family.GetKeepInMemory(), Nothing());
}

{
auto alterTableResult = session.AlterTable("/Root/Table", NYdb::NTable::TAlterTableSettings()
.BeginAlterColumnFamily("default")
.SetKeepInMemory(true)
.EndAlterColumnFamily()).ExtractValueSync();
UNIT_ASSERT_C(alterTableResult.IsSuccess(), (NYdb::TStatus&)alterTableResult);
}

{
auto describeTableResult = session.DescribeTable("/Root/Table").ExtractValueSync();
UNIT_ASSERT_C(describeTableResult.IsSuccess(), (NYdb::TStatus&)describeTableResult);
auto desc = describeTableResult.GetTableDescription();
auto families = desc.GetColumnFamilies();
UNIT_ASSERT_VALUES_EQUAL(families.size(), 1u);
auto family = families.at(0);
UNIT_ASSERT_VALUES_EQUAL(family.GetKeepInMemory(), true);
}
}
}

static TString CreateSession(std::shared_ptr<grpc::Channel> channel) {
Expand Down

0 comments on commit 38aef7d

Please sign in to comment.