kvclient: introduce txnWriteBuffer interceptor #139742

Merged 1 commit on Jan 24, 2025
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -96,6 +96,7 @@ kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions application
kv.transaction.randomized_anchor_key.enabled boolean false dictates whether a transactions anchor key is randomized or not application
kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed application
kv.transaction.write_buffering.enabled boolean false if enabled, transactional writes are buffered on the client application
kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, transactional locking reads are pipelined through Raft consensus application
kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
kv.transaction.write_pipelining.enabled (alias: kv.transaction.write_pipelining_enabled) boolean true if enabled, transactional writes are pipelined through Raft consensus application
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -125,6 +125,7 @@
<tr><td><div id="setting-kv-transaction-max-refresh-spans-bytes" class="anchored"><code>kv.transaction.max_refresh_spans_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-randomized-anchor-key-enabled" class="anchored"><code>kv.transaction.randomized_anchor_key.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>dictates whether a transactions anchor key is randomized or not</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-reject-over-max-intents-budget-enabled" class="anchored"><code>kv.transaction.reject_over_max_intents_budget.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-buffering-enabled" class="anchored"><code>kv.transaction.write_buffering.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, transactional writes are buffered on the client</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-locking-reads-enabled" class="anchored"><code>kv.transaction.write_pipelining.locking_reads.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional locking reads are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-ranged-writes-enabled" class="anchored"><code>kv.transaction.write_pipelining.ranged_writes.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional ranged writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-enabled" class="anchored"><code>kv.transaction.write_pipelining.enabled<br />(alias: kv.transaction.write_pipelining_enabled)</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -30,6 +30,7 @@ go_library(
"txn_interceptor_pipeliner.go",
"txn_interceptor_seq_num_allocator.go",
"txn_interceptor_span_refresher.go",
"txn_interceptor_write_buffer.go",
"txn_lock_gatekeeper.go",
"txn_metrics.go",
":gen-txnstate-stringer", # keep
10 changes: 9 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -161,9 +161,10 @@ type TxnCoordSender struct {
// additional heap allocations necessary.
interceptorStack []txnInterceptor
interceptorAlloc struct {
arr [6]txnInterceptor
arr [7]txnInterceptor
txnHeartbeater
txnSeqNumAllocator
txnWriteBuffer
txnPipeliner
txnCommitter
txnSpanRefresher
@@ -275,6 +276,10 @@ func newRootTxnCoordSender(
// Various interceptors below rely on sequence number allocation,
// so the sequence number allocator is near the top of the stack.
&tcs.interceptorAlloc.txnSeqNumAllocator,
// The write buffer sits above the pipeliner to ensure it doesn't need to
// know how to handle QueryIntentRequests, as those are only generated (and
// handled) by the pipeliner.
&tcs.interceptorAlloc.txnWriteBuffer,
// The pipeliner sits above the span refresher because it will
// never generate transaction retry errors that could be avoided
// with a refresh.
@@ -312,6 +317,9 @@ func (tc *TxnCoordSender) initCommonInterceptors(
if ds, ok := tcf.wrapped.(*DistSender); ok {
riGen.ds = ds
}
tc.interceptorAlloc.txnWriteBuffer = txnWriteBuffer{
enabled: BufferedWritesEnabled.Get(&tcf.st.SV),
}
tc.interceptorAlloc.txnPipeliner = txnPipeliner{
st: tcf.st,
riGen: riGen,
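The ordering comments above are the key design point of this hunk: each interceptor wraps the one below it, so whatever sits higher in the stack sees requests first and never has to understand request types introduced further down. Below is a minimal standalone sketch of that wiring pattern; the sender, interceptor, and terminal types are toy stand-ins for kvcoord's lockedSender and txnInterceptor, not the actual interfaces.

package main

import "fmt"

// sender is a toy stand-in for the lockedSender interface that interceptors wrap.
type sender interface {
	send(req string) string
}

// terminal is the bottom of the stack (think: the wrapped DistSender).
type terminal struct{}

func (terminal) send(req string) string { return "sent: " + req }

// interceptor is a toy stand-in for a txnInterceptor: it records its name on
// the request and hands it to whatever it wraps.
type interceptor struct {
	name    string
	wrapped sender
}

func (i *interceptor) setWrapped(w sender) { i.wrapped = w }

func (i *interceptor) send(req string) string {
	// An interceptor sees the request before everything wired in below it.
	return i.wrapped.send(req + " -> " + i.name)
}

func main() {
	writeBuffer := &interceptor{name: "writeBuffer"}
	pipeliner := &interceptor{name: "pipeliner"}

	// Wire the stack top-down: the write buffer sits above the pipeliner, so
	// it never has to deal with the QueryIntent requests that only the
	// pipeliner generates (and handles) further down the stack.
	writeBuffer.setWrapped(pipeliner)
	pipeliner.setWrapped(terminal{})

	fmt.Println(writeBuffer.send("Put(k, v)"))
	// Prints: sent: Put(k, v) -> writeBuffer -> pipeliner
}

Wiring order, rather than any explicit routing logic, is what determines which interceptor gets to rewrite or absorb a request first.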
143 changes: 143 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go
@@ -0,0 +1,143 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package kvcoord

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings"
)

// BufferedWritesEnabled is used to enable write buffering.
var BufferedWritesEnabled = settings.RegisterBoolSetting(
	settings.ApplicationLevel,
	"kv.transaction.write_buffering.enabled",
	"if enabled, transactional writes are buffered on the client",
	false,
	settings.WithPublic,
)

// txnWriteBuffer is a txnInterceptor that buffers transactional writes until
// commit time. Moreover, it also decomposes read-write KV operations (e.g.
// CPuts, InitPuts) into separate (locking) read and write operations, buffering
// the latter until commit time.
//
// Buffering writes until commit time has four main benefits:
//
// 1. It allows for more batching of writes, which can be more efficient.
// Instead of sending write batches one at a time, we can accumulate them and
// send them in a single batch at commit time. This is a win even if writes
// would otherwise be pipelined through raft.
//
// 2. It allows for the elimination of redundant writes. If a client writes to
// the same key multiple times in a transaction, only the last write needs to be
// written to the KV layer.
//
// 3. It allows the client to serve read-your-own-writes locally, which can be
// much faster and cheaper than sending them to the leaseholder. This is
// especially true when the leaseholder isn't colocated with the client.
//
// By serving read-your-own-writes locally from the gateway, write buffering
// also avoids the problem of pipeline stalls that can occur when a client reads
// a pipelined write before it has finished replicating through raft. For
// details on pipeline stalls, see txnPipeliner.
//
// 4. It allows clients to passively hit the 1-phase commit fast path, instead
// of requiring clients to carefully construct "auto-commit" BatchRequests to
// make use of the optimization. By buffering writes on the client before
// commit, we avoid immediately disabling the fast path when the client issues
// their first write. Then, at commit time, we flush the buffer and will happen
// to hit the 1-phase commit fast path if all writes end up going to the same
// range.
//
// However, buffering writes comes with some challenges.
//
// The first challenge is that read-only requests need to be aware of any
// buffered writes, as they may need to serve some reads from the buffer
// instead of the KV layer (read-your-own-writes).
//
// Similarly, any read-write requests, such as CPuts, that we decompose into
// separate read and write operations, need to be aware of any buffered writes
// that may affect their read half. The read portion must be served from the
// buffer instead of the KV layer if the key has already been written to.
// However, we aren't guaranteed to have acquired a corresponding lock on the
// key just because a buffered write exists for it -- as such, we must still
// send a locking read request to the KV layer to acquire a lock.
//
// The picture is further complicated when distributed execution is introduced
// into the mix. A read request that is distributed by the client also needs to
// be aware of the write buffer. As such, when constructing a leaf transaction
// to serve a distributed execution read, we must also ship[1] the write buffer
// along.
//
// The second challenge is around the handling of savepoints. In particular,
// when a savepoint is rolled back, we must clear out any writes that happened
// after the (now) rolled back savepoint. This means that if a key is written to
// multiple times in a transaction, we must retain all writes to the key. Note
// that this is only required when the transaction is in progress -- once the
// transaction is ready to commit, only the last value written to the key needs
// to be flushed to the KV layer.
//
// The third challenge is around memory limits and preventing OOMs. As writes
// are buffered in memory, per transaction, we need to be careful not to OOM
// nodes by buffering too many writes. To that end, a per-transaction memory
// limit on the write buffer must be enforced. If this limit is exceeded, no
// more writes are buffered, and the buffer (either in its entirety or
// partially[2]) must be force flushed. Force flushing entails sending all
// buffered writes to the KV layer prematurely (i.e. before commit time).
//
// [1] Instead of shipping the entire write buffer, we can constrain this to
// just the portion that overlaps with the span of the read request that's being
// distributed for evaluation by the client.
//
// [2] The decision to flush the buffer in its entirety vs. partially is a
// tradeoff. Flushing the entire buffer is simpler and frees up more memory.
// Flushing the buffer partially preserves some (all but the fourth) of the
// listed benefits of buffering writes for the unflushed portion of the buffer.
type txnWriteBuffer struct {
	enabled bool

	wrapped lockedSender
}

func (twb *txnWriteBuffer) SendLocked(
	ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
	if !twb.enabled {
		return twb.wrapped.SendLocked(ctx, ba)
	}
	panic("unimplemented")
}

// setWrapped implements the txnInterceptor interface.
func (twb *txnWriteBuffer) setWrapped(wrapped lockedSender) {
	twb.wrapped = wrapped
}

// populateLeafInputState is part of the txnInterceptor interface.
func (twb *txnWriteBuffer) populateLeafInputState(*roachpb.LeafTxnInputState) {}

// populateLeafFinalState is part of the txnInterceptor interface.
func (twb *txnWriteBuffer) populateLeafFinalState(*roachpb.LeafTxnFinalState) {}

// importLeafFinalState is part of the txnInterceptor interface.
func (twb *txnWriteBuffer) importLeafFinalState(context.Context, *roachpb.LeafTxnFinalState) error {
	return nil
}

// epochBumpedLocked implements the txnInterceptor interface.
func (twb *txnWriteBuffer) epochBumpedLocked() {}

// createSavepointLocked is part of the txnInterceptor interface.
func (twb *txnWriteBuffer) createSavepointLocked(context.Context, *savepoint) {}

// rollbackToSavepointLocked is part of the txnInterceptor interface.
func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s savepoint) {}

// closeLocked implements the txnInterceptor interface.
func (twb *txnWriteBuffer) closeLocked() {}
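With buffering enabled, SendLocked currently panics; the behavior described in the file comment is left to follow-up work. As a rough standalone sketch of the idea, under the simplifying assumptions of last-write-wins semantics and ignoring locking reads, savepoints, leaf transactions, and memory limits, the buffer can be pictured as a per-transaction map that absorbs writes, serves read-your-own-writes, and is flushed as a single batch at commit. All names below are hypothetical and not part of this PR.

package main

import "fmt"

// writeBuffer is a hypothetical, simplified stand-in for the per-transaction
// buffer described above: it keeps only the last value written to each key.
type writeBuffer struct {
	buf map[string][]byte
}

func newWriteBuffer() *writeBuffer {
	return &writeBuffer{buf: make(map[string][]byte)}
}

// put buffers a write instead of sending it to the KV layer.
func (b *writeBuffer) put(key string, value []byte) {
	b.buf[key] = value // redundant writes to the same key collapse here
}

// get serves read-your-own-writes from the buffer when possible; a real
// implementation would otherwise fall through to a KV read.
func (b *writeBuffer) get(key string) ([]byte, bool) {
	v, ok := b.buf[key]
	return v, ok
}

// flush returns everything that must be sent to the KV layer at commit time,
// ideally as a single batch that can hit the 1-phase commit fast path when
// all keys land on one range.
func (b *writeBuffer) flush() map[string][]byte {
	out := b.buf
	b.buf = make(map[string][]byte)
	return out
}

func main() {
	b := newWriteBuffer()
	b.put("a", []byte("v1"))
	b.put("a", []byte("v2")) // only v2 survives to commit
	if v, ok := b.get("a"); ok {
		fmt.Printf("read-your-own-write: a=%s\n", v)
	}
	fmt.Printf("flushed at commit: %d key(s)\n", len(b.flush()))
}

A real implementation would also need to retain every write per key while the transaction is open (to support savepoint rollbacks) and fall back to force-flushing once a per-transaction memory budget is exceeded, as the file comment outlines.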