replica_read.go
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"context"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/kr/pretty"
)

// executeReadOnlyBatch is the execution logic for client requests which do not
// mutate the range's replicated state. The method uses a single RocksDB
// iterator to evaluate the batch and then updates the timestamp cache to
// reflect the key spans that it read.
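// The concurrency guard is returned to the caller only when this method does
// not assume ownership of it (e.g. when a concurrency retry error is
// returned); otherwise the guard has already been released by the time the
// method returns and the returned guard is nil.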
func (r *Replica) executeReadOnlyBatch(
ctx context.Context, ba *roachpb.BatchRequest, st kvserverpb.LeaseStatus, g *concurrency.Guard,
) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) {
r.readOnlyCmdMu.RLock()
defer r.readOnlyCmdMu.RUnlock()
// Verify that the batch can be executed.
if err := r.checkExecutionCanProceed(ba, g, &st); err != nil {
return nil, g, roachpb.NewError(err)
}
// Evaluate read-only batch command.
spans := g.LatchSpans()
rec := NewReplicaEvalContext(r, spans)
// TODO(irfansharif): It's unfortunate that in this read-only code path,
// we're stuck with a ReadWriter because of the way evaluateBatch is
// designed.
rw := r.store.Engine().NewReadOnly()
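// Under race builds, wrap the engine handle so that any access outside the
// spans declared in the latch guard is caught as an assertion failure.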
if util.RaceEnabled {
rw = spanset.NewReadWriterAt(rw, spans, ba.Timestamp)
}
defer rw.Close()
// TODO(nvanbenschoten): once all replicated intents are pulled into the
// concurrency manager's lock-table, we can be sure that if we reached this
// point, we will not conflict with any of them during evaluation. This in
// turn means that we can bump the timestamp cache *before* evaluation
// without risk of starving writes. Once we start doing that, we're free to
// release latches immediately after we acquire an engine iterator as long
// as we're performing a non-locking read.
var result result.Result
br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, spans)
// If the request hit a server-side concurrency retry error, immediately
// propagate the error. Don't assume ownership of the concurrency guard.
if isConcurrencyRetryError(pErr) {
return nil, g, pErr
}
// Handle any local (leaseholder-only) side-effects of the request.
intents := result.Local.DetachEncounteredIntents()
if pErr == nil {
pErr = r.handleReadOnlyLocalEvalResult(ctx, ba, result.Local)
}
// Otherwise, update the timestamp cache and release the concurrency guard.
ec, g := endCmds{repl: r, g: g}, nil
ec.done(ctx, ba, br, pErr)
// Semi-synchronously process any intents that need resolving here in
// order to apply back pressure on the client which generated them. The
// resolution is semi-synchronous in that there is a limited number of
// outstanding asynchronous resolution tasks allowed after which
// further calls will block.
if len(intents) > 0 {
log.Eventf(ctx, "submitting %d intents to asynchronous processing", len(intents))
// We only allow synchronous intent resolution for consistent requests.
// Intent resolution is async/best-effort for inconsistent requests.
//
// An important case where this logic is necessary is for RangeLookup
// requests. In their case, synchronous intent resolution can deadlock
// if the request originated from the local node which means the local
// range descriptor cache has an in-flight RangeLookup request which
// prohibits any concurrent requests for the same range. See #17760.
allowSyncProcessing := ba.ReadConsistency == roachpb.CONSISTENT
if err := r.store.intentResolver.CleanupIntentsAsync(ctx, intents, allowSyncProcessing); err != nil {
log.Warningf(ctx, "%v", err)
}
}
if pErr != nil {
log.VErrEventf(ctx, 3, "%v", pErr.String())
} else {
log.Event(ctx, "read completed")
}
return br, nil, pErr
}

// executeReadOnlyBatchWithServersideRefreshes invokes evaluateBatch and retries
// at a higher timestamp in the event of some retriable errors if allowed by the
// batch/txn.
func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
ctx context.Context,
rw storage.ReadWriter,
rec batcheval.EvalContext,
ba *roachpb.BatchRequest,
latchSpans *spanset.SpanSet,
) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
log.Event(ctx, "executing read-only batch")
untrack := func(_ context.Context, _ ctpb.Epoch, _ roachpb.RangeID, _ ctpb.LAI) {}
if ba.IsSingleSubsumeRequest() {
// We start tracking SubsumeRequests as part of our guarantee to never
// broadcast a closed timestamp entry for a range that is in the subsumed
// state. See comment block below for details.
_, untrack = r.store.cfg.ClosedTimestamp.Tracker.Track(ctx)
}
for retries := 0; ; retries++ {
if retries > 0 {
log.VEventf(ctx, 2, "server-side retry of batch")
}
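// Read-only batches are not proposed through Raft, so there is no command
// ID and no MVCC stats delta to pass to evaluation (hence the empty
// CmdIDKey and the nil stats argument).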
br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, true /* readOnly */)
// If we can retry, set a higher batch timestamp and continue.
// Allow one retry only.
if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, nil /* deadline */) {
break
}
}
if pErr != nil {
// Failed read-only batches can't have any Result except for what's
// allowlisted here.
res.Local = result.LocalResult{
EncounteredIntents: res.Local.DetachEncounteredIntents(),
Metrics: res.Local.Metrics,
}
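// Release the closed timestamp tracker entry (if one was created above for
// a Subsume request) without recording an MLAI, since the failed evaluation
// has nothing to contribute to the next closed timestamp entry.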
untrack(ctx, 0, 0, 0)
return nil, res, pErr
}
r.mu.Lock()
// We prevent followers of the RHS from being able to serve follower reads on
// timestamps that fall in the timestamp window representing the range's
// subsumed state (i.e. between the subsumption time (FreezeStart) and the
// timestamp at which the merge transaction commits or aborts), by requiring
// follower replicas to catch up to an MLAI that succeeds the range's current
// LeaseAppliedIndex. In case the merge successfully commits, this MLAI will
// never be caught up to since the RHS will be destroyed. In case the merge
// aborts, this ensures that the followers can only activate the newer closed
// timestamps once they catch up to the LAI associated with the merge abort.
// We need to do this because the closed timestamps that are broadcast by RHS
// in this subsumed state are not going to be reflected in the timestamp cache
// of the LHS range after the merge, which can cause a serializability
// violation.
//
// NB: The above statement relies on the invariant that the LAI that follows a
// Subsume request will be applied only after the merge aborts, as the RHS
// replicas will get destroyed if the merge successfully commits. This
// invariant is upheld because the only Raft proposals allowed after a range
// has been subsumed are lease requests, which do not bump the LAI.
untrack(ctx, ctpb.Epoch(r.mu.state.Lease.Epoch), r.RangeID, ctpb.LAI(r.mu.state.LeaseAppliedIndex+1))
r.mu.Unlock()
return br, res, nil
}

func (r *Replica) handleReadOnlyLocalEvalResult(
ctx context.Context, ba *roachpb.BatchRequest, lResult result.LocalResult,
) *roachpb.Error {
// Fields for which no action is taken in this method are zeroed so that
// they don't trigger an assertion at the end of the method (which checks
// that all fields were handled).
{
lResult.Reply = nil
}
if lResult.AcquiredLocks != nil {
// These will all be unreplicated locks.
for i := range lResult.AcquiredLocks {
r.concMgr.OnLockAcquired(ctx, &lResult.AcquiredLocks[i])
}
lResult.AcquiredLocks = nil
}
if lResult.MaybeWatchForMerge {
// A merge is (likely) about to be carried out, and this replica needs
// to block all traffic until the merge either commits or aborts. See
// docs/tech-notes/range-merges.md.
if err := r.maybeWatchForMerge(ctx); err != nil {
return roachpb.NewError(err)
}
lResult.MaybeWatchForMerge = false
}
if !lResult.IsZero() {
log.Fatalf(ctx, "unhandled field in LocalEvalResult: %s", pretty.Diff(lResult, result.LocalResult{}))
}
return nil
}