Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add db counters for uncommitted changes and suspicious commits #12966

Merged
merged 2 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ enum ECumulativeCounters {
COUNTER_WRITE_BYTES = 111 [(CounterOpts) = {Name: "WriteBytes"}];
COUNTER_WRITE_DISK_SPACE_EXHAUSTED = 112 [(CounterOpts) = {Name: "WriteDiskSpaceExhausted"}];
COUNTER_PREPARE_DISK_SPACE_EXHAUSTED = 113 [(CounterOpts) = {Name: "PrepareSpaceExhausted"}];
COUNTER_REMOVED_COMMITTED_TXS = 114 [(CounterOpts) = {Name: "RemovedCommittedTxs"}];
}

enum EPercentileCounters {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tablet_flat/flat_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,11 @@ const TDbStats& TDatabase::Counters() const noexcept
return DatabaseImpl->Stats;
}

// Returns the runtime stats reported by the underlying DatabaseImpl.
// The result is returned by value (unlike Counters(), which hands out a
// reference to stored stats), so each call yields a fresh copy.
TDbRuntimeStats TDatabase::RuntimeCounters() const noexcept
{
return DatabaseImpl->GetRuntimeStats();
}

void TDatabase::UpdateApproximateFreeSharesByChannel(const THashMap<ui32, float>& approximateFreeSpaceShareByChannel)
{
for (auto& [channel, value] : approximateFreeSpaceShareByChannel) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tablet_flat/flat_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class TDatabase {
public:
using TMemGlobs = TVector<NPageCollection::TMemGlob>;
using TCookieAllocator = NPageCollection::TCookieAllocator;
using TCounters = TDbStats;

struct TProd {
THolder<TChange> Change;
Expand Down Expand Up @@ -221,7 +220,9 @@ class TDatabase {
ui64 GetTableIndexSize(ui32 table) const;
ui64 GetTableSearchHeight(ui32 table) const;
ui64 EstimateRowSize(ui32 table) const;
const TCounters& Counters() const noexcept;
const TDbStats& Counters() const noexcept;
TDbRuntimeStats RuntimeCounters() const noexcept;

void UpdateApproximateFreeSharesByChannel(const THashMap<ui32, float>& approximateFreeSpaceShareByChannel);
TString SnapshotToLog(ui32 table, TTxStamp);

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tablet_flat/flat_dbase_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ namespace NTable {
THashMap<ui32, float> NormalizedFreeSpaceShareByChannel;
};

// Database-level runtime stats reuse the per-table stats layout as is.
using TDbRuntimeStats = TTableRuntimeStats;

}
}
10 changes: 10 additions & 0 deletions ydb/core/tablet_flat/flat_dbase_naked.h
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,16 @@ namespace NTable {
}
}

public:
// Builds database-wide runtime stats by summing the runtime stats of
// every entry in Tables. The total is recomputed on each call.
TDbRuntimeStats GetRuntimeStats() const {
    TDbRuntimeStats total;
    // TODO: use a lazy aggregate to balance many idle tables vs frequent updates
    for (const auto& entry : Tables) {
        const auto& table = entry.second;
        total += table->RuntimeStats();
    }
    return total;
}

private:
const TIntrusivePtr<TKeyRangeCacheNeedGCList> GCList;
const TTxStamp Weak; /* db bootstrap upper stamp */
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3565,6 +3565,16 @@ void TExecutor::UpdateCounters(const TActorContext &ctx) {
Counters->Simple()[TExecutorCounters::USED_TABLET_MEMORY].Set(UsedTabletMemory);
}

// Runtime stats related to uncommitted changes
auto runtimeCounters = Database->RuntimeCounters();
{
Counters->Simple()[TExecutorCounters::DB_OPEN_TX_COUNT].Set(runtimeCounters.OpenTxCount);
Counters->Simple()[TExecutorCounters::DB_TXS_WITH_DATA_COUNT].Set(runtimeCounters.TxsWithDataCount);
Counters->Simple()[TExecutorCounters::DB_COMMITTED_TX_COUNT].Set(runtimeCounters.CommittedTxCount);
Counters->Simple()[TExecutorCounters::DB_REMOVED_TX_COUNT].Set(runtimeCounters.RemovedTxCount);
Counters->Simple()[TExecutorCounters::DB_REMOVED_COMMITTED_TXS].Set(runtimeCounters.RemovedCommittedTxs);
}

if (CommitManager) /* exists only on leader, mostly storage usage data */ {
auto redo = LogicRedo->LogStats();
Counters->Simple()[TExecutorCounters::LOG_REDO_COUNT].Set(redo.Items);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tablet_flat/flat_executor_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ namespace NTabletFlatExecutor {
XX(DB_FLAT_INDEX_BYTES, "DbFlatIndexBytes") \
XX(DB_B_TREE_INDEX_BYTES, "DbBTreeIndexBytes") \
XX(CACHE_TOTAL_USED, "CacheTotalUsed") \
XX(DB_OPEN_TX_COUNT, "DbOpenTxCount") \
XX(DB_TXS_WITH_DATA_COUNT, "DbTxsWithDataCount") \
XX(DB_COMMITTED_TX_COUNT, "DbCommittedTxCount") \
XX(DB_REMOVED_TX_COUNT, "DbRemovedTxCount") \
XX(DB_REMOVED_COMMITTED_TXS, "DbRemovedCommittedTxs") \

// don't change order!
#define FLAT_EXECUTOR_CUMULATIVE_COUNTERS_MAP(XX) \
Expand Down
46 changes: 44 additions & 2 deletions ydb/core/tablet_flat/flat_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,11 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev) || *prev > rowVersion) {
CommittedTransactions.Add(txId, rowVersion);
if (!prev) {
RemovedTransactions.Remove(txId);
if (RemovedTransactions.Remove(txId)) {
// Transaction was in a removed set and now it's committed
// This is not an error in some cases, but may be suspicious
RemovedCommittedTxs++;
}
}
}
if (!TxRefs.contains(txId)) {
Expand All @@ -645,6 +649,10 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
const ui64 txId = item.GetTxId();
if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev)) {
RemovedTransactions.Add(txId);
} else {
// Transaction is in a committed set but also removed
// This is not an error in some cases, but may be suspicious
RemovedCommittedTxs++;
}
if (!TxRefs.contains(txId)) {
CheckTransactions.insert(txId);
Expand Down Expand Up @@ -944,7 +952,11 @@ void TTable::CommitTx(ui64 txId, TRowVersion rowVersion)
if (RollbackState && RemovedTransactions.Contains(txId)) {
RollbackOps.emplace_back(TRollbackAddRemovedTx{ txId });
}
RemovedTransactions.Remove(txId);
if (RemovedTransactions.Remove(txId)) {
// Transaction was in a removed set and now it's committed
// This is not an error in some cases, but may be suspicious
RemovedCommittedTxs++;
}
}
if (auto it = OpenTxs.find(txId); it != OpenTxs.end()) {
if (RollbackState) {
Expand Down Expand Up @@ -982,6 +994,10 @@ void TTable::RemoveTx(ui64 txId)
}
OpenTxs.erase(it);
}
} else {
// Transaction is in a committed set but also removed
// This is not an error in some cases, but may be suspicious
RemovedCommittedTxs++;
}
}

Expand Down Expand Up @@ -1015,6 +1031,32 @@ size_t TTable::GetOpenTxCount() const
return OpenTxs.size();
}

// Returns the number of entries currently held in TxRefs.
// NOTE(review): judging by the name, TxRefs tracks transactions that
// still have data in the table - confirm against the TxRefs maintenance code.
size_t TTable::GetTxsWithDataCount() const
{
return TxRefs.size();
}

// Returns the number of entries currently held in CommittedTransactions.
size_t TTable::GetCommittedTxCount() const
{
return CommittedTransactions.Size();
}

// Returns the number of entries currently held in RemovedTransactions.
size_t TTable::GetRemovedTxCount() const
{
return RemovedTransactions.Size();
}

// Captures the current sizes of the table's transaction bookkeeping
// containers, plus the RemovedCommittedTxs counter, in a single value.
TTableRuntimeStats TTable::RuntimeStats() const noexcept
{
    TTableRuntimeStats snapshot;
    snapshot.OpenTxCount = OpenTxs.size();
    snapshot.TxsWithDataCount = TxRefs.size();
    snapshot.CommittedTxCount = CommittedTransactions.Size();
    snapshot.RemovedTxCount = RemovedTransactions.Size();
    snapshot.RemovedCommittedTxs = RemovedCommittedTxs;
    return snapshot;
}

TMemTable& TTable::MemTable()
{
if (!Mutable) {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tablet_flat/flat_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ class TTable: public TAtomicRefCount<TTable> {

const absl::flat_hash_set<ui64>& GetOpenTxs() const;
size_t GetOpenTxCount() const;
size_t GetTxsWithDataCount() const;
size_t GetCommittedTxCount() const;
size_t GetRemovedTxCount() const;

TPartView GetPartView(const TLogoBlobID &bundle) const
{
Expand Down Expand Up @@ -240,6 +243,8 @@ class TTable: public TAtomicRefCount<TTable> {
return Stat_;
}

TTableRuntimeStats RuntimeStats() const noexcept;

ui64 GetMemSize(TEpoch epoch = TEpoch::Max()) const noexcept
{
if (Y_LIKELY(epoch == TEpoch::Max())) {
Expand Down Expand Up @@ -364,6 +369,8 @@ class TTable: public TAtomicRefCount<TTable> {
TTransactionSet DecidedTransactions;
TIntrusivePtr<ITableObserver> TableObserver;

ui64 RemovedCommittedTxs = 0;

private:
struct TRollbackRemoveTxRef {
ui64 TxId;
Expand Down
30 changes: 26 additions & 4 deletions ydb/core/tablet_flat/flat_table_committed.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,20 @@ namespace NTable {
Unshare()[txId] = value;
}

void Remove(ui64 txId) {
// Erases txId from the container.
// Returns true when txId was present and has been erased, false when
// there was nothing to remove. Unshare() is only called when an entry
// actually has to be erased.
bool Remove(ui64 txId) {
    if (!State_ || !State_->contains(txId)) {
        return false;
    }
    Unshare().erase(txId);
    return true;
}

// Returns the number of stored entries; a missing state counts as empty.
size_t Size() const {
    if (!State_) {
        return 0;
    }
    return State_->size();
}

Expand Down Expand Up @@ -345,13 +356,24 @@ namespace NTable {
State_.Reset();
}

void Add(ui64 txId) {
Unshare().insert(txId);
// Inserts txId into the container.
// Returns true when txId was newly inserted, false when it was already present.
bool Add(ui64 txId) {
    const bool inserted = Unshare().insert(txId).second;
    return inserted;
}

void Remove(ui64 txId) {
// Erases txId from the container.
// Returns true when txId was present and has been erased, false when
// there was nothing to remove. Unshare() is only called when an entry
// actually has to be erased.
bool Remove(ui64 txId) {
    if (!State_ || !State_->contains(txId)) {
        return false;
    }
    Unshare().erase(txId);
    return true;
}

// Returns the number of stored entries; a missing state counts as empty.
size_t Size() const {
    if (!State_) {
        return 0;
    }
    return State_->size();
}

Expand Down
26 changes: 26 additions & 0 deletions ydb/core/tablet_flat/flat_table_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,31 @@ namespace NTable {
ui64 MemDataWaste = 0;
};

// Runtime counters describing transaction bookkeeping.
// All fields start at zero and combine field by field, so several
// instances can be accumulated into (or subtracted from) one total.
// Field order matters to callers that use designated initializers.
struct TTableRuntimeStats {
    ui64 OpenTxCount = 0;
    ui64 TxsWithDataCount = 0;
    ui64 CommittedTxCount = 0;
    ui64 RemovedTxCount = 0;
    ui64 RemovedCommittedTxs = 0;

    // Adds each field of rhs to the matching field of this object.
    TTableRuntimeStats& operator+=(const TTableRuntimeStats& rhs) noexcept {
        OpenTxCount += rhs.OpenTxCount;
        TxsWithDataCount += rhs.TxsWithDataCount;
        CommittedTxCount += rhs.CommittedTxCount;
        RemovedTxCount += rhs.RemovedTxCount;
        RemovedCommittedTxs += rhs.RemovedCommittedTxs;
        return *this;
    }

    // Subtracts each field of rhs from the matching field of this object.
    TTableRuntimeStats& operator-=(const TTableRuntimeStats& rhs) noexcept {
        OpenTxCount -= rhs.OpenTxCount;
        TxsWithDataCount -= rhs.TxsWithDataCount;
        CommittedTxCount -= rhs.CommittedTxCount;
        RemovedTxCount -= rhs.RemovedTxCount;
        RemovedCommittedTxs -= rhs.RemovedCommittedTxs;
        return *this;
    }
};

}
}
5 changes: 5 additions & 0 deletions ydb/core/tx/datashard/datashard_user_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ void TDataShardUserDb::CommitChanges(const TTableId& tableId, ui64 lockId, const
Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << Self.TabletID());

if (!Db.HasOpenTx(localTid, lockId)) {
if (Db.HasRemovedTx(localTid, lockId)) {
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Committing removed changes lockId# " << lockId << " tid# " << localTid << " shard# " << Self.TabletID());
Self.IncCounter(COUNTER_REMOVED_COMMITTED_TXS);
}
return;
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ namespace NKikimr::NDataShard {
if (txc.DB.HasOpenTx(tid, commitTxId)) {
txc.DB.CommitTx(tid, commitTxId, info->Version);
Self->GetConflictsCache().GetTableCache(tid).RemoveUncommittedWrites(commitTxId, txc.DB);
} else if (txc.DB.HasRemovedTx(tid, commitTxId)) {
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Committing removed changes txId# " << commitTxId << " tid# " << tid << " shard# " << Self->TabletID());
Self->IncCounter(COUNTER_REMOVED_COMMITTED_TXS);
}
}
}
Expand Down
Loading