From 2fdc190932c6ac93a63c412cd2fa0e263a5bca7a Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 13 Dec 2024 10:14:36 -0500 Subject: [PATCH] kv/kvserver: misc additional support for prepared transactions Informs #22329. Adds support for prepared transactions to MVCC GC, rangefeeds, and a few other places. Release note: None --- docs/generated/metrics/metrics.html | 1 + pkg/kv/kvserver/gc/gc.go | 3 +++ pkg/kv/kvserver/metrics.go | 8 ++++++++ pkg/kv/kvserver/mvcc_gc_queue.go | 1 + pkg/kv/kvserver/rangefeed/task.go | 2 +- pkg/kv/kvserver/replica.go | 6 +++--- 6 files changed, 17 insertions(+), 4 deletions(-) diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index d0e5d87e846a..56abe4b6177e 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -403,6 +403,7 @@ STORAGEqueue.gc.info.transactionspangcabortedNumber of GC'able entries corresponding to aborted txnsTxn EntriesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEqueue.gc.info.transactionspangccommittedNumber of GC'able entries corresponding to committed txnsTxn EntriesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEqueue.gc.info.transactionspangcpendingNumber of GC'able entries corresponding to pending txnsTxn EntriesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE +STORAGEqueue.gc.info.transactionspangcpreparedNumber of GC'able entries corresponding to prepared txnsTxn EntriesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEqueue.gc.info.transactionspangcstagingNumber of GC'able entries corresponding to staging txnsTxn EntriesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEqueue.gc.info.transactionspanscannedNumber of entries in transaction spans scanned from the engineTxn EntriesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEqueue.gc.pendingNumber of pending replicas in the MVCC GC queueReplicasGAUGECOUNTAVGNONE diff --git a/pkg/kv/kvserver/gc/gc.go b/pkg/kv/kvserver/gc/gc.go index ddeab75b8f73..ecc974f3571f 100644 --- a/pkg/kv/kvserver/gc/gc.go +++ b/pkg/kv/kvserver/gc/gc.go @@ -222,6 +222,7 @@ type Info struct { // potentially necessary intent resolutions did not fail). TransactionSpanGCAborted, TransactionSpanGCCommitted int TransactionSpanGCStaging, TransactionSpanGCPending int + TransactionSpanGCPrepared int // AbortSpanTotal is the total number of transactions present in the AbortSpan. AbortSpanTotal int // AbortSpanConsidered is the number of AbortSpan entries old enough to be @@ -1217,6 +1218,8 @@ func processLocalKeyRange( switch txn.Status { case roachpb.PENDING: info.TransactionSpanGCPending++ + case roachpb.PREPARED: + info.TransactionSpanGCPrepared++ case roachpb.STAGING: info.TransactionSpanGCStaging++ case roachpb.ABORTED: diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 24db14f5fa10..fb0c598a133e 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -2122,6 +2122,12 @@ The messages are dropped to help these replicas to recover from I/O overload.`, Measurement: "Txn Entries", Unit: metric.Unit_COUNT, } + metaGCTransactionSpanGCPrepared = metric.Metadata{ + Name: "queue.gc.info.transactionspangcprepared", + Help: "Number of GC'able entries corresponding to prepared txns", + Measurement: "Txn Entries", + Unit: metric.Unit_COUNT, + } metaGCAbortSpanScanned = metric.Metadata{ Name: "queue.gc.info.abortspanscanned", Help: "Number of transactions present in the AbortSpan scanned from the engine", @@ -2904,6 +2910,7 @@ type StoreMetrics struct { GCTransactionSpanGCCommitted *metric.Counter GCTransactionSpanGCStaging *metric.Counter GCTransactionSpanGCPending *metric.Counter + GCTransactionSpanGCPrepared *metric.Counter GCAbortSpanScanned *metric.Counter GCAbortSpanConsidered *metric.Counter GCAbortSpanGCNum *metric.Counter @@ -3672,6 +3679,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { GCTransactionSpanGCCommitted: metric.NewCounter(metaGCTransactionSpanGCCommitted), GCTransactionSpanGCStaging: metric.NewCounter(metaGCTransactionSpanGCStaging), GCTransactionSpanGCPending: metric.NewCounter(metaGCTransactionSpanGCPending), + GCTransactionSpanGCPrepared: metric.NewCounter(metaGCTransactionSpanGCPrepared), GCAbortSpanScanned: metric.NewCounter(metaGCAbortSpanScanned), GCAbortSpanConsidered: metric.NewCounter(metaGCAbortSpanConsidered), GCAbortSpanGCNum: metric.NewCounter(metaGCAbortSpanGCNum), diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index 9c222869654c..49461adc1dfa 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -841,6 +841,7 @@ func updateStoreMetricsWithGCInfo(metrics *StoreMetrics, info gc.Info) { metrics.GCTransactionSpanGCCommitted.Inc(int64(info.TransactionSpanGCCommitted)) metrics.GCTransactionSpanGCStaging.Inc(int64(info.TransactionSpanGCStaging)) metrics.GCTransactionSpanGCPending.Inc(int64(info.TransactionSpanGCPending)) + metrics.GCTransactionSpanGCPrepared.Inc(int64(info.TransactionSpanGCPrepared)) metrics.GCAbortSpanScanned.Inc(int64(info.AbortSpanTotal)) metrics.GCAbortSpanConsidered.Inc(int64(info.AbortSpanConsidered)) metrics.GCAbortSpanGCNum.Inc(int64(info.AbortSpanGCNum)) diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 75cc9ab34aeb..ed5c01a2a7f7 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -275,7 +275,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { var intentsToCleanup []roachpb.LockUpdate for i, txn := range pushedTxns { switch txn.Status { - case roachpb.PENDING, roachpb.STAGING: + case roachpb.PENDING, roachpb.PREPARED, roachpb.STAGING: // The transaction is still in progress but its timestamp was moved // forward to the current time. Inform the Processor that it can // forward the txn's timestamp in its unresolvedIntentQueue. diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 75fe749c7449..ac67104b30fb 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -2334,9 +2334,6 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) { var mergeCommitted bool switch pushTxnRes.PusheeTxn.Status { - case roachpb.PENDING, roachpb.STAGING: - log.Fatalf(ctx, "PushTxn returned while merge transaction %s was still %s", - intentRes.Intent.Txn.ID.Short(), pushTxnRes.PusheeTxn.Status) case roachpb.COMMITTED: // If PushTxn claims that the transaction committed, then the transaction // definitely committed. @@ -2391,6 +2388,9 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) { mergeCommitted = true } } + default: + log.Fatalf(ctx, "PushTxn returned while merge transaction %s was still %s", + intentRes.Intent.Txn.ID.Short(), pushTxnRes.PusheeTxn.Status) } r.raftMu.Lock() r.readOnlyCmdMu.Lock()