Skip to content

Commit

Permalink
Add db counters for uncommitted changes and suspicious commits (#12966)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Dec 25, 2024
1 parent 1049a71 commit 57f6f63
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 8 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ enum ECumulativeCounters {
COUNTER_WRITE_BYTES = 111 [(CounterOpts) = {Name: "WriteBytes"}];
COUNTER_WRITE_DISK_SPACE_EXHAUSTED = 112 [(CounterOpts) = {Name: "WriteDiskSpaceExhausted"}];
COUNTER_PREPARE_DISK_SPACE_EXHAUSTED = 113 [(CounterOpts) = {Name: "PrepareSpaceExhausted"}];
COUNTER_REMOVED_COMMITTED_TXS = 114 [(CounterOpts) = {Name: "RemovedCommittedTxs"}];
}

enum EPercentileCounters {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tablet_flat/flat_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,11 @@ const TDbStats& TDatabase::Counters() const noexcept
return DatabaseImpl->Stats;
}

// Returns, by value, the runtime statistics reported by the underlying
// database implementation at the moment of the call.
TDbRuntimeStats TDatabase::RuntimeCounters() const noexcept
{
    TDbRuntimeStats stats = DatabaseImpl->GetRuntimeStats();
    return stats;
}

void TDatabase::UpdateApproximateFreeSharesByChannel(const THashMap<ui32, float>& approximateFreeSpaceShareByChannel)
{
for (auto& [channel, value] : approximateFreeSpaceShareByChannel) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tablet_flat/flat_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class TDatabase {
public:
using TMemGlobs = TVector<NPageCollection::TMemGlob>;
using TCookieAllocator = NPageCollection::TCookieAllocator;
using TCounters = TDbStats;

struct TProd {
THolder<TChange> Change;
Expand Down Expand Up @@ -221,7 +220,9 @@ class TDatabase {
ui64 GetTableIndexSize(ui32 table) const;
ui64 GetTableSearchHeight(ui32 table) const;
ui64 EstimateRowSize(ui32 table) const;
const TCounters& Counters() const noexcept;
const TDbStats& Counters() const noexcept;
TDbRuntimeStats RuntimeCounters() const noexcept;

void UpdateApproximateFreeSharesByChannel(const THashMap<ui32, float>& approximateFreeSpaceShareByChannel);
TString SnapshotToLog(ui32 table, TTxStamp);

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tablet_flat/flat_dbase_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ namespace NTable {
THashMap<ui32, float> NormalizedFreeSpaceShareByChannel;
};

// Database-level runtime statistics reuse the per-table statistics structure.
using TDbRuntimeStats = TTableRuntimeStats;

}
}
10 changes: 10 additions & 0 deletions ydb/core/tablet_flat/flat_dbase_naked.h
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,16 @@ namespace NTable {
}
}

public:
// Builds database-wide runtime statistics by summing the runtime statistics
// of every entry in Tables. The result is recomputed on every call.
TDbRuntimeStats GetRuntimeStats() const {
    TDbRuntimeStats total;
    // TODO: use a lazy aggregate to balance many idle tables vs frequent updates
    for (const auto& [tableId, table] : Tables) {
        total += table->RuntimeStats();
    }
    return total;
}

private:
const TIntrusivePtr<TKeyRangeCacheNeedGCList> GCList;
const TTxStamp Weak; /* db bootstrap upper stamp */
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3565,6 +3565,16 @@ void TExecutor::UpdateCounters(const TActorContext &ctx) {
Counters->Simple()[TExecutorCounters::USED_TABLET_MEMORY].Set(UsedTabletMemory);
}

// Runtime stats related to uncommitted changes
auto runtimeCounters = Database->RuntimeCounters();
{
Counters->Simple()[TExecutorCounters::DB_OPEN_TX_COUNT].Set(runtimeCounters.OpenTxCount);
Counters->Simple()[TExecutorCounters::DB_TXS_WITH_DATA_COUNT].Set(runtimeCounters.TxsWithDataCount);
Counters->Simple()[TExecutorCounters::DB_COMMITTED_TX_COUNT].Set(runtimeCounters.CommittedTxCount);
Counters->Simple()[TExecutorCounters::DB_REMOVED_TX_COUNT].Set(runtimeCounters.RemovedTxCount);
Counters->Simple()[TExecutorCounters::DB_REMOVED_COMMITTED_TXS].Set(runtimeCounters.RemovedCommittedTxs);
}

if (CommitManager) /* exists only on leader, mostly storage usage data */ {
auto redo = LogicRedo->LogStats();
Counters->Simple()[TExecutorCounters::LOG_REDO_COUNT].Set(redo.Items);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tablet_flat/flat_executor_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ namespace NTabletFlatExecutor {
XX(DB_FLAT_INDEX_BYTES, "DbFlatIndexBytes") \
XX(DB_B_TREE_INDEX_BYTES, "DbBTreeIndexBytes") \
XX(CACHE_TOTAL_USED, "CacheTotalUsed") \
XX(DB_OPEN_TX_COUNT, "DbOpenTxCount") \
XX(DB_TXS_WITH_DATA_COUNT, "DbTxsWithDataCount") \
XX(DB_COMMITTED_TX_COUNT, "DbCommittedTxCount") \
XX(DB_REMOVED_TX_COUNT, "DbRemovedTxCount") \
XX(DB_REMOVED_COMMITTED_TXS, "DbRemovedCommittedTxs") \

// don't change order!
#define FLAT_EXECUTOR_CUMULATIVE_COUNTERS_MAP(XX) \
Expand Down
46 changes: 44 additions & 2 deletions ydb/core/tablet_flat/flat_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,11 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev) || *prev > rowVersion) {
CommittedTransactions.Add(txId, rowVersion);
if (!prev) {
RemovedTransactions.Remove(txId);
if (RemovedTransactions.Remove(txId)) {
// Transaction was in a removed set and now it's committed
// This is not an error in some cases, but may be suspicious
RemovedCommittedTxs++;
}
}
}
if (!TxRefs.contains(txId)) {
Expand All @@ -645,6 +649,10 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
const ui64 txId = item.GetTxId();
if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev)) {
RemovedTransactions.Add(txId);
} else {
// Transaction is in a committed set but also removed
// This is not an error in some cases, but may be suspicious
RemovedCommittedTxs++;
}
if (!TxRefs.contains(txId)) {
CheckTransactions.insert(txId);
Expand Down Expand Up @@ -944,7 +952,11 @@ void TTable::CommitTx(ui64 txId, TRowVersion rowVersion)
if (RollbackState && RemovedTransactions.Contains(txId)) {
RollbackOps.emplace_back(TRollbackAddRemovedTx{ txId });
}
RemovedTransactions.Remove(txId);
if (RemovedTransactions.Remove(txId)) {
// Transaction was in a removed set and now it's committed
// This is not an error in some cases, but may be suspicious
RemovedCommittedTxs++;
}
}
if (auto it = OpenTxs.find(txId); it != OpenTxs.end()) {
if (RollbackState) {
Expand Down Expand Up @@ -982,6 +994,10 @@ void TTable::RemoveTx(ui64 txId)
}
OpenTxs.erase(it);
}
} else {
// Transaction is in a committed set but also removed
// This is not an error in some cases, but may be suspicious
RemovedCommittedTxs++;
}
}

Expand Down Expand Up @@ -1015,6 +1031,32 @@ size_t TTable::GetOpenTxCount() const
return OpenTxs.size();
}

// Returns the number of entries currently tracked in TxRefs.
size_t TTable::GetTxsWithDataCount() const
{
    const size_t txsWithData = TxRefs.size();
    return txsWithData;
}

// Returns the number of entries currently held in CommittedTransactions.
size_t TTable::GetCommittedTxCount() const
{
    const size_t committed = CommittedTransactions.Size();
    return committed;
}

// Returns the number of entries currently held in RemovedTransactions.
size_t TTable::GetRemovedTxCount() const
{
    const size_t removed = RemovedTransactions.Size();
    return removed;
}

// Captures the table's current transaction-related counters in a single
// TTableRuntimeStats value. Sizes are read through the table's own count
// getters; RemovedCommittedTxs is copied from the member of the same name.
TTableRuntimeStats TTable::RuntimeStats() const noexcept
{
    return TTableRuntimeStats{
        .OpenTxCount = GetOpenTxCount(),
        .TxsWithDataCount = GetTxsWithDataCount(),
        .CommittedTxCount = GetCommittedTxCount(),
        .RemovedTxCount = GetRemovedTxCount(),
        .RemovedCommittedTxs = RemovedCommittedTxs,
    };
}

TMemTable& TTable::MemTable()
{
if (!Mutable) {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tablet_flat/flat_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ class TTable: public TAtomicRefCount<TTable> {

const absl::flat_hash_set<ui64>& GetOpenTxs() const;
size_t GetOpenTxCount() const;
size_t GetTxsWithDataCount() const;
size_t GetCommittedTxCount() const;
size_t GetRemovedTxCount() const;

TPartView GetPartView(const TLogoBlobID &bundle) const
{
Expand Down Expand Up @@ -240,6 +243,8 @@ class TTable: public TAtomicRefCount<TTable> {
return Stat_;
}

TTableRuntimeStats RuntimeStats() const noexcept;

ui64 GetMemSize(TEpoch epoch = TEpoch::Max()) const noexcept
{
if (Y_LIKELY(epoch == TEpoch::Max())) {
Expand Down Expand Up @@ -364,6 +369,8 @@ class TTable: public TAtomicRefCount<TTable> {
TTransactionSet DecidedTransactions;
TIntrusivePtr<ITableObserver> TableObserver;

ui64 RemovedCommittedTxs = 0;

private:
struct TRollbackRemoveTxRef {
ui64 TxId;
Expand Down
30 changes: 26 additions & 4 deletions ydb/core/tablet_flat/flat_table_committed.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,20 @@ namespace NTable {
Unshare()[txId] = value;
}

void Remove(ui64 txId) {
// Erases txId from the map.
// Returns true when the entry existed and was erased, false otherwise.
// Unshare() is invoked only when there is an entry to erase.
bool Remove(ui64 txId) {
    if (!State_ || !State_->contains(txId)) {
        return false;
    }
    Unshare().erase(txId);
    return true;
}

// Returns the number of stored entries; an unset State_ counts as empty.
size_t Size() const {
    return State_ ? State_->size() : 0;
}

Expand Down Expand Up @@ -345,13 +356,24 @@ namespace NTable {
State_.Reset();
}

void Add(ui64 txId) {
Unshare().insert(txId);
// Inserts txId into the set.
// Returns true when txId was newly inserted, false when it was already present.
bool Add(ui64 txId) {
    const auto inserted = Unshare().insert(txId);
    return inserted.second;
}

void Remove(ui64 txId) {
// Erases txId from the set.
// Returns true when txId was present and erased, false otherwise.
// Unshare() is invoked only when there is an element to erase.
bool Remove(ui64 txId) {
    if (!State_ || !State_->contains(txId)) {
        return false;
    }
    Unshare().erase(txId);
    return true;
}

// Returns the number of stored elements; an unset State_ counts as empty.
size_t Size() const {
    return State_ ? State_->size() : 0;
}

Expand Down
26 changes: 26 additions & 0 deletions ydb/core/tablet_flat/flat_table_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,31 @@ namespace NTable {
ui64 MemDataWaste = 0;
};

// Transaction-related runtime counters.
// Field order and the absence of constructors are kept as-is so the type
// stays an aggregate that can be brace-initialized field by field.
struct TTableRuntimeStats {
    ui64 OpenTxCount = 0;          // open transactions
    ui64 TxsWithDataCount = 0;     // transactions with data
    ui64 CommittedTxCount = 0;     // committed transactions
    ui64 RemovedTxCount = 0;       // removed transactions
    ui64 RemovedCommittedTxs = 0;  // transactions seen as both removed and committed

    // Adds every counter of rhs to the matching counter of this object.
    TTableRuntimeStats& operator+=(const TTableRuntimeStats& rhs) noexcept {
        OpenTxCount += rhs.OpenTxCount;
        TxsWithDataCount += rhs.TxsWithDataCount;
        CommittedTxCount += rhs.CommittedTxCount;
        RemovedTxCount += rhs.RemovedTxCount;
        RemovedCommittedTxs += rhs.RemovedCommittedTxs;
        return *this;
    }

    // Subtracts every counter of rhs from the matching counter of this object.
    // No underflow check is performed.
    TTableRuntimeStats& operator-=(const TTableRuntimeStats& rhs) noexcept {
        OpenTxCount -= rhs.OpenTxCount;
        TxsWithDataCount -= rhs.TxsWithDataCount;
        CommittedTxCount -= rhs.CommittedTxCount;
        RemovedTxCount -= rhs.RemovedTxCount;
        RemovedCommittedTxs -= rhs.RemovedCommittedTxs;
        return *this;
    }
};

}
}
5 changes: 5 additions & 0 deletions ydb/core/tx/datashard/datashard_user_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ void TDataShardUserDb::CommitChanges(const TTableId& tableId, ui64 lockId, const
Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << Self.TabletID());

if (!Db.HasOpenTx(localTid, lockId)) {
if (Db.HasRemovedTx(localTid, lockId)) {
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Committing removed changes lockId# " << lockId << " tid# " << localTid << " shard# " << Self.TabletID());
Self.IncCounter(COUNTER_REMOVED_COMMITTED_TXS);
}
return;
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ namespace NKikimr::NDataShard {
if (txc.DB.HasOpenTx(tid, commitTxId)) {
txc.DB.CommitTx(tid, commitTxId, info->Version);
Self->GetConflictsCache().GetTableCache(tid).RemoveUncommittedWrites(commitTxId, txc.DB);
} else if (txc.DB.HasRemovedTx(tid, commitTxId)) {
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Committing removed changes txId# " << commitTxId << " tid# " << tid << " shard# " << Self->TabletID());
Self->IncCounter(COUNTER_REMOVED_COMMITTED_TXS);
}
}
}
Expand Down

0 comments on commit 57f6f63

Please sign in to comment.