backend.go
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ingest

import (
"context"
"fmt"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
tikv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/common"
lightning "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/errormanager"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)

// BackendCtx is the backend context for the add index reorg task.
type BackendCtx interface {
Register(jobID, indexID int64, schemaName, tableName string) (Engine, error)
Unregister(jobID, indexID int64)
CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error
FinishImport(indexID int64, unique bool, tbl table.Table) error
ResetWorkers(jobID int64)
Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error)
Done() bool
SetDone()
AttachCheckpointManager(*CheckpointManager)
GetCheckpointManager() *CheckpointManager
GetLocalBackend() *local.Backend
}

// FlushMode is used to control how to flush.
type FlushMode byte

const (
// FlushModeAuto means the caller does not enforce any flush; the
// implementation decides when to flush.
FlushModeAuto FlushMode = iota
// FlushModeForceFlushNoImport means flush all data to local storage, but don't
// import the data to TiKV.
FlushModeForceFlushNoImport
// FlushModeForceFlushAndImport means flush and import all data to TiKV.
FlushModeForceFlushAndImport
)
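
// The sketch below is illustrative only and is not part of this file's API:
// it shows how a caller might drive a BackendCtx through one add-index job,
// assuming a prepared BackendCtx `bc`, a table `tbl`, and a jobID/indexID
// pair (the schema name "test" is made up). Error handling is trimmed to
// keep the lifecycle visible.
//
//	func exampleBackendCtxLifecycle(bc BackendCtx, jobID, indexID int64, tbl table.Table) error {
//		if _, err := bc.Register(jobID, indexID, "test", tbl.Meta().Name.L); err != nil {
//			return err
//		}
//		defer bc.Unregister(jobID, indexID)
//		// ... write index KVs through the registered engine ...
//		if _, _, err := bc.Flush(indexID, FlushModeForceFlushAndImport); err != nil {
//			return err
//		}
//		return bc.FinishImport(indexID, true /* unique */, tbl)
//	}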

// litBackendCtx stores the backend info for the add index reorg task.
type litBackendCtx struct {
generic.SyncMap[int64, *engineInfo]
MemRoot MemRoot
DiskRoot DiskRoot
jobID int64
backend *local.Backend
ctx context.Context
cfg *lightning.Config
sysVars map[string]string
diskRoot DiskRoot
done bool
timeOfLastFlush atomicutil.Time
updateInterval time.Duration
checkpointMgr *CheckpointManager
etcdClient *clientv3.Client
}

func (bc *litBackendCtx) handleErrorAfterCollectRemoteDuplicateRows(err error, indexID int64, tbl table.Table, hasDupe bool) error {
if err != nil && !common.ErrFoundIndexConflictRecords.Equal(err) {
logutil.Logger(bc.ctx).Error(LitInfoRemoteDupCheck, zap.Error(err),
zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID))
return errors.Trace(err)
} else if hasDupe {
logutil.Logger(bc.ctx).Error(LitErrRemoteDupExistErr,
zap.String("table", tbl.Meta().Name.O), zap.Int64("index ID", indexID))
if common.ErrFoundIndexConflictRecords.Equal(err) {
tErr, ok := errors.Cause(err).(*terror.Error)
if !ok {
return errors.Trace(tikv.ErrKeyExists)
}
if len(tErr.Args()) != 4 {
return errors.Trace(tikv.ErrKeyExists)
}
indexName := tErr.Args()[1]
valueStr := tErr.Args()[2]
return errors.Trace(tikv.ErrKeyExists.FastGenByArgs(valueStr, indexName))
}
return errors.Trace(tikv.ErrKeyExists)
}
return nil
}
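
// For illustration (the literal values below are made up, not from this
// file): when the duplicate checker reports common.ErrFoundIndexConflictRecords
// with four args, the branch above surfaces the index name (Args()[1]) and
// the conflicting value (Args()[2]) through tikv.ErrKeyExists, so the user
// sees a familiar duplicate-entry error.
//
//	err := common.ErrFoundIndexConflictRecords.FastGenByArgs("t", "idx", "v1", "k1")
//	// handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, true)
//	// returns tikv.ErrKeyExists.FastGenByArgs("v1", "idx").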

// CollectRemoteDuplicateRows collects duplicate rows from remote TiKV.
func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error {
errorMgr := errormanager.New(nil, bc.cfg, log.Logger{Logger: logutil.Logger(bc.ctx)})
// backend must be a local backend.
dupeController := bc.backend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
IndexID: indexID,
}, lightning.ErrorOnDup)
return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe)
}

// FinishImport imports all the key-values in the engine into the storage, collects the duplicate errors if any, and
// removes the engine from the backend context.
func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Table) error {
ei, exist := bc.Load(indexID)
if !exist {
return dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found")
}
err := ei.ImportAndClean()
if err != nil {
return err
}
failpoint.Inject("mockFinishImportErr", func() {
failpoint.Return(fmt.Errorf("mock finish import error"))
})
// Check remote duplicate values for the index.
if unique {
errorMgr := errormanager.New(nil, bc.cfg, log.Logger{Logger: logutil.Logger(bc.ctx)})
// backend must be a local backend.
// TODO: remove this cast once the local backend is completely separated from the tidb backend.
//nolint:forcetypeassert
dupeController := bc.backend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
IndexID: ei.indexID,
}, lightning.ErrorOnDup)
return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe)
}
return nil
}

func acquireLock(ctx context.Context, se *concurrency.Session, key string) (*concurrency.Mutex, error) {
mu := concurrency.NewMutex(se, key)
err := mu.Lock(ctx)
if err != nil {
return nil, err
}
return mu, nil
}
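
// A minimal usage sketch for acquireLock (the etcd client, context, and key
// below are assumptions, not values from this file): the concurrency.Session
// keeps the lock's lease alive, and both the mutex and the session must be
// released once the critical section ends, mirroring what Flush does below.
//
//	se, err := concurrency.NewSession(etcdCli)
//	if err != nil {
//		return err
//	}
//	defer se.Close()
//	mu, err := acquireLock(ctx, se, "/tidb/distributeLock/example")
//	if err != nil {
//		return err
//	}
//	defer mu.Unlock(ctx)
//	// ... critical section ...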

// Flush checks the disk quota and imports the current key-values in the engine to the storage.
func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) {
ei, exist := bc.Load(indexID)
if !exist {
logutil.Logger(bc.ctx).Error(LitErrGetEngineFail, zap.Int64("index ID", indexID))
return false, false, dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found")
}
shouldFlush, shouldImport := bc.checkFlush(mode)
if !shouldFlush {
return false, false, nil
}
if !ei.flushing.CompareAndSwap(false, true) {
return false, false, nil
}
defer ei.flushing.Store(false)
ei.flushLock.Lock()
defer ei.flushLock.Unlock()
err = ei.Flush()
if err != nil {
return false, false, err
}
bc.timeOfLastFlush.Store(time.Now())
if !shouldImport {
return true, false, nil
}
// Use a distributed lock when running in distributed mode.
if bc.etcdClient != nil {
distLockKey := fmt.Sprintf("/tidb/distributeLock/%d/%d", bc.jobID, indexID)
se, _ := concurrency.NewSession(bc.etcdClient)
mu, err := acquireLock(bc.ctx, se, distLockKey)
if err != nil {
return true, false, errors.Trace(err)
}
logutil.Logger(bc.ctx).Info("acquire distributed flush lock success", zap.Int64("jobID", bc.jobID))
defer func() {
err = mu.Unlock(bc.ctx)
if err != nil {
logutil.Logger(bc.ctx).Warn("release distributed flush lock error", zap.Error(err), zap.Int64("jobID", bc.jobID))
} else {
logutil.Logger(bc.ctx).Info("release distributed flush lock success", zap.Int64("jobID", bc.jobID))
}
err = se.Close()
if err != nil {
logutil.Logger(bc.ctx).Warn("close session error", zap.Error(err))
}
}()
}
err = bc.unsafeImportAndReset(ei)
if err != nil {
return true, false, err
}
return true, true, nil
}
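
// A hedged sketch of how a backfill loop might call Flush (backfillOneBatch
// and done are hypothetical names, not defined in this file): with
// FlushModeAuto the method itself decides, from disk usage and the update
// interval, whether the data is only flushed locally or also imported into
// TiKV.
//
//	for !done {
//		if err := backfillOneBatch(); err != nil {
//			return err
//		}
//		flushed, imported, err := bc.Flush(indexID, FlushModeAuto)
//		if err != nil {
//			return err
//		}
//		_ = flushed  // true if local data was synced to disk
//		_ = imported // true if data was also ingested into TiKV
//	}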

func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
logutil.Logger(bc.ctx).Info(LitInfoUnsafeImport, zap.Int64("index ID", ei.indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
logger := log.FromContext(bc.ctx).With(
zap.Stringer("engineUUID", ei.uuid),
)
ei.closedEngine = backend.NewClosedEngine(bc.backend, logger, ei.uuid, 0)
regionSplitSize := int64(lightning.SplitRegionSize) * int64(lightning.MaxSplitRegionSizeRatio)
regionSplitKeys := int64(lightning.SplitRegionKeys)
if err := ei.closedEngine.Import(bc.ctx, regionSplitSize, regionSplitKeys); err != nil {
logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", ei.indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
return err
}
err := bc.backend.ResetEngine(bc.ctx, ei.uuid)
if err != nil {
logutil.Logger(bc.ctx).Error(LitErrResetEngineFail, zap.Int64("index ID", ei.indexID))
err1 := ei.closedEngine.Cleanup(bc.ctx)
if err1 != nil {
logutil.Logger(ei.ctx).Error(LitErrCleanEngineErr, zap.Error(err1),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
ei.openedEngine = nil
ei.closedEngine = nil
return err
}
return nil
}

// ForceSyncFlagForTest is a flag to force a synchronous flush; it is only used in tests.
var ForceSyncFlagForTest = false

func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImport bool) {
if mode == FlushModeForceFlushAndImport || ForceSyncFlagForTest {
return true, true
}
if mode == FlushModeForceFlushNoImport {
return true, false
}
bc.diskRoot.UpdateUsage()
shouldImport = bc.diskRoot.ShouldImport()
interval := bc.updateInterval
// This failpoint will be manually set through the HTTP status port.
failpoint.Inject("mockSyncIntervalMs", func(val failpoint.Value) {
if v, ok := val.(int); ok {
interval = time.Duration(v) * time.Millisecond
}
})
shouldFlush = shouldImport ||
time.Since(bc.timeOfLastFlush.Load()) >= interval
return shouldFlush, shouldImport
}
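
// A small illustration of the decision above (the 5-second interval and the
// DiskRoot value `dr` are assumptions for this example, not values from this
// file): if the last flush happened more than updateInterval ago,
// FlushModeAuto still flushes even when the disk quota does not yet require
// an import.
//
//	bc := &litBackendCtx{updateInterval: 5 * time.Second, diskRoot: dr}
//	bc.timeOfLastFlush.Store(time.Now().Add(-6 * time.Second))
//	shouldFlush, shouldImport := bc.checkFlush(FlushModeAuto)
//	// shouldFlush == true (6s >= 5s); shouldImport follows dr.ShouldImport().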

// Done returns true if the lightning backfill is done.
func (bc *litBackendCtx) Done() bool {
return bc.done
}

// SetDone sets the done flag.
func (bc *litBackendCtx) SetDone() {
bc.done = true
}

// AttachCheckpointManager attaches a checkpoint manager to the backend context.
func (bc *litBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) {
bc.checkpointMgr = mgr
}

// GetCheckpointManager returns the checkpoint manager attached to the backend context.
func (bc *litBackendCtx) GetCheckpointManager() *CheckpointManager {
return bc.checkpointMgr
}

// GetLocalBackend returns the local backend.
func (bc *litBackendCtx) GetLocalBackend() *local.Backend {
return bc.backend
}