From d250a3fc161d9710354c432206980e9cb95f25f1 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 11 Apr 2023 12:11:01 +0800 Subject: [PATCH] lightning: refactor to reuse in load data part 5 (#42856) ref pingcap/tidb#40499 --- Makefile | 3 +- br/pkg/lightning/backend/BUILD.bazel | 1 - br/pkg/lightning/backend/backend.go | 229 ++++-------------- br/pkg/lightning/backend/backend_test.go | 130 +++------- br/pkg/lightning/backend/local/BUILD.bazel | 3 + br/pkg/lightning/backend/local/disk_quota.go | 59 +++++ .../backend/local/disk_quota_test.go | 99 ++++++++ br/pkg/lightning/backend/local/engine.go | 2 +- br/pkg/lightning/backend/local/local.go | 109 +++++---- br/pkg/lightning/backend/local/local_test.go | 16 +- br/pkg/lightning/backend/local/localhelper.go | 14 +- .../backend/local/localhelper_test.go | 6 +- br/pkg/lightning/backend/noop/BUILD.bazel | 16 -- br/pkg/lightning/backend/noop/noop.go | 198 --------------- br/pkg/lightning/backend/tidb/tidb.go | 26 +- br/pkg/lightning/backend/tidb/tidb_test.go | 50 ++-- br/pkg/lightning/importer/BUILD.bazel | 1 - br/pkg/lightning/importer/chunk_process.go | 10 +- .../lightning/importer/chunk_process_test.go | 25 +- br/pkg/lightning/importer/import.go | 11 +- .../lightning/importer/restore_schema_test.go | 5 +- br/pkg/lightning/importer/table_import.go | 15 +- .../lightning/importer/table_import_test.go | 43 +++- br/pkg/mock/backend.go | 77 +++--- br/pkg/mock/mocklocal/BUILD.bazel | 12 + br/pkg/mock/mocklocal/local.go | 49 ++++ ddl/ingest/backend.go | 5 +- ddl/ingest/backend_mgr.go | 15 +- ddl/ingest/engine.go | 8 +- ddl/ingest/engine_mgr.go | 4 +- disttask/loaddata/proto.go | 2 +- executor/importer/chunk_process.go | 8 +- executor/importer/engine_process.go | 7 +- executor/importer/table_import.go | 7 +- 34 files changed, 567 insertions(+), 698 deletions(-) create mode 100644 br/pkg/lightning/backend/local/disk_quota.go create mode 100644 br/pkg/lightning/backend/local/disk_quota_test.go delete mode 100644 br/pkg/lightning/backend/noop/BUILD.bazel delete mode 100644 br/pkg/lightning/backend/noop/noop.go create mode 100644 br/pkg/mock/mocklocal/BUILD.bazel create mode 100644 br/pkg/mock/mocklocal/local.go diff --git a/Makefile b/Makefile index 48bedf5a4d59e..1ad3be982367b 100644 --- a/Makefile +++ b/Makefile @@ -334,8 +334,9 @@ mock_s3iface: @mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go mock_lightning: - @mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter,TargetInfoGetter > br/pkg/mock/backend.go + @mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go @mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go + @mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage > br/pkg/mock/mocklocal/local.go # There is no FreeBSD environment for GitHub actions. So cross-compile on Linux # but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have diff --git a/br/pkg/lightning/backend/BUILD.bazel b/br/pkg/lightning/backend/BUILD.bazel index e98f2f9b604db..f595301d8b96c 100644 --- a/br/pkg/lightning/backend/BUILD.bazel +++ b/br/pkg/lightning/backend/BUILD.bazel @@ -16,7 +16,6 @@ go_library( "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", - "@org_golang_x_exp//slices", "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index 6763886d28e2e..2e2256715adc4 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -30,39 +30,12 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/parser/model" "go.uber.org/zap" - "golang.org/x/exp/slices" ) const ( importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times. ) -/* - -Usual workflow: - -1. Create a `Backend` for the whole process. - -2. For each table, - - i. Split into multiple "batches" consisting of data files with roughly equal total size. - - ii. For each batch, - - a. Create an `OpenedEngine` via `backend.OpenEngine()` - - b. For each chunk, deliver data into the engine via `engine.WriteRows()` - - c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()` - - d. Import data via `engine.Import()` - - e. Cleanup via `engine.Cleanup()` - -3. Close the connection via `backend.Close()` - -*/ - func makeTag(tableName string, engineID int32) string { return fmt.Sprintf("%s:%d", tableName, engineID) } @@ -99,7 +72,10 @@ type EngineFileSize struct { // LocalWriterConfig defines the configuration to open a LocalWriter type LocalWriterConfig struct { // is the chunk KV written to this LocalWriter sent in order + // only needed for local backend, can omit for tidb backend IsKVSorted bool + // only needed for tidb backend, can omit for local backend + TableName string } // EngineConfig defines configuration used for open engine @@ -145,10 +121,21 @@ type TargetInfoGetter interface { CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error } -// AbstractBackend is the abstract interface behind Backend. +// Backend defines the interface for a backend. // Implementations of this interface must be goroutine safe: you can share an // instance and execute any method anywhere. -type AbstractBackend interface { +// Usual workflow: +// 1. Create a `Backend` for the whole process. +// 2. For each table, +// i. Split into multiple "batches" consisting of data files with roughly equal total size. +// ii. For each batch, +// a. Create an `OpenedEngine` via `backend.OpenEngine()` +// b. For each chunk, deliver data into the engine via `engine.WriteRows()` +// c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()` +// d. Import data via `engine.Import()` +// e. Cleanup via `engine.Cleanup()` +// 3. Close the connection via `backend.Close()` +type Backend interface { // Close the connection to the backend. Close() @@ -183,28 +170,22 @@ type AbstractBackend interface { // (e.g. preparing to resolve a disk quota violation). FlushAllEngines(ctx context.Context) error - // EngineFileSizes obtains the size occupied locally of all engines managed - // by this backend. This method is used to compute disk quota. - // It can return nil if the content are all stored remotely. - EngineFileSizes() []EngineFileSize - // ResetEngine clears all written KV pairs in this opened engine. ResetEngine(ctx context.Context, engineUUID uuid.UUID) error // LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine. LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error) - - // TotalMemoryConsume counts total memory usage. This is only used for local backend. - TotalMemoryConsume() int64 } -// Backend is the delivery target for Lightning -type Backend struct { - abstract AbstractBackend +// EngineManager is the manager of engines. +// this is a wrapper of Backend, which provides some common methods for managing engines. +// and it has no states, can be created on demand +type EngineManager struct { + backend Backend } type engine struct { - backend AbstractBackend + backend Backend logger log.Logger uuid uuid.UUID // id of the engine, used to generate uuid and stored in checkpoint @@ -221,106 +202,17 @@ type OpenedEngine struct { config *EngineConfig } -// // import_ the data written to the engine into the target. -// import_(ctx context.Context) error - -// // cleanup deletes the imported data. -// cleanup(ctx context.Context) error - -// ClosedEngine represents a closed engine, allowing ingestion into the target. -// This type is goroutine safe: you can share an instance and execute any method -// anywhere. -type ClosedEngine struct { - engine -} - -// LocalEngineWriter is a thread-local writer for writing rows into a single engine. -type LocalEngineWriter struct { - writer EngineWriter - tableName string -} - -// MakeBackend creates a new Backend from an AbstractBackend. -func MakeBackend(ab AbstractBackend) Backend { - return Backend{abstract: ab} -} - -// Close the connection to the backend. -func (be Backend) Close() { - be.abstract.Close() -} - -// ShouldPostProcess returns whether KV-specific post-processing should be -func (be Backend) ShouldPostProcess() bool { - return be.abstract.ShouldPostProcess() -} - -// FlushAll flushes all opened engines. -func (be Backend) FlushAll(ctx context.Context) error { - return be.abstract.FlushAllEngines(ctx) -} - -// TotalMemoryConsume returns the total memory consumed by the backend. -func (be Backend) TotalMemoryConsume() int64 { - return be.abstract.TotalMemoryConsume() -} - -// CheckDiskQuota verifies if the total engine file size is below the given -// quota. If the quota is exceeded, this method returns an array of engines, -// which after importing can decrease the total size below quota. -func (be Backend) CheckDiskQuota(quota int64) ( - largeEngines []uuid.UUID, - inProgressLargeEngines int, - totalDiskSize int64, - totalMemSize int64, -) { - sizes := be.abstract.EngineFileSizes() - slices.SortFunc(sizes, func(i, j EngineFileSize) bool { - if i.IsImporting != j.IsImporting { - return i.IsImporting - } - return i.DiskSize+i.MemSize < j.DiskSize+j.MemSize - }) - for _, size := range sizes { - totalDiskSize += size.DiskSize - totalMemSize += size.MemSize - if totalDiskSize+totalMemSize > quota { - if size.IsImporting { - inProgressLargeEngines++ - } else { - largeEngines = append(largeEngines, size.UUID) - } - } - } - return -} - -// UnsafeImportAndReset forces the backend to import the content of an engine -// into the target and then reset the engine to empty. This method will not -// close the engine. Make sure the engine is flushed manually before calling -// this method. -func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { - // DO NOT call be.abstract.CloseEngine()! The engine should still be writable after - // calling UnsafeImportAndReset(). - closedEngine := ClosedEngine{ - engine: engine{ - backend: be.abstract, - logger: makeLogger(log.FromContext(ctx), "", engineUUID), - uuid: engineUUID, - }, - } - if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil { - return err - } - return be.abstract.ResetEngine(ctx, engineUUID) +// MakeEngineManager creates a new Backend from an Backend. +func MakeEngineManager(ab Backend) EngineManager { + return EngineManager{backend: ab} } // OpenEngine opens an engine with the given table name and engine ID. -func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) { +func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) { tag, engineUUID := MakeUUID(tableName, engineID) logger := makeLogger(log.FromContext(ctx), tag, engineUUID) - if err := be.abstract.OpenEngine(ctx, config, engineUUID); err != nil { + if err := be.backend.OpenEngine(ctx, config, engineUUID); err != nil { return nil, err } @@ -346,7 +238,7 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam return &OpenedEngine{ engine: engine{ - backend: be.abstract, + backend: be.backend, logger: logger, uuid: engineUUID, id: engineID, @@ -356,11 +248,6 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam }, nil } -// Inner returns the underlying abstract backend. -func (be Backend) Inner() AbstractBackend { - return be.abstract -} - // Close the opened engine to prepare it for importing. func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) { closedEngine, err := engine.unsafeClose(ctx, engine.config) @@ -378,32 +265,8 @@ func (engine *OpenedEngine) Flush(ctx context.Context) error { } // LocalWriter returns a writer that writes to the local backend. -func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (*LocalEngineWriter, error) { - w, err := engine.backend.LocalWriter(ctx, cfg, engine.uuid) - if err != nil { - return nil, err - } - return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil -} - -// TotalMemoryConsume returns the total memory consumed by the engine. -func (engine *OpenedEngine) TotalMemoryConsume() int64 { - return engine.engine.backend.TotalMemoryConsume() -} - -// WriteRows writes a collection of encoded rows into the engine. -func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows encode.Rows) error { - return w.writer.AppendRows(ctx, w.tableName, columnNames, rows) -} - -// Close closes the engine and returns the status of the engine. -func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) { - return w.writer.Close(ctx) -} - -// IsSynced returns whether the engine is synced. -func (w *LocalEngineWriter) IsSynced() bool { - return w.writer.IsSynced() +func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (EngineWriter, error) { + return engine.backend.LocalWriter(ctx, cfg, engine.uuid) } // UnsafeCloseEngine closes the engine without first opening it. @@ -411,7 +274,7 @@ func (w *LocalEngineWriter) IsSynced() bool { // (Open -> Write -> Close -> Import). This method should only be used when one // knows via other ways that the engine has already been opened, e.g. when // resuming from a checkpoint. -func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) { +func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) { tag, engineUUID := MakeUUID(tableName, engineID) return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID) } @@ -421,9 +284,9 @@ func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tabl // (Open -> Write -> Close -> Import). This method should only be used when one // knows via other ways that the engine has already been opened, e.g. when // resuming from a checkpoint. -func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error) { +func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error) { return engine{ - backend: be.abstract, + backend: be.backend, logger: makeLogger(log.FromContext(ctx), tag, engineUUID), uuid: engineUUID, id: id, @@ -445,6 +308,25 @@ func (en engine) GetID() int32 { return en.id } +// ClosedEngine represents a closed engine, allowing ingestion into the target. +// This type is goroutine safe: you can share an instance and execute any method +// anywhere. +type ClosedEngine struct { + engine +} + +// NewClosedEngine creates a new ClosedEngine. +func NewClosedEngine(backend Backend, logger log.Logger, uuid uuid.UUID, id int32) *ClosedEngine { + return &ClosedEngine{ + engine: engine{ + backend: backend, + logger: logger, + uuid: uuid, + id: id, + }, + } +} + // Import the data written to the engine into the target. func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error { var err error @@ -483,12 +365,7 @@ type ChunkFlushStatus interface { // EngineWriter is the interface for writing data to an engine. type EngineWriter interface { - AppendRows( - ctx context.Context, - tableName string, - columnNames []string, - rows encode.Rows, - ) error + AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error IsSynced() bool Close(ctx context.Context) (ChunkFlushStatus, error) } diff --git a/br/pkg/lightning/backend/backend_test.go b/br/pkg/lightning/backend/backend_test.go index 4b96d1fe6f0ae..c4c62166ab377 100644 --- a/br/pkg/lightning/backend/backend_test.go +++ b/br/pkg/lightning/backend/backend_test.go @@ -22,6 +22,7 @@ type backendSuite struct { controller *gomock.Controller mockBackend *mock.MockBackend encBuilder *mock.MockEncodingBuilder + engineMgr backend.EngineManager backend backend.Backend ts uint64 } @@ -32,7 +33,8 @@ func createBackendSuite(c gomock.TestReporter) *backendSuite { return &backendSuite{ controller: controller, mockBackend: mockBackend, - backend: backend.MakeBackend(mockBackend), + engineMgr: backend.MakeEngineManager(mockBackend), + backend: mockBackend, encBuilder: mock.NewMockEncodingBuilder(controller), ts: oracle.ComposeTS(time.Now().Unix()*1000, 0), } @@ -64,7 +66,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) { Return(nil). After(importCall) - engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) + engine, err := s.engineMgr.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) require.NoError(t, err) closedEngine, err := engine.Close(ctx) require.NoError(t, err) @@ -89,7 +91,7 @@ func TestUnsafeCloseEngine(t *testing.T) { Return(nil). After(closeCall) - closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", -1) + closedEngine, err := s.engineMgr.UnsafeCloseEngine(ctx, nil, "`db`.`table`", -1) require.NoError(t, err) err = closedEngine.Cleanup(ctx) require.NoError(t, err) @@ -110,7 +112,7 @@ func TestUnsafeCloseEngineWithUUID(t *testing.T) { Return(nil). After(closeCall) - closedEngine, err := s.backend.UnsafeCloseEngineWithUUID(ctx, nil, "some_tag", engineUUID, 0) + closedEngine, err := s.engineMgr.UnsafeCloseEngineWithUUID(ctx, nil, "some_tag", engineUUID, 0) require.NoError(t, err) err = closedEngine.Cleanup(ctx) require.NoError(t, err) @@ -135,20 +137,20 @@ func TestWriteEngine(t *testing.T) { s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()). Return(mockWriter, nil).AnyTimes() mockWriter.EXPECT(). - AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, rows1). + AppendRows(ctx, []string{"c1", "c2"}, rows1). Return(nil) mockWriter.EXPECT().Close(ctx).Return(nil, nil).AnyTimes() mockWriter.EXPECT(). - AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, rows2). + AppendRows(ctx, []string{"c1", "c2"}, rows2). Return(nil) - engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) + engine, err := s.engineMgr.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) require.NoError(t, err) - writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{}) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`db`.`table`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows1) + err = writer.AppendRows(ctx, []string{"c1", "c2"}, rows1) require.NoError(t, err) - err = writer.WriteRows(ctx, []string{"c1", "c2"}, rows2) + err = writer.AppendRows(ctx, []string{"c1", "c2"}, rows2) require.NoError(t, err) _, err = writer.Close(ctx) require.NoError(t, err) @@ -163,15 +165,15 @@ func TestWriteToEngineWithNothing(t *testing.T) { mockWriter := mock.NewMockEngineWriter(s.controller) s.mockBackend.EXPECT().OpenEngine(ctx, &backend.EngineConfig{}, gomock.Any()).Return(nil) - mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), emptyRows).Return(nil) + mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), emptyRows).Return(nil) mockWriter.EXPECT().Close(ctx).Return(nil, nil) - s.mockBackend.EXPECT().LocalWriter(ctx, &backend.LocalWriterConfig{}, gomock.Any()).Return(mockWriter, nil) + s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil) - engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) + engine, err := s.engineMgr.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) require.NoError(t, err) - writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{}) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`db`.`table`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, nil, emptyRows) + err = writer.AppendRows(ctx, nil, emptyRows) require.NoError(t, err) _, err = writer.Close(ctx) require.NoError(t, err) @@ -186,7 +188,7 @@ func TestOpenEngineFailed(t *testing.T) { s.mockBackend.EXPECT().OpenEngine(ctx, &backend.EngineConfig{}, gomock.Any()). Return(errors.New("fake unrecoverable open error")) - _, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) + _, err := s.engineMgr.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) require.EqualError(t, err, "fake unrecoverable open error") } @@ -202,15 +204,15 @@ func TestWriteEngineFailed(t *testing.T) { s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes() mockWriter.EXPECT(). - AppendRows(ctx, gomock.Any(), gomock.Any(), rows). + AppendRows(ctx, gomock.Any(), rows). Return(errors.Annotate(context.Canceled, "fake unrecoverable write error")) mockWriter.EXPECT().Close(ctx).Return(nil, nil) - engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) + engine, err := s.engineMgr.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) require.NoError(t, err) - writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{}) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`db`.`table`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, nil, rows) + err = writer.AppendRows(ctx, nil, rows) require.Error(t, err) require.Regexp(t, "^fake unrecoverable write error", err.Error()) _, err = writer.Close(ctx) @@ -228,16 +230,16 @@ func TestWriteBatchSendFailedWithRetry(t *testing.T) { mockWriter := mock.NewMockEngineWriter(s.controller) s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes() - mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), rows). + mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), rows). Return(errors.New("fake recoverable write batch error")). MinTimes(1) mockWriter.EXPECT().Close(ctx).Return(nil, nil).MinTimes(1) - engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) + engine, err := s.engineMgr.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1) require.NoError(t, err) - writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{}) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`db`.`table`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, nil, rows) + err = writer.AppendRows(ctx, nil, rows) require.Error(t, err) require.Regexp(t, "fake recoverable write batch error$", err.Error()) _, err = writer.Close(ctx) @@ -255,7 +257,7 @@ func TestImportFailedNoRetry(t *testing.T) { ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()). Return(errors.Annotate(context.Canceled, "fake unrecoverable import error")) - closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1) + closedEngine, err := s.engineMgr.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1) require.NoError(t, err) err = closedEngine.Import(ctx, 1, 1) require.Error(t, err) @@ -275,7 +277,7 @@ func TestImportFailedWithRetry(t *testing.T) { MinTimes(2) s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes() - closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1) + closedEngine, err := s.engineMgr.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1) require.NoError(t, err) err = closedEngine.Import(ctx, 1, 1) require.Error(t, err) @@ -297,7 +299,7 @@ func TestImportFailedRecovered(t *testing.T) { Return(nil) s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes() - closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1) + closedEngine, err := s.engineMgr.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1) require.NoError(t, err) err = closedEngine.Import(ctx, 1, 1) require.NoError(t, err) @@ -336,77 +338,3 @@ func TestNewEncoder(t *testing.T) { require.Equal(t, realEncoder, encoder) require.NoError(t, err) } - -func TestCheckDiskQuota(t *testing.T) { - s := createBackendSuite(t) - defer s.tearDownTest() - - uuid1 := uuid.MustParse("11111111-1111-1111-1111-111111111111") - uuid3 := uuid.MustParse("33333333-3333-3333-3333-333333333333") - uuid5 := uuid.MustParse("55555555-5555-5555-5555-555555555555") - uuid7 := uuid.MustParse("77777777-7777-7777-7777-777777777777") - uuid9 := uuid.MustParse("99999999-9999-9999-9999-999999999999") - - fileSizes := []backend.EngineFileSize{ - { - UUID: uuid1, - DiskSize: 1000, - MemSize: 0, - IsImporting: false, - }, - { - UUID: uuid3, - DiskSize: 2000, - MemSize: 1000, - IsImporting: true, - }, - { - UUID: uuid5, - DiskSize: 1500, - MemSize: 3500, - IsImporting: false, - }, - { - UUID: uuid7, - DiskSize: 0, - MemSize: 7000, - IsImporting: true, - }, - { - UUID: uuid9, - DiskSize: 4500, - MemSize: 4500, - IsImporting: false, - }, - } - - s.mockBackend.EXPECT().EngineFileSizes().Return(fileSizes).Times(4) - - // No quota exceeded - le, iple, ds, ms := s.backend.CheckDiskQuota(30000) - require.Len(t, le, 0) - require.Equal(t, 0, iple) - require.Equal(t, int64(9000), ds) - require.Equal(t, int64(16000), ms) - - // Quota exceeded, the largest one is out - le, iple, ds, ms = s.backend.CheckDiskQuota(20000) - require.Equal(t, []uuid.UUID{uuid9}, le) - require.Equal(t, 0, iple) - require.Equal(t, int64(9000), ds) - require.Equal(t, int64(16000), ms) - - // Quota exceeded, the importing one should be ranked least priority - le, iple, ds, ms = s.backend.CheckDiskQuota(12000) - require.Equal(t, []uuid.UUID{uuid5, uuid9}, le) - require.Equal(t, 0, iple) - require.Equal(t, int64(9000), ds) - require.Equal(t, int64(16000), ms) - - // Quota exceeded, the importing ones should not be visible - le, iple, ds, ms = s.backend.CheckDiskQuota(5000) - require.Equal(t, []uuid.UUID{uuid1, uuid5, uuid9}, le) - require.Equal(t, 1, iple) - require.Equal(t, int64(9000), ds) - require.Equal(t, int64(16000), ms) -} diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index 301be25a9ff36..feb72837a6b29 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "local", srcs = [ "compress.go", + "disk_quota.go", "duplicate.go", "engine.go", "iterator.go", @@ -91,6 +92,7 @@ go_test( timeout = "short", srcs = [ "compress_test.go", + "disk_quota_test.go", "duplicate_test.go", "engine_test.go", "iterator_test.go", @@ -113,6 +115,7 @@ go_test( "//br/pkg/lightning/mydump", "//br/pkg/membuf", "//br/pkg/mock", + "//br/pkg/mock/mocklocal", "//br/pkg/pdutil", "//br/pkg/restore/split", "//br/pkg/utils", diff --git a/br/pkg/lightning/backend/local/disk_quota.go b/br/pkg/lightning/backend/local/disk_quota.go new file mode 100644 index 0000000000000..9f224665b7661 --- /dev/null +++ b/br/pkg/lightning/backend/local/disk_quota.go @@ -0,0 +1,59 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "github.com/google/uuid" + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "golang.org/x/exp/slices" +) + +// DiskUsage is an interface to obtain the size occupied locally of all engines +type DiskUsage interface { + // EngineFileSizes obtains the size occupied locally of all engines managed + // by this backend. This method is used to compute disk quota. + // It can return nil if the content are all stored remotely. + EngineFileSizes() (res []backend.EngineFileSize) +} + +// CheckDiskQuota verifies if the total engine file size is below the given +// quota. If the quota is exceeded, this method returns an array of engines, +// which after importing can decrease the total size below quota. +func CheckDiskQuota(mgr DiskUsage, quota int64) ( + largeEngines []uuid.UUID, + inProgressLargeEngines int, + totalDiskSize int64, + totalMemSize int64, +) { + sizes := mgr.EngineFileSizes() + slices.SortFunc(sizes, func(i, j backend.EngineFileSize) bool { + if i.IsImporting != j.IsImporting { + return i.IsImporting + } + return i.DiskSize+i.MemSize < j.DiskSize+j.MemSize + }) + for _, size := range sizes { + totalDiskSize += size.DiskSize + totalMemSize += size.MemSize + if totalDiskSize+totalMemSize > quota { + if size.IsImporting { + inProgressLargeEngines++ + } else { + largeEngines = append(largeEngines, size.UUID) + } + } + } + return +} diff --git a/br/pkg/lightning/backend/local/disk_quota_test.go b/br/pkg/lightning/backend/local/disk_quota_test.go new file mode 100644 index 0000000000000..00f9710dab8bf --- /dev/null +++ b/br/pkg/lightning/backend/local/disk_quota_test.go @@ -0,0 +1,99 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "testing" + + "github.com/golang/mock/gomock" + "github.com/google/uuid" + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/mock/mocklocal" + "github.com/stretchr/testify/require" +) + +func TestCheckDiskQuota(t *testing.T) { + controller := gomock.NewController(t) + mockDiskUsage := mocklocal.NewMockDiskUsage(controller) + + uuid1 := uuid.MustParse("11111111-1111-1111-1111-111111111111") + uuid3 := uuid.MustParse("33333333-3333-3333-3333-333333333333") + uuid5 := uuid.MustParse("55555555-5555-5555-5555-555555555555") + uuid7 := uuid.MustParse("77777777-7777-7777-7777-777777777777") + uuid9 := uuid.MustParse("99999999-9999-9999-9999-999999999999") + + fileSizes := []backend.EngineFileSize{ + { + UUID: uuid1, + DiskSize: 1000, + MemSize: 0, + IsImporting: false, + }, + { + UUID: uuid3, + DiskSize: 2000, + MemSize: 1000, + IsImporting: true, + }, + { + UUID: uuid5, + DiskSize: 1500, + MemSize: 3500, + IsImporting: false, + }, + { + UUID: uuid7, + DiskSize: 0, + MemSize: 7000, + IsImporting: true, + }, + { + UUID: uuid9, + DiskSize: 4500, + MemSize: 4500, + IsImporting: false, + }, + } + + mockDiskUsage.EXPECT().EngineFileSizes().Return(fileSizes).Times(4) + + // No quota exceeded + le, iple, ds, ms := CheckDiskQuota(mockDiskUsage, 30000) + require.Len(t, le, 0) + require.Equal(t, 0, iple) + require.Equal(t, int64(9000), ds) + require.Equal(t, int64(16000), ms) + + // Quota exceeded, the largest one is out + le, iple, ds, ms = CheckDiskQuota(mockDiskUsage, 20000) + require.Equal(t, []uuid.UUID{uuid9}, le) + require.Equal(t, 0, iple) + require.Equal(t, int64(9000), ds) + require.Equal(t, int64(16000), ms) + + // Quota exceeded, the importing one should be ranked least priority + le, iple, ds, ms = CheckDiskQuota(mockDiskUsage, 12000) + require.Equal(t, []uuid.UUID{uuid5, uuid9}, le) + require.Equal(t, 0, iple) + require.Equal(t, int64(9000), ds) + require.Equal(t, int64(16000), ms) + + // Quota exceeded, the importing ones should not be visible + le, iple, ds, ms = CheckDiskQuota(mockDiskUsage, 5000) + require.Equal(t, []uuid.UUID{uuid1, uuid5, uuid9}, le) + require.Equal(t, 1, iple) + require.Equal(t, int64(9000), ds) + require.Equal(t, int64(16000), ms) +} diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index beaa45b866d95..57958d74d64a2 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -1146,7 +1146,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er } // AppendRows appends rows to the SST file. -func (w *Writer) AppendRows(ctx context.Context, _ string, columnNames []string, rows encode.Rows) error { +func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error { kvs := kv.Rows2KvPairs(rows) if len(kvs) == 0 { return nil diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 60c1ddf1365e5..3550473fd3737 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -440,8 +440,8 @@ func (c *BackendConfig) adjust() { c.MaxOpenFiles = mathutil.Max(c.MaxOpenFiles, openFilesLowerThreshold) } -// Local is a local backend. -type Local struct { +// Backend is a local backend. +type Backend struct { engines sync.Map // sync version of map[uuid.UUID]*Engine pdCtl *pdutil.PdController @@ -464,6 +464,9 @@ type Local struct { logger log.Logger } +var _ DiskUsage = (*Backend)(nil) +var _ backend.Backend = (*Backend)(nil) + func openDuplicateDB(storeDir string) (*pebble.DB, error) { dbPath := filepath.Join(storeDir, duplicateDBName) // TODO: Optimize the opts for better write. @@ -482,17 +485,17 @@ var ( LastAlloc manual.Allocator ) -// NewLocalBackend creates new connections to tikv. -func NewLocalBackend( +// NewBackend creates new connections to tikv. +func NewBackend( ctx context.Context, tls *common.TLS, config BackendConfig, regionSizeGetter TableRegionSizeGetter, -) (backend.Backend, error) { +) (*Backend, error) { config.adjust() pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption()) if err != nil { - return backend.MakeBackend(nil), common.NormalizeOrWrapErr(common.ErrCreatePDClient, err) + return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err) } splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false) @@ -500,7 +503,7 @@ func NewLocalBackend( if config.CheckpointEnabled { if info, err := os.Stat(config.LocalStoreDir); err != nil { if !os.IsNotExist(err) { - return backend.MakeBackend(nil), err + return nil, err } } else if info.IsDir() { shouldCreate = false @@ -510,7 +513,7 @@ func NewLocalBackend( if shouldCreate { err = os.Mkdir(config.LocalStoreDir, 0o700) if err != nil { - return backend.MakeBackend(nil), common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir) + return nil, common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir) } } @@ -518,14 +521,14 @@ func NewLocalBackend( if config.DupeDetectEnabled { duplicateDB, err = openDuplicateDB(config.LocalStoreDir) if err != nil { - return backend.MakeBackend(nil), common.ErrOpenDuplicateDB.Wrap(err).GenWithStackByArgs() + return nil, common.ErrOpenDuplicateDB.Wrap(err).GenWithStackByArgs() } } // The following copies tikv.NewTxnClient without creating yet another pdClient. spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig()) if err != nil { - return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() + return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() } var pdCliForTiKV *tikvclient.CodecPDClient @@ -534,7 +537,7 @@ func NewLocalBackend( } else { pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), config.KeyspaceName) if err != nil { - return backend.MakeBackend(nil), common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs() + return nil, common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs() } } @@ -542,7 +545,7 @@ func NewLocalBackend( rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec)) tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli) if err != nil { - return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() + return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() } importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType) keyAdapter := KeyAdapter(noopKeyAdapter{}) @@ -560,7 +563,7 @@ func NewLocalBackend( alloc.RefCnt = new(atomic.Int64) LastAlloc = alloc } - local := &Local{ + local := &Backend{ engines: sync.Map{}, pdCtl: pdCtl, splitCli: splitCli, @@ -582,14 +585,14 @@ func NewLocalBackend( local.metrics = m } if err = local.checkMultiIngestSupport(ctx); err != nil { - return backend.MakeBackend(nil), common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs() + return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs() } - return backend.MakeBackend(local), nil + return local, nil } // TotalMemoryConsume returns the total memory usage of the local backend. -func (local *Local) TotalMemoryConsume() int64 { +func (local *Backend) TotalMemoryConsume() int64 { var memConsume int64 local.engines.Range(func(k, v interface{}) bool { e := v.(*Engine) @@ -601,7 +604,7 @@ func (local *Local) TotalMemoryConsume() int64 { return memConsume + local.bufferPool.TotalSize() } -func (local *Local) checkMultiIngestSupport(ctx context.Context) error { +func (local *Backend) checkMultiIngestSupport(ctx context.Context) error { stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone()) if err != nil { return errors.Trace(err) @@ -667,7 +670,7 @@ func (local *Local) checkMultiIngestSupport(ctx context.Context) error { } // rlock read locks a local file and returns the Engine instance if it exists. -func (local *Local) rLockEngine(engineID uuid.UUID) *Engine { +func (local *Backend) rLockEngine(engineID uuid.UUID) *Engine { if e, ok := local.engines.Load(engineID); ok { engine := e.(*Engine) engine.rLock() @@ -677,7 +680,7 @@ func (local *Local) rLockEngine(engineID uuid.UUID) *Engine { } // lock locks a local file and returns the Engine instance if it exists. -func (local *Local) lockEngine(engineID uuid.UUID, state importMutexState) *Engine { +func (local *Backend) lockEngine(engineID uuid.UUID, state importMutexState) *Engine { if e, ok := local.engines.Load(engineID); ok { engine := e.(*Engine) engine.lock(state) @@ -687,7 +690,7 @@ func (local *Local) lockEngine(engineID uuid.UUID, state importMutexState) *Engi } // tryRLockAllEngines tries to read lock all engines, return all `Engine`s that are successfully locked. -func (local *Local) tryRLockAllEngines() []*Engine { +func (local *Backend) tryRLockAllEngines() []*Engine { var allEngines []*Engine local.engines.Range(func(k, v interface{}) bool { engine := v.(*Engine) @@ -706,7 +709,7 @@ func (local *Local) tryRLockAllEngines() []*Engine { // lockAllEnginesUnless tries to lock all engines, unless those which are already locked in the // state given by ignoreStateMask. Returns the list of locked engines. -func (local *Local) lockAllEnginesUnless(newState, ignoreStateMask importMutexState) []*Engine { +func (local *Backend) lockAllEnginesUnless(newState, ignoreStateMask importMutexState) []*Engine { var allEngines []*Engine local.engines.Range(func(k, v interface{}) bool { engine := v.(*Engine) @@ -719,7 +722,7 @@ func (local *Local) lockAllEnginesUnless(newState, ignoreStateMask importMutexSt } // Close the local backend. -func (local *Local) Close() { +func (local *Backend) Close() { allEngines := local.lockAllEnginesUnless(importMutexStateClose, 0) local.engines = sync.Map{} @@ -771,7 +774,7 @@ func (local *Local) Close() { } // FlushEngine ensure the written data is saved successfully, to make sure no data lose after restart -func (local *Local) FlushEngine(ctx context.Context, engineID uuid.UUID) error { +func (local *Backend) FlushEngine(ctx context.Context, engineID uuid.UUID) error { engine := local.rLockEngine(engineID) // the engine cannot be deleted after while we've acquired the lock identified by UUID. @@ -786,7 +789,7 @@ func (local *Local) FlushEngine(ctx context.Context, engineID uuid.UUID) error { } // FlushAllEngines flush all engines. -func (local *Local) FlushAllEngines(parentCtx context.Context) (err error) { +func (local *Backend) FlushAllEngines(parentCtx context.Context) (err error) { allEngines := local.tryRLockAllEngines() defer func() { for _, engine := range allEngines { @@ -805,16 +808,16 @@ func (local *Local) FlushAllEngines(parentCtx context.Context) (err error) { } // RetryImportDelay returns the delay time before retrying to import a file. -func (*Local) RetryImportDelay() time.Duration { +func (*Backend) RetryImportDelay() time.Duration { return defaultRetryBackoffTime } // ShouldPostProcess returns true if the backend should post process the data. -func (*Local) ShouldPostProcess() bool { +func (*Backend) ShouldPostProcess() bool { return true } -func (local *Local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.DB, error) { +func (local *Backend) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.DB, error) { opt := &pebble.Options{ MemTableSize: local.MemTableSize, // the default threshold value may cause write stall. @@ -844,7 +847,7 @@ func (local *Local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.D } // OpenEngine must be called with holding mutex of Engine. -func (local *Local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { +func (local *Backend) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { db, err := local.openEngineDB(engineUUID, false) if err != nil { return err @@ -889,7 +892,7 @@ func (local *Local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e return nil } -func (local *Local) allocateTSIfNotExists(ctx context.Context, engine *Engine) error { +func (local *Backend) allocateTSIfNotExists(ctx context.Context, engine *Engine) error { if engine.TS > 0 { return nil } @@ -903,7 +906,7 @@ func (local *Local) allocateTSIfNotExists(ctx context.Context, engine *Engine) e } // CloseEngine closes backend engine by uuid. -func (local *Local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { +func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { // flush mem table to storage, to free memory, // ask others' advise, looks like unnecessary, but with this we can control memory precisely. engineI, ok := local.engines.Load(engineUUID) @@ -955,7 +958,7 @@ func (local *Local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, return engine.ingestErr.Get() } -func (local *Local) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) { +func (local *Backend) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) { return local.importClientFactory.Create(ctx, storeID) } @@ -994,7 +997,7 @@ func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit return ranges } -func (local *Local) readAndSplitIntoRange( +func (local *Backend) readAndSplitIntoRange( ctx context.Context, engine *Engine, sizeLimit int64, @@ -1037,7 +1040,7 @@ func (local *Local) readAndSplitIntoRange( // prepareAndGenerateUnfinishedJob will read the engine to get unfinished key range, // then split and scatter regions for these range and generate region jobs. -func (local *Local) prepareAndGenerateUnfinishedJob( +func (local *Backend) prepareAndGenerateUnfinishedJob( ctx context.Context, engineUUID uuid.UUID, lf *Engine, @@ -1084,7 +1087,7 @@ func (local *Local) prepareAndGenerateUnfinishedJob( } // generateJobInRanges scans the region in ranges and generate region jobs. -func (local *Local) generateJobInRanges( +func (local *Backend) generateJobInRanges( ctx context.Context, engine *Engine, jobRanges []Range, @@ -1167,7 +1170,7 @@ func (local *Local) generateJobInRanges( // stops. // this function must send the job back to jobOutCh after read it from jobInCh, // even if any error happens. -func (local *Local) startWorker( +func (local *Backend) startWorker( ctx context.Context, jobInCh, jobOutCh chan *regionJob, ) error { @@ -1202,7 +1205,7 @@ func (local *Local) startWorker( } } -func (*Local) isRetryableImportTiKVError(err error) bool { +func (*Backend) isRetryableImportTiKVError(err error) bool { err = errors.Cause(err) // io.EOF is not retryable in normal case // but on TiKV restart, if we're writing to TiKV(through GRPC) @@ -1220,7 +1223,7 @@ func (*Local) isRetryableImportTiKVError(err error) bool { // If non-retryable error occurs, it will return the error. // If retryable error occurs, it will return nil and caller should check the stage // of the regionJob to determine what to do with it. -func (local *Local) executeJob( +func (local *Backend) executeJob( ctx context.Context, job *regionJob, ) error { @@ -1290,7 +1293,7 @@ func (local *Local) executeJob( } // ImportEngine imports an engine to TiKV. -func (local *Local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { +func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { lf := local.lockEngine(engineUUID, importMutexStateImport) if lf == nil { // skip if engine not exist. See the comment of `CloseEngine` for more detail. @@ -1472,7 +1475,7 @@ func (local *Local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi } // ResetEngine reset the engine and reclaim the space. -func (local *Local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error { +func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error { // the only way to reset the engine + reclaim the space is to delete and reopen it 🤷 localEngine := local.lockEngine(engineUUID, importMutexStateClose) if localEngine == nil { @@ -1506,7 +1509,7 @@ func (local *Local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error } // CleanupEngine cleanup the engine and reclaim the space. -func (local *Local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error { +func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error { localEngine := local.lockEngine(engineUUID, importMutexStateClose) // release this engine after import success if localEngine == nil { @@ -1534,7 +1537,7 @@ func (local *Local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) err } // GetDupeController returns a new dupe controller. -func (local *Local) GetDupeController(dupeConcurrency int, errorMgr *errormanager.ErrorManager) *DupeController { +func (local *Backend) GetDupeController(dupeConcurrency int, errorMgr *errormanager.ErrorManager) *DupeController { return &DupeController{ splitCli: local.splitCli, tikvCli: local.tikvCli, @@ -1547,12 +1550,30 @@ func (local *Local) GetDupeController(dupeConcurrency int, errorMgr *errormanage } } +// UnsafeImportAndReset forces the backend to import the content of an engine +// into the target and then reset the engine to empty. This method will not +// close the engine. Make sure the engine is flushed manually before calling +// this method. +func (local *Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { + // DO NOT call be.abstract.CloseEngine()! The engine should still be writable after + // calling UnsafeImportAndReset(). + logger := log.FromContext(ctx).With( + zap.String("engineTag", ""), + zap.Stringer("engineUUID", engineUUID), + ) + closedEngine := backend.NewClosedEngine(local, logger, engineUUID, 0) + if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil { + return err + } + return local.ResetEngine(ctx, engineUUID) +} + func engineSSTDir(storeDir string, engineUUID uuid.UUID) string { return filepath.Join(storeDir, engineUUID.String()+".sst") } // LocalWriter returns a new local writer. -func (local *Local) LocalWriter(_ context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error) { +func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error) { e, ok := local.engines.Load(engineUUID) if !ok { return nil, errors.Errorf("could not find engine for %s", engineUUID.String()) @@ -1606,8 +1627,8 @@ func nextKey(key []byte) []byte { return res } -// EngineFileSizes returns the file size of each engine. -func (local *Local) EngineFileSizes() (res []backend.EngineFileSize) { +// EngineFileSizes implements DiskUsage interface. +func (local *Backend) EngineFileSizes() (res []backend.EngineFileSize) { local.engines.Range(func(k, v interface{}) bool { engine := v.(*Engine) res = append(res, engine.getEngineFileSize()) diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index a70fd8d3f1822..87923ad66ed33 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -385,11 +385,11 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) { rows2 = kvs[6000:12000] rows3 = kvs[12000:] } - err = w.AppendRows(ctx, "", []string{}, kv.MakeRowsFromKvPairs(rows1)) + err = w.AppendRows(ctx, []string{}, kv.MakeRowsFromKvPairs(rows1)) require.NoError(t, err) - err = w.AppendRows(ctx, "", []string{}, kv.MakeRowsFromKvPairs(rows2)) + err = w.AppendRows(ctx, []string{}, kv.MakeRowsFromKvPairs(rows2)) require.NoError(t, err) - err = w.AppendRows(ctx, "", []string{}, kv.MakeRowsFromKvPairs(rows3)) + err = w.AppendRows(ctx, []string{}, kv.MakeRowsFromKvPairs(rows3)) require.NoError(t, err) flushStatus, err := w.Close(context.Background()) require.NoError(t, err) @@ -1084,7 +1084,7 @@ func TestMultiIngest(t *testing.T) { pdCtl := &pdutil.PdController{} pdCtl.SetPDClient(&mockPdClient{stores: stores}) - local := &Local{ + local := &Backend{ pdCtl: pdCtl, importClientFactory: &mockImportClientFactory{ stores: allStores, @@ -1105,7 +1105,7 @@ func TestMultiIngest(t *testing.T) { } func TestLocalWriteAndIngestPairsFailFast(t *testing.T) { - bak := Local{} + bak := Backend{} require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return(true)")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")) @@ -1153,7 +1153,7 @@ func TestGetRegionSplitSizeKeys(t *testing.T) { } func TestLocalIsRetryableTiKVWriteError(t *testing.T) { - l := Local{} + l := Backend{} require.True(t, l.isRetryableImportTiKVError(io.EOF)) require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF))) } @@ -1175,7 +1175,7 @@ func TestCheckPeersBusy(t *testing.T) { }} createTimeStore12 := 0 - local := &Local{ + local := &Backend{ importClientFactory: &mockImportClientFactory{ stores: []*metapb.Store{ {Id: 11}, {Id: 12}, {Id: 13}, // region ["a", "b") @@ -1352,7 +1352,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) { getSizePropertiesFn = backup }) - local := &Local{ + local := &Backend{ splitCli: initTestSplitClient( [][]byte{{1}, {11}}, // we have one big region panicSplitRegionClient{}, // make sure no further split region diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index 187cb64fdd0a2..3545ed2c4c8d4 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -111,7 +111,7 @@ func (g *TableRegionSizeGetterImpl) GetTableRegionSize(ctx context.Context, tabl // SplitAndScatterRegionInBatches splits&scatter regions in batches. // Too many split&scatter requests may put a lot of pressure on TiKV and PD. -func (local *Local) SplitAndScatterRegionInBatches( +func (local *Backend) SplitAndScatterRegionInBatches( ctx context.Context, ranges []Range, tableInfo *checkpoints.TidbTableInfo, @@ -135,7 +135,7 @@ func (local *Local) SplitAndScatterRegionInBatches( // we can simply call br function, but we need to change some function signature of br // When the ranges total size is small, we can skip the split to avoid generate empty regions. // TODO: remove this file and use br internal functions -func (local *Local) SplitAndScatterRegionByRanges( +func (local *Backend) SplitAndScatterRegionByRanges( ctx context.Context, ranges []Range, tableInfo *checkpoints.TidbTableInfo, @@ -393,7 +393,7 @@ func (local *Local) SplitAndScatterRegionByRanges( } // BatchSplitRegions splits the region into multiple regions by given split keys. -func (local *Local) BatchSplitRegions( +func (local *Backend) BatchSplitRegions( ctx context.Context, region *split.RegionInfo, keys [][]byte, @@ -438,7 +438,7 @@ func (local *Local) BatchSplitRegions( return region, newRegions, nil } -func (local *Local) hasRegion(ctx context.Context, regionID uint64) (bool, error) { +func (local *Backend) hasRegion(ctx context.Context, regionID uint64) (bool, error) { regionInfo, err := local.splitCli.GetRegionByID(ctx, regionID) if err != nil { return false, err @@ -446,7 +446,7 @@ func (local *Local) hasRegion(ctx context.Context, regionID uint64) (bool, error return regionInfo != nil, nil } -func (local *Local) waitForSplit(ctx context.Context, regionID uint64) { +func (local *Backend) waitForSplit(ctx context.Context, regionID uint64) { for i := 0; i < split.SplitCheckMaxRetryTimes; i++ { ok, err := local.hasRegion(ctx, regionID) if err != nil { @@ -464,7 +464,7 @@ func (local *Local) waitForSplit(ctx context.Context, regionID uint64) { } } -func (local *Local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) { +func (local *Backend) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) { subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval) defer cancel() @@ -495,7 +495,7 @@ func (local *Local) waitForScatterRegions(ctx context.Context, regions []*split. return scatterCount, nil } -func (local *Local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { +func (local *Backend) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId()) if err != nil { return false, err diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index 328ee3e6a930c..5b061f127ca24 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -455,7 +455,7 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} client := initTestSplitClient(keys, hook) - local := &Local{ + local := &Backend{ splitCli: client, regionSizeGetter: &TableRegionSizeGetterImpl{}, logger: log.L(), @@ -629,7 +629,7 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) { keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")} client := initTestSplitClient(keys, nil) - local := &Local{ + local := &Backend{ splitCli: client, regionSizeGetter: &TableRegionSizeGetterImpl{}, logger: log.L(), @@ -716,7 +716,7 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) { } keys = append(keys, tableEndKey, []byte("")) client := initTestSplitClient(keys, hook) - local := &Local{ + local := &Backend{ splitCli: client, regionSizeGetter: &TableRegionSizeGetterImpl{}, logger: log.L(), diff --git a/br/pkg/lightning/backend/noop/BUILD.bazel b/br/pkg/lightning/backend/noop/BUILD.bazel deleted file mode 100644 index b7f46c4c1166d..0000000000000 --- a/br/pkg/lightning/backend/noop/BUILD.bazel +++ /dev/null @@ -1,16 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") - -go_library( - name = "noop", - srcs = ["noop.go"], - importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/noop", - visibility = ["//visibility:public"], - deps = [ - "//br/pkg/lightning/backend", - "//br/pkg/lightning/backend/encode", - "//br/pkg/lightning/verification", - "//parser/model", - "//types", - "@com_github_google_uuid//:uuid", - ], -) diff --git a/br/pkg/lightning/backend/noop/noop.go b/br/pkg/lightning/backend/noop/noop.go deleted file mode 100644 index a639d111b47eb..0000000000000 --- a/br/pkg/lightning/backend/noop/noop.go +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package noop - -import ( - "context" - "time" - - "github.com/google/uuid" - "github.com/pingcap/tidb/br/pkg/lightning/backend" - "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" - "github.com/pingcap/tidb/br/pkg/lightning/verification" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/types" -) - -// NewNoopBackend creates a new backend that does nothing. -func NewNoopBackend() backend.Backend { - return backend.MakeBackend(noopBackend{}) -} - -type noopBackend struct{} - -type noopRows struct{} - -// SplitIntoChunks implements the Rows interface. -func (r noopRows) SplitIntoChunks(int) []encode.Rows { - return []encode.Rows{r} -} - -// Clear returns a new collection with empty content. It may share the -// capacity with the current instance. The typical usage is `x = x.Clear()`. -func (r noopRows) Clear() encode.Rows { - return r -} - -// Close the connection to the backend. -func (noopBackend) Close() {} - -// MakeEmptyRows creates an empty collection of encoded rows. -func (noopBackend) MakeEmptyRows() encode.Rows { - return noopRows{} -} - -// RetryImportDelay returns the duration to sleep when retrying an import -func (noopBackend) RetryImportDelay() time.Duration { - return 0 -} - -// ShouldPostProcess returns whether KV-specific post-processing should be -// performed for this backend. Post-processing includes checksum and analyze. -func (noopBackend) ShouldPostProcess() bool { - return false -} - -// NewEncoder creates an encoder of a TiDB table. -func (noopBackend) NewEncoder(_ context.Context, _ *encode.EncodingConfig) (encode.Encoder, error) { - return noopEncoder{}, nil -} - -// OpenEngine creates a new engine file for the given table. -func (noopBackend) OpenEngine(context.Context, *backend.EngineConfig, uuid.UUID) error { - return nil -} - -// CloseEngine closes the engine file, flushing any remaining data. -func (noopBackend) CloseEngine(_ context.Context, _ *backend.EngineConfig, _ uuid.UUID) error { - return nil -} - -// ImportEngine imports a closed engine file. -func (noopBackend) ImportEngine(_ context.Context, _ uuid.UUID, _, _ int64) error { - return nil -} - -// CleanupEngine removes all data related to the engine. -func (noopBackend) CleanupEngine(_ context.Context, _ uuid.UUID) error { - return nil -} - -// CheckRequirements performs the check whether the backend satisfies the -// version requirements -func (noopBackend) CheckRequirements(context.Context, *backend.CheckCtx) error { - return nil -} - -// FetchRemoteTableModels obtains the models of all tables given the schema -// name. The returned table info does not need to be precise if the encoder, -// is not requiring them, but must at least fill in the following fields for -// TablesFromMeta to succeed: -// - Name -// - State (must be model.StatePublic) -// - ID -// - Columns -// - Name -// - State (must be model.StatePublic) -// - Offset (must be 0, 1, 2, ...) -// - PKIsHandle (true = do not generate _tidb_rowid) -func (noopBackend) FetchRemoteTableModels(_ context.Context, _ string) ([]*model.TableInfo, error) { - return nil, nil -} - -// FlushEngine ensures all KV pairs written to an open engine has been -// synchronized, such that kill-9'ing Lightning afterwards and resuming from -// checkpoint can recover the exact same content. -// -// This method is only relevant for local backend, and is no-op for all -// other backends. -func (noopBackend) FlushEngine(_ context.Context, _ uuid.UUID) error { - return nil -} - -// FlushAllEngines performs FlushEngine on all opened engines. This is a -// very expensive operation and should only be used in some rare situation -// (e.g. preparing to resolve a disk quota violation). -func (noopBackend) FlushAllEngines(_ context.Context) error { - return nil -} - -// EngineFileSizes obtains the size occupied locally of all engines managed -// by this backend. This method is used to compute disk quota. -// It can return nil if the content are all stored remotely. -func (noopBackend) EngineFileSizes() []backend.EngineFileSize { - return nil -} - -// ResetEngine clears all written KV pairs in this opened engine. -func (noopBackend) ResetEngine(_ context.Context, _ uuid.UUID) error { - return nil -} - -// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine. -func (noopBackend) LocalWriter(context.Context, *backend.LocalWriterConfig, uuid.UUID) (backend.EngineWriter, error) { - return Writer{}, nil -} - -// TotalMemoryConsume returns the total memory usage of the backend. -func (noopBackend) TotalMemoryConsume() int64 { - return 0 -} - -type noopEncoder struct{} - -// Close the encoder. -func (noopEncoder) Close() {} - -// Encode encodes a row of SQL values into a backend-friendly format. -func (noopEncoder) Encode([]types.Datum, int64, []int, int64) (encode.Row, error) { - return noopRow{}, nil -} - -type noopRow struct{} - -// Size returns the size of the encoded row. -func (noopRow) Size() uint64 { - return 0 -} - -// ClassifyAndAppend classifies the row into the corresponding collection. -func (noopRow) ClassifyAndAppend(*encode.Rows, *verification.KVChecksum, *encode.Rows, *verification.KVChecksum) { -} - -// Writer define a local writer that do nothing. -type Writer struct{} - -// AppendRows implements the EngineWriter interface. -func (Writer) AppendRows(context.Context, string, []string, encode.Rows) error { - return nil -} - -// IsSynced implements the EngineWriter interface. -func (Writer) IsSynced() bool { - return true -} - -// Close implements the EngineWriter interface. -func (Writer) Close(context.Context) (backend.ChunkFlushStatus, error) { - return trueStatus{}, nil -} - -type trueStatus struct{} - -// Flushed implements the ChunkFlushStatus interface. -func (trueStatus) Flushed() bool { - return true -} diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 65fb37d1e7bd6..40a779467f66e 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -256,6 +256,8 @@ type tidbBackend struct { errorMgr *errormanager.ErrorManager } +var _ backend.Backend = (*tidbBackend)(nil) + // NewTiDBBackend creates a new TiDB backend using the given database. // // The backend does not take ownership of `db`. Caller should close `db` @@ -267,11 +269,11 @@ func NewTiDBBackend(ctx context.Context, db *sql.DB, onDuplicate string, errorMg log.FromContext(ctx).Warn("unsupported action on duplicate, overwrite with `replace`") onDuplicate = config.ReplaceOnDup } - return backend.MakeBackend(&tidbBackend{ + return &tidbBackend{ db: db, onDuplicate: onDuplicate, errorMgr: errorMgr, - }) + } } func (row tidbRow) Size() uint64 { @@ -600,10 +602,6 @@ rowLoop: return nil } -func (*tidbBackend) TotalMemoryConsume() int64 { - return 0 -} - type stmtTask struct { rows tidbRows stmt string @@ -724,11 +722,6 @@ func (be *tidbBackend) execStmts(ctx context.Context, stmtTasks []stmtTask, tabl return nil } -// EngineFileSizes returns the size of each engine file. -func (*tidbBackend) EngineFileSizes() []backend.EngineFileSize { - return nil -} - // FlushEngine flushes the data in the engine to the underlying storage. func (*tidbBackend) FlushEngine(context.Context, uuid.UUID) error { return nil @@ -747,15 +740,16 @@ func (*tidbBackend) ResetEngine(context.Context, uuid.UUID) error { // LocalWriter returns a writer that writes data to local storage. func (be *tidbBackend) LocalWriter( _ context.Context, - _ *backend.LocalWriterConfig, + cfg *backend.LocalWriterConfig, _ uuid.UUID, ) (backend.EngineWriter, error) { - return &Writer{be: be}, nil + return &Writer{be: be, tableName: cfg.TableName}, nil } // Writer is a writer that writes data to local storage. type Writer struct { - be *tidbBackend + be *tidbBackend + tableName string } // Close implements the EngineWriter interface. @@ -764,8 +758,8 @@ func (*Writer) Close(_ context.Context) (backend.ChunkFlushStatus, error) { } // AppendRows implements the EngineWriter interface. -func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, rows encode.Rows) error { - return w.be.WriteRows(ctx, tableName, columnNames, rows) +func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error { + return w.be.WriteRows(ctx, w.tableName, columnNames, rows) } // IsSynced implements the EngineWriter interface. diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index 828920a9323d1..f93a8f60bbe21 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -44,6 +44,7 @@ import ( type mysqlSuite struct { dbHandle *sql.DB mockDB sqlmock.Sqlmock + mgr backend.EngineManager backend backend.Backend encBuilder encode.EncodingBuilder tbl table.Table @@ -65,11 +66,12 @@ func createMysqlSuite(t *testing.T) *mysqlSuite { tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic} tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo) require.NoError(t, err) - backend := tidb.NewTiDBBackend(context.Background(), db, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig(), log.L())) + backendObj := tidb.NewTiDBBackend(context.Background(), db, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig(), log.L())) return &mysqlSuite{ dbHandle: db, mockDB: mock, - backend: backend, + mgr: backend.MakeEngineManager(backendObj), + backend: backendObj, encBuilder: tidb.NewEncodingBuilder(), tbl: tbl, } @@ -90,7 +92,7 @@ func TestWriteRowsReplaceOnDup(t *testing.T) { ctx := context.Background() logger := log.L() - engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) + engine, err := s.mgr.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) dataRows := s.encBuilder.MakeEmptyRows() @@ -133,9 +135,9 @@ func TestWriteRowsReplaceOnDup(t *testing.T) { require.NoError(t, err) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) - writer, err := engine.LocalWriter(ctx, nil) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, []string{"b", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows) + err = writer.AppendRows(ctx, []string{"b", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows) require.NoError(t, err) st, err := writer.Close(ctx) require.NoError(t, err) @@ -154,7 +156,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) { encBuilder := tidb.NewEncodingBuilder() ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, config.IgnoreOnDup, errormanager.New(nil, config.NewConfig(), logger)) - engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) + engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) dataRows := encBuilder.MakeEmptyRows() @@ -170,9 +172,9 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) { require.NoError(t, err) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) - writer, err := engine.LocalWriter(ctx, nil) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, []string{"a"}, dataRows) + err = writer.AppendRows(ctx, []string{"a"}, dataRows) require.NoError(t, err) _, err = writer.Close(ctx) require.NoError(t, err) @@ -201,7 +203,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) { encBuilder := tidb.NewEncodingBuilder() ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig(), logger)) - engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) + engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) dataRows := encBuilder.MakeEmptyRows() @@ -218,9 +220,9 @@ func TestWriteRowsErrorOnDup(t *testing.T) { row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) - writer, err := engine.LocalWriter(ctx, nil) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, []string{"a"}, dataRows) + err = writer.AppendRows(ctx, []string{"a"}, dataRows) require.NoError(t, err) st, err := writer.Close(ctx) require.NoError(t, err) @@ -442,13 +444,13 @@ func TestWriteRowsErrorNoRetry(t *testing.T) { errormanager.New(s.dbHandle, &config.Config{}, log.L()), ) encBuilder := tidb.NewEncodingBuilder() - dataRows := encodeRowsTiDB(t, encBuilder, ignoreBackend, s.tbl) + dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) ctx := context.Background() - engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) + engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) - writer, err := engine.LocalWriter(ctx, nil) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, []string{"a"}, dataRows) + err = writer.AppendRows(ctx, []string{"a"}, dataRows) require.Error(t, err) require.False(t, common.IsRetryableError(err), "err: %v", err) } @@ -510,13 +512,13 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) { }, log.L()), ) encBuilder := tidb.NewEncodingBuilder() - dataRows := encodeRowsTiDB(t, encBuilder, ignoreBackend, s.tbl) + dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) ctx := context.Background() - engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) + engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) - writer, err := engine.LocalWriter(ctx, nil) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, []string{"a"}, dataRows) + err = writer.AppendRows(ctx, []string{"a"}, dataRows) require.NoError(t, err) } @@ -566,20 +568,20 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) { }, log.L()), ) encBuilder := tidb.NewEncodingBuilder() - dataRows := encodeRowsTiDB(t, encBuilder, ignoreBackend, s.tbl) + dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) ctx := context.Background() - engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) + engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) - writer, err := engine.LocalWriter(ctx, nil) + writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"}) require.NoError(t, err) - err = writer.WriteRows(ctx, []string{"a"}, dataRows) + err = writer.AppendRows(ctx, []string{"a"}, dataRows) require.Error(t, err) st, err := writer.Close(ctx) require.NoError(t, err) require.Nil(t, st) } -func encodeRowsTiDB(t *testing.T, encBuilder encode.EncodingBuilder, b backend.Backend, tbl table.Table) encode.Rows { +func encodeRowsTiDB(t *testing.T, encBuilder encode.EncodingBuilder, tbl table.Table) encode.Rows { dataRows := encBuilder.MakeEmptyRows() dataChecksum := verification.MakeKVChecksum(0, 0, 0) indexRows := encBuilder.MakeEmptyRows() diff --git a/br/pkg/lightning/importer/BUILD.bazel b/br/pkg/lightning/importer/BUILD.bazel index b46cba01f40cb..2ba92e6657611 100644 --- a/br/pkg/lightning/importer/BUILD.bazel +++ b/br/pkg/lightning/importer/BUILD.bazel @@ -115,7 +115,6 @@ go_test( "//br/pkg/lightning/backend", "//br/pkg/lightning/backend/encode", "//br/pkg/lightning/backend/kv", - "//br/pkg/lightning/backend/noop", "//br/pkg/lightning/backend/tidb", "//br/pkg/lightning/checkpoints", "//br/pkg/lightning/common", diff --git a/br/pkg/lightning/importer/chunk_process.go b/br/pkg/lightning/importer/chunk_process.go index f031a85c8fd2a..94310147eb4ea 100644 --- a/br/pkg/lightning/importer/chunk_process.go +++ b/br/pkg/lightning/importer/chunk_process.go @@ -142,7 +142,7 @@ func (cr *chunkProcessor) process( ctx context.Context, t *TableImporter, engineID int32, - dataEngine, indexEngine *backend.LocalEngineWriter, + dataEngine, indexEngine backend.EngineWriter, rc *Controller, ) error { logger := t.logger.With( @@ -390,7 +390,7 @@ func (cr *chunkProcessor) deliverLoop( kvsCh <-chan []deliveredKVs, t *TableImporter, engineID int32, - dataEngine, indexEngine *backend.LocalEngineWriter, + dataEngine, indexEngine backend.EngineWriter, rc *Controller, ) (deliverTotalDur time.Duration, err error) { deliverLogger := t.logger.With( @@ -471,14 +471,14 @@ func (cr *chunkProcessor) deliverLoop( // Write KVs into the engine start := time.Now() - if err = dataEngine.WriteRows(ctx, columns, dataKVs); err != nil { + if err = dataEngine.AppendRows(ctx, columns, dataKVs); err != nil { if !common.IsContextCanceledError(err) { deliverLogger.Error("write to data engine failed", log.ShortError(err)) } return errors.Trace(err) } - if err = indexEngine.WriteRows(ctx, columns, indexKVs); err != nil { + if err = indexEngine.AppendRows(ctx, columns, indexKVs); err != nil { if !common.IsContextCanceledError(err) { deliverLogger.Error("write to index engine failed", log.ShortError(err)) } @@ -579,7 +579,7 @@ func (*chunkProcessor) maybeSaveCheckpoint( t *TableImporter, engineID int32, chunk *checkpoints.ChunkCheckpoint, - data, index *backend.LocalEngineWriter, + data, index backend.EngineWriter, ) bool { if data.IsSynced() && index.IsSynced() { saveCheckpoint(rc, t, engineID, chunk) diff --git a/br/pkg/lightning/importer/chunk_process_test.go b/br/pkg/lightning/importer/chunk_process_test.go index 70c3d8eb5746c..cb3d24cc48bf6 100644 --- a/br/pkg/lightning/importer/chunk_process_test.go +++ b/br/pkg/lightning/importer/chunk_process_test.go @@ -100,7 +100,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopCancel() { mockEncBuilder := mock.NewMockEncodingBuilder(controller) mockEncBuilder.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes() - rc := &Controller{backend: backend.MakeBackend(mockBackend), encBuilder: mockEncBuilder} + rc := &Controller{engineMgr: backend.MakeEngineManager(mockBackend), backend: mockBackend, encBuilder: mockEncBuilder} ctx, cancel := context.WithCancel(context.Background()) kvsCh := make(chan []deliveredKVs) go cancel() @@ -117,14 +117,14 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData() { defer controller.Finish() mockBackend := mock.NewMockBackend(controller) mockEncBuilder := mock.NewMockEncodingBuilder(controller) - importer := backend.MakeBackend(mockBackend) + importer := backend.MakeEngineManager(mockBackend) mockBackend.EXPECT().OpenEngine(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(2) mockEncBuilder.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes() mockWriter := mock.NewMockEngineWriter(controller) mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes() mockWriter.EXPECT(). - AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any()). + AppendRows(ctx, gomock.Any(), gomock.Any()). Return(nil).AnyTimes() mockWriter.EXPECT().IsSynced().Return(true).AnyTimes() @@ -141,7 +141,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData() { cfg := &config.Config{} saveCpCh := make(chan saveCp, 16) - rc := &Controller{cfg: cfg, backend: importer, saveCpCh: saveCpCh, encBuilder: mockEncBuilder} + rc := &Controller{cfg: cfg, engineMgr: backend.MakeEngineManager(mockBackend), backend: mockBackend, saveCpCh: saveCpCh, encBuilder: mockEncBuilder} var wg sync.WaitGroup wg.Add(1) @@ -172,7 +172,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() { controller := gomock.NewController(s.T()) defer controller.Finish() mockBackend := mock.NewMockBackend(controller) - importer := backend.MakeBackend(mockBackend) + importer := backend.MakeEngineManager(mockBackend) mockEncBuilder := mock.NewMockEncodingBuilder(controller) mockBackend.EXPECT().OpenEngine(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(2) @@ -188,7 +188,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() { indexEngine, err := importer.OpenEngine(ctx, &backend.EngineConfig{}, s.tr.tableName, -1) require.NoError(s.T(), err) - dataWriter, err := dataEngine.LocalWriter(ctx, &backend.LocalWriterConfig{}) + dataWriter, err := dataEngine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: s.tr.tableName}) require.NoError(s.T(), err) indexWriter, err := indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{}) require.NoError(s.T(), err) @@ -196,7 +196,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() { // Set up the expected API calls to the data engine... mockWriter.EXPECT(). - AppendRows(ctx, s.tr.tableName, mockCols, kv.MakeRowsFromKvPairs([]common.KvPair{ + AppendRows(ctx, mockCols, kv.MakeRowsFromKvPairs([]common.KvPair{ { Key: []byte("txxxxxxxx_ryyyyyyyy"), Val: []byte("value1"), @@ -213,7 +213,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() { // Note: This test assumes data engine is written before the index engine. mockWriter.EXPECT(). - AppendRows(ctx, s.tr.tableName, mockCols, kv.MakeRowsFromKvPairs([]common.KvPair{ + AppendRows(ctx, mockCols, kv.MakeRowsFromKvPairs([]common.KvPair{ { Key: []byte("txxxxxxxx_izzzzzzzz"), Val: []byte("index1"), @@ -251,7 +251,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() { }() cfg := &config.Config{} - rc := &Controller{cfg: cfg, saveCpCh: saveCpCh, backend: importer, encBuilder: mockEncBuilder} + rc := &Controller{cfg: cfg, saveCpCh: saveCpCh, engineMgr: backend.MakeEngineManager(mockBackend), backend: mockBackend, encBuilder: mockEncBuilder} _, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc) require.NoError(s.T(), err) @@ -659,7 +659,7 @@ func (s *chunkRestoreSuite) TestRestore() { controller := gomock.NewController(s.T()) defer controller.Finish() mockBackend := mock.NewMockBackend(controller) - importer := backend.MakeBackend(mockBackend) + importer := backend.MakeEngineManager(mockBackend) mockEncBuilder := mock.NewMockEncodingBuilder(controller) mockBackend.EXPECT().OpenEngine(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(2) @@ -670,7 +670,7 @@ func (s *chunkRestoreSuite) TestRestore() { mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes() mockEncBuilder.EXPECT().NewEncoder(gomock.Any(), gomock.Any()).Return(mockEncoder{}, nil).Times(1) mockWriter.EXPECT().IsSynced().Return(true).AnyTimes() - mockWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() dataEngine, err := importer.OpenEngine(ctx, &backend.EngineConfig{}, s.tr.tableName, 0) require.NoError(s.T(), err) @@ -686,7 +686,8 @@ func (s *chunkRestoreSuite) TestRestore() { err = s.cr.process(ctx, s.tr, 0, dataWriter, indexWriter, &Controller{ cfg: s.cfg, saveCpCh: saveCpCh, - backend: importer, + engineMgr: backend.MakeEngineManager(mockBackend), + backend: mockBackend, pauser: DeliverPauser, encBuilder: mockEncBuilder, }) diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index cb1f53305d7db..a50d68d825ec8 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -202,6 +202,7 @@ type Controller struct { ioWorkers *worker.Pool checksumWorks *worker.Pool pauser *common.Pauser + engineMgr backend.EngineManager backend backend.Backend tidbGlue glue.Glue @@ -369,7 +370,7 @@ func NewImportControllerWithPauser( DB: db, } backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName) - backendObj, err = local.NewLocalBackend(ctx, tls, backendConfig, regionSizeGetter) + backendObj, err = local.NewBackend(ctx, tls, backendConfig, regionSizeGetter) if err != nil { return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err) } @@ -439,6 +440,7 @@ func NewImportControllerWithPauser( ioWorkers: ioWorkers, checksumWorks: worker.NewPool(ctx, cfg.TiDB.ChecksumTableConcurrency, "checksum"), pauser: p.Pauser, + engineMgr: backend.MakeEngineManager(backendObj), backend: backendObj, tidbGlue: p.Glue, sysVars: common.DefaultImportantVariables, @@ -1989,6 +1991,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) { return } + localBackend := rc.backend.(*local.Backend) go func() { // locker is assigned when we detect the disk quota is exceeded. // before the disk quota is confirmed exceeded, we keep the diskQuotaLock @@ -2016,7 +2019,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) { } quota := int64(rc.cfg.TikvImporter.DiskQuota) - largeEngines, inProgressLargeEngines, totalDiskSize, totalMemSize := rc.backend.CheckDiskQuota(quota) + largeEngines, inProgressLargeEngines, totalDiskSize, totalMemSize := local.CheckDiskQuota(localBackend, quota) if m, ok := metric.FromContext(ctx); ok { m.LocalStorageUsageBytesGauge.WithLabelValues("disk").Set(float64(totalDiskSize)) m.LocalStorageUsageBytesGauge.WithLabelValues("mem").Set(float64(totalMemSize)) @@ -2047,7 +2050,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) { } // flush all engines so that checkpoints can be updated. - if err := rc.backend.FlushAll(ctx); err != nil { + if err := rc.backend.FlushAllEngines(ctx); err != nil { logger.Error("flush engine for disk quota failed, check again later", log.ShortError(err)) return } @@ -2060,7 +2063,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) { var importErr error for _, engine := range largeEngines { // Use a larger split region size to avoid split the same region by many times. - if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil { + if err := localBackend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil { importErr = multierr.Append(importErr, err) } } diff --git a/br/pkg/lightning/importer/restore_schema_test.go b/br/pkg/lightning/importer/restore_schema_test.go index 7a36674a6a4bf..82a30777bb429 100644 --- a/br/pkg/lightning/importer/restore_schema_test.go +++ b/br/pkg/lightning/importer/restore_schema_test.go @@ -140,8 +140,9 @@ func (s *restoreSchemaSuite) SetupTest() { AnyTimes(). Return(s.tableInfos, nil) mockBackend.EXPECT().Close() - theBackend := backend.MakeBackend(mockBackend) - s.rc.backend = theBackend + theBackend := backend.MakeEngineManager(mockBackend) + s.rc.engineMgr = theBackend + s.rc.backend = mockBackend s.targetInfoGetter.backend = mockTargetInfoGetter mockDB, sqlMock, err := sqlmock.New() diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go index 0fc9f15283ebd..214bfffb247fb 100644 --- a/br/pkg/lightning/importer/table_import.go +++ b/br/pkg/lightning/importer/table_import.go @@ -391,7 +391,7 @@ func (tr *TableImporter) importEngines(pCtx context.Context, rc *Controller, cp continue } if engine.Status < checkpoints.CheckpointStatusAllWritten { - indexEngine, err = rc.backend.OpenEngine(ctx, idxEngineCfg, tr.tableName, common.IndexEngineID) + indexEngine, err = rc.engineMgr.OpenEngine(ctx, idxEngineCfg, tr.tableName, common.IndexEngineID) if err != nil { return errors.Trace(err) } @@ -482,7 +482,7 @@ func (tr *TableImporter) importEngines(pCtx context.Context, rc *Controller, cp if indexEngine != nil { closedIndexEngine, restoreErr = indexEngine.Close(ctx) } else { - closedIndexEngine, restoreErr = rc.backend.UnsafeCloseEngine(ctx, idxEngineCfg, tr.tableName, common.IndexEngineID) + closedIndexEngine, restoreErr = rc.engineMgr.UnsafeCloseEngine(ctx, idxEngineCfg, tr.tableName, common.IndexEngineID) } if err = rc.saveStatusCheckpoint(ctx, tr.tableName, common.IndexEngineID, restoreErr, checkpoints.CheckpointStatusClosed); err != nil { @@ -491,7 +491,7 @@ func (tr *TableImporter) importEngines(pCtx context.Context, rc *Controller, cp } else if indexEngineCp.Status == checkpoints.CheckpointStatusClosed { // If index engine file has been closed but not imported only if context cancel occurred // when `importKV()` execution, so `UnsafeCloseEngine` and continue import it. - closedIndexEngine, restoreErr = rc.backend.UnsafeCloseEngine(ctx, idxEngineCfg, tr.tableName, common.IndexEngineID) + closedIndexEngine, restoreErr = rc.engineMgr.UnsafeCloseEngine(ctx, idxEngineCfg, tr.tableName, common.IndexEngineID) } if restoreErr != nil { return errors.Trace(restoreErr) @@ -552,7 +552,7 @@ func (tr *TableImporter) preprocessEngine( engineCfg := &backend.EngineConfig{ TableInfo: tr.tableInfo, } - closedEngine, err := rc.backend.UnsafeCloseEngine(ctx, engineCfg, tr.tableName, engineID) + closedEngine, err := rc.engineMgr.UnsafeCloseEngine(ctx, engineCfg, tr.tableName, engineID) // If any error occurred, recycle worker immediately if err != nil { return closedEngine, errors.Trace(err) @@ -577,6 +577,7 @@ func (tr *TableImporter) preprocessEngine( tr.tableInfo.Core.Partition == nil dataWriterCfg := &backend.LocalWriterConfig{ IsKVSorted: hasAutoIncrementAutoID, + TableName: tr.tableName, } logTask := tr.logger.With(zap.Int32("engineNumber", engineID)).Begin(zap.InfoLevel, "encode kv data and write") @@ -588,7 +589,7 @@ func (tr *TableImporter) preprocessEngine( dataEngineCfg.Local.CompactConcurrency = 4 dataEngineCfg.Local.CompactThreshold = local.CompactionUpperThreshold } - dataEngine, err := rc.backend.OpenEngine(ctx, dataEngineCfg, tr.tableName, engineID) + dataEngine, err := rc.engineMgr.OpenEngine(ctx, dataEngineCfg, tr.tableName, engineID) if err != nil { return nil, errors.Trace(err) } @@ -688,7 +689,7 @@ ChunkLoop: break } - indexWriter, err := indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{}) + indexWriter, err := indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: tr.tableName}) if err != nil { _, _ = dataWriter.Close(ctx) setError(err) @@ -924,7 +925,7 @@ func (tr *TableImporter) postProcess( // if we came here, it must be a local backend. // todo: remove this cast after we refactor the backend interface. Physical mode is so different, we shouldn't // try to abstract it with logical mode. - localBackend := rc.backend.Inner().(*local.Local) + localBackend := rc.backend.(*local.Backend) dupeController := localBackend.GetDupeController(rc.cfg.TikvImporter.RangeConcurrency*2, rc.errorMgr) hasDupe := false if rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone { diff --git a/br/pkg/lightning/importer/table_import_test.go b/br/pkg/lightning/importer/table_import_test.go index bc8c8cca81c1a..8dfef9ed1d654 100644 --- a/br/pkg/lightning/importer/table_import_test.go +++ b/br/pkg/lightning/importer/table_import_test.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" - "github.com/pingcap/tidb/br/pkg/lightning/backend/noop" "github.com/pingcap/tidb/br/pkg/lightning/backend/tidb" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/common" @@ -360,7 +359,9 @@ func (s *tableRestoreSuite) TestPopulateChunks() { type errorLocalWriter struct{} -func (w errorLocalWriter) AppendRows(context.Context, string, []string, encode.Rows) error { +var _ backend.EngineWriter = (*errorLocalWriter)(nil) + +func (w errorLocalWriter) AppendRows(context.Context, []string, encode.Rows) error { return errors.New("mock write rows failed") } @@ -383,7 +384,8 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() { ioWorkers: worker.NewPool(ctx, 1, "io"), regionWorkers: worker.NewPool(ctx, 10, "region"), store: s.store, - backend: backend.MakeBackend(mockBackend), + engineMgr: backend.MakeEngineManager(mockBackend), + backend: mockBackend, errorSummaries: makeErrorSummaries(log.L()), saveCpCh: make(chan saveCp, 1), encBuilder: mockEncBuilder, @@ -401,6 +403,13 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() { err := s.tr.populateChunks(ctx, rc, cp) require.NoError(s.T(), err) + mockChunkFlushStatus := mock.NewMockChunkFlushStatus(ctrl) + mockChunkFlushStatus.EXPECT().Flushed().Return(true).AnyTimes() + mockEngineWriter := mock.NewMockEngineWriter(ctrl) + mockEngineWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockEngineWriter.EXPECT().IsSynced().Return(true).AnyTimes() + mockEngineWriter.EXPECT().Close(gomock.Any()).Return(mockChunkFlushStatus, nil).AnyTimes() + tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), s.tableInfo.Core) require.NoError(s.T(), err) _, indexUUID := backend.MakeUUID("`db`.`table`", -1) @@ -413,10 +422,10 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() { Return(realEncBuilder.NewEncoder(ctx, &encode.EncodingConfig{Table: tbl})). AnyTimes() mockEncBuilder.EXPECT().MakeEmptyRows().Return(realEncBuilder.MakeEmptyRows()).AnyTimes() - mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), dataUUID).Return(noop.Writer{}, nil) + mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), dataUUID).Return(mockEngineWriter, nil) mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), indexUUID). Return(nil, errors.New("mock open index local writer failed")) - openedIdxEngine, err := rc.backend.OpenEngine(ctx, nil, "`db`.`table`", -1) + openedIdxEngine, err := rc.engineMgr.OpenEngine(ctx, nil, "`db`.`table`", -1) require.NoError(s.T(), err) // open the first engine meet error, should directly return the error @@ -429,7 +438,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() { case <-ctx.Done(): return nil, errors.New("mock open index local writer failed after ctx.Done") default: - return noop.Writer{}, nil + return mockEngineWriter, nil } } mockBackend.EXPECT().OpenEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) @@ -438,7 +447,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() { mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), indexUUID). DoAndReturn(localWriter).AnyTimes() - openedIdxEngine, err = rc.backend.OpenEngine(ctx, nil, "`db`.`table`", -1) + openedIdxEngine, err = rc.engineMgr.OpenEngine(ctx, nil, "`db`.`table`", -1) require.NoError(s.T(), err) // open engine failed after write rows failed, should return write rows error @@ -848,7 +857,7 @@ func (s *tableRestoreSuite) TestImportKVSuccess() { controller := gomock.NewController(s.T()) defer controller.Finish() mockBackend := mock.NewMockBackend(controller) - importer := backend.MakeBackend(mockBackend) + importer := backend.MakeEngineManager(mockBackend) chptCh := make(chan saveCp) defer close(chptCh) rc := &Controller{saveCpCh: chptCh, cfg: config.NewConfig()} @@ -883,7 +892,7 @@ func (s *tableRestoreSuite) TestImportKVFailure() { controller := gomock.NewController(s.T()) defer controller.Finish() mockBackend := mock.NewMockBackend(controller) - importer := backend.MakeBackend(mockBackend) + importer := backend.MakeEngineManager(mockBackend) chptCh := make(chan saveCp) defer close(chptCh) rc := &Controller{saveCpCh: chptCh, cfg: config.NewConfig()} @@ -966,6 +975,19 @@ func (s *tableRestoreSuite) TestTableRestoreMetrics() { dbInfos := map[string]*checkpoints.TidbDBInfo{ s.tableInfo.DB: s.dbInfo, } + mockChunkFlushStatus := mock.NewMockChunkFlushStatus(controller) + mockChunkFlushStatus.EXPECT().Flushed().Return(true).AnyTimes() + mockEngineWriter := mock.NewMockEngineWriter(controller) + mockEngineWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockEngineWriter.EXPECT().IsSynced().Return(true).AnyTimes() + mockEngineWriter.EXPECT().Close(gomock.Any()).Return(mockChunkFlushStatus, nil).AnyTimes() + backendObj := mock.NewMockBackend(controller) + backendObj.EXPECT().OpenEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + backendObj.EXPECT().CloseEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + backendObj.EXPECT().ImportEngine(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + backendObj.EXPECT().CleanupEngine(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + backendObj.EXPECT().ShouldPostProcess().Return(false).AnyTimes() + backendObj.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockEngineWriter, nil).AnyTimes() rc := &Controller{ cfg: cfg, dbMetas: dbMetas, @@ -977,7 +999,8 @@ func (s *tableRestoreSuite) TestTableRestoreMetrics() { checksumWorks: worker.NewPool(ctx, 2, "region"), saveCpCh: chptCh, pauser: DeliverPauser, - backend: noop.NewNoopBackend(), + engineMgr: backend.MakeEngineManager(backendObj), + backend: backendObj, tidbGlue: g, errorSummaries: makeErrorSummaries(log.L()), tls: tls, diff --git a/br/pkg/mock/backend.go b/br/pkg/mock/backend.go index f9fecf65866fb..e32c73f3ceb7b 100644 --- a/br/pkg/mock/backend.go +++ b/br/pkg/mock/backend.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/pingcap/tidb/br/pkg/lightning/backend (interfaces: AbstractBackend,EngineWriter,TargetInfoGetter) +// Source: github.com/pingcap/tidb/br/pkg/lightning/backend (interfaces: Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus) // Package mock is a generated GoMock package. package mock @@ -16,7 +16,7 @@ import ( model "github.com/pingcap/tidb/parser/model" ) -// MockBackend is a mock of AbstractBackend interface. +// MockBackend is a mock of Backend interface. type MockBackend struct { ctrl *gomock.Controller recorder *MockBackendMockRecorder @@ -79,20 +79,6 @@ func (mr *MockBackendMockRecorder) CloseEngine(arg0, arg1, arg2 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseEngine", reflect.TypeOf((*MockBackend)(nil).CloseEngine), arg0, arg1, arg2) } -// EngineFileSizes mocks base method. -func (m *MockBackend) EngineFileSizes() []backend.EngineFileSize { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "EngineFileSizes") - ret0, _ := ret[0].([]backend.EngineFileSize) - return ret0 -} - -// EngineFileSizes indicates an expected call of EngineFileSizes. -func (mr *MockBackendMockRecorder) EngineFileSizes() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EngineFileSizes", reflect.TypeOf((*MockBackend)(nil).EngineFileSizes)) -} - // FlushAllEngines mocks base method. func (m *MockBackend) FlushAllEngines(arg0 context.Context) error { m.ctrl.T.Helper() @@ -206,20 +192,6 @@ func (mr *MockBackendMockRecorder) ShouldPostProcess() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShouldPostProcess", reflect.TypeOf((*MockBackend)(nil).ShouldPostProcess)) } -// TotalMemoryConsume mocks base method. -func (m *MockBackend) TotalMemoryConsume() int64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalMemoryConsume") - ret0, _ := ret[0].(int64) - return ret0 -} - -// TotalMemoryConsume indicates an expected call of TotalMemoryConsume. -func (mr *MockBackendMockRecorder) TotalMemoryConsume() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalMemoryConsume", reflect.TypeOf((*MockBackend)(nil).TotalMemoryConsume)) -} - // MockEngineWriter is a mock of EngineWriter interface. type MockEngineWriter struct { ctrl *gomock.Controller @@ -244,17 +216,17 @@ func (m *MockEngineWriter) EXPECT() *MockEngineWriterMockRecorder { } // AppendRows mocks base method. -func (m *MockEngineWriter) AppendRows(arg0 context.Context, arg1 string, arg2 []string, arg3 encode.Rows) error { +func (m *MockEngineWriter) AppendRows(arg0 context.Context, arg1 []string, arg2 encode.Rows) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AppendRows", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "AppendRows", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // AppendRows indicates an expected call of AppendRows. -func (mr *MockEngineWriterMockRecorder) AppendRows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockEngineWriterMockRecorder) AppendRows(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendRows", reflect.TypeOf((*MockEngineWriter)(nil).AppendRows), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendRows", reflect.TypeOf((*MockEngineWriter)(nil).AppendRows), arg0, arg1, arg2) } // Close mocks base method. @@ -337,3 +309,40 @@ func (mr *MockTargetInfoGetterMockRecorder) FetchRemoteTableModels(arg0, arg1 in mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRemoteTableModels", reflect.TypeOf((*MockTargetInfoGetter)(nil).FetchRemoteTableModels), arg0, arg1) } + +// MockChunkFlushStatus is a mock of ChunkFlushStatus interface. +type MockChunkFlushStatus struct { + ctrl *gomock.Controller + recorder *MockChunkFlushStatusMockRecorder +} + +// MockChunkFlushStatusMockRecorder is the mock recorder for MockChunkFlushStatus. +type MockChunkFlushStatusMockRecorder struct { + mock *MockChunkFlushStatus +} + +// NewMockChunkFlushStatus creates a new mock instance. +func NewMockChunkFlushStatus(ctrl *gomock.Controller) *MockChunkFlushStatus { + mock := &MockChunkFlushStatus{ctrl: ctrl} + mock.recorder = &MockChunkFlushStatusMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChunkFlushStatus) EXPECT() *MockChunkFlushStatusMockRecorder { + return m.recorder +} + +// Flushed mocks base method. +func (m *MockChunkFlushStatus) Flushed() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Flushed") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Flushed indicates an expected call of Flushed. +func (mr *MockChunkFlushStatusMockRecorder) Flushed() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flushed", reflect.TypeOf((*MockChunkFlushStatus)(nil).Flushed)) +} diff --git a/br/pkg/mock/mocklocal/BUILD.bazel b/br/pkg/mock/mocklocal/BUILD.bazel new file mode 100644 index 0000000000000..d59aca0e88554 --- /dev/null +++ b/br/pkg/mock/mocklocal/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "mocklocal", + srcs = ["local.go"], + importpath = "github.com/pingcap/tidb/br/pkg/mock/mocklocal", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/lightning/backend", + "@com_github_golang_mock//gomock", + ], +) diff --git a/br/pkg/mock/mocklocal/local.go b/br/pkg/mock/mocklocal/local.go new file mode 100644 index 0000000000000..555f24cc69f31 --- /dev/null +++ b/br/pkg/mock/mocklocal/local.go @@ -0,0 +1,49 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/pingcap/tidb/br/pkg/lightning/backend/local (interfaces: DiskUsage) + +// Package mocklocal is a generated GoMock package. +package mocklocal + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + backend "github.com/pingcap/tidb/br/pkg/lightning/backend" +) + +// MockDiskUsage is a mock of DiskUsage interface. +type MockDiskUsage struct { + ctrl *gomock.Controller + recorder *MockDiskUsageMockRecorder +} + +// MockDiskUsageMockRecorder is the mock recorder for MockDiskUsage. +type MockDiskUsageMockRecorder struct { + mock *MockDiskUsage +} + +// NewMockDiskUsage creates a new mock instance. +func NewMockDiskUsage(ctrl *gomock.Controller) *MockDiskUsage { + mock := &MockDiskUsage{ctrl: ctrl} + mock.recorder = &MockDiskUsageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDiskUsage) EXPECT() *MockDiskUsageMockRecorder { + return m.recorder +} + +// EngineFileSizes mocks base method. +func (m *MockDiskUsage) EngineFileSizes() []backend.EngineFileSize { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EngineFileSizes") + ret0, _ := ret[0].([]backend.EngineFileSize) + return ret0 +} + +// EngineFileSizes indicates an expected call of EngineFileSizes. +func (mr *MockDiskUsageMockRecorder) EngineFileSizes() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EngineFileSizes", reflect.TypeOf((*MockDiskUsage)(nil).EngineFileSizes)) +} diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go index be913a87a7576..e2c73697ff78d 100644 --- a/ddl/ingest/backend.go +++ b/ddl/ingest/backend.go @@ -17,7 +17,6 @@ package ingest import ( "context" - "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" lightning "github.com/pingcap/tidb/br/pkg/lightning/config" @@ -34,7 +33,7 @@ import ( // BackendContext store a backend info for add index reorg task. type BackendContext struct { jobID int64 - backend *backend.Backend + backend *local.Backend ctx context.Context cfg *lightning.Config EngMgr engineManager @@ -62,7 +61,7 @@ func (bc *BackendContext) FinishImport(indexID int64, unique bool, tbl table.Tab // backend must be a local backend. // todo: when we can separate local backend completely from tidb backend, will remove this cast. //nolint:forcetypeassert - dupeController := bc.backend.Inner().(*local.Local).GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr) + dupeController := bc.backend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr) hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{ SQLMode: mysql.ModeStrictAllTables, SysVars: bc.sysVars, diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index d5a19a85b485e..5ef5544962896 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -20,7 +20,6 @@ import ( "fmt" "math" - "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/config" @@ -65,7 +64,7 @@ func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int return nil, err } - bcCtx := newBackendContext(ctx, jobID, &bd, cfg.Lightning, defaultImportantVariables, m.memRoot, m.diskRoot) + bcCtx := newBackendContext(ctx, jobID, bd, cfg.Lightning, defaultImportantVariables, m.memRoot, m.diskRoot) m.Store(jobID, bcCtx) m.memRoot.Consume(StructSizeBackendCtx) @@ -78,26 +77,26 @@ func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int return bc, nil } -func createLocalBackend(ctx context.Context, cfg *Config, glue glue.Glue) (backend.Backend, error) { +func createLocalBackend(ctx context.Context, cfg *Config, glue glue.Glue) (*local.Backend, error) { tls, err := cfg.Lightning.ToTLS() if err != nil { logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Error(err)) - return backend.Backend{}, err + return nil, err } logutil.BgLogger().Info("[ddl-ingest] create local backend for adding index", zap.String("keyspaceName", cfg.KeyspaceName)) db, err := glue.GetDB() if err != nil { - return backend.Backend{}, err + return nil, err } regionSizeGetter := &local.TableRegionSizeGetterImpl{ DB: db, } backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName) - return local.NewLocalBackend(ctx, tls, backendConfig, regionSizeGetter) + return local.NewBackend(ctx, tls, backendConfig, regionSizeGetter) } -func newBackendContext(ctx context.Context, jobID int64, be *backend.Backend, +func newBackendContext(ctx context.Context, jobID int64, be *local.Backend, cfg *config.Config, vars map[string]string, memRoot MemRoot, diskRoot DiskRoot) *BackendContext { bc := &BackendContext{ jobID: jobID, @@ -133,7 +132,7 @@ func (m *backendCtxManager) TotalDiskUsage() uint64 { for _, key := range m.Keys() { bc, exists := m.Load(key) if exists { - _, _, bcDiskUsed, _ := bc.backend.CheckDiskQuota(math.MaxInt64) + _, _, bcDiskUsed, _ := local.CheckDiskQuota(bc.backend, math.MaxInt64) totalDiskUsed += uint64(bcDiskUsed) } } diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go index 3d894f867f088..001102674dac7 100644 --- a/ddl/ingest/engine.go +++ b/ddl/ingest/engine.go @@ -40,7 +40,7 @@ type EngineInfo struct { uuid uuid.UUID cfg *backend.EngineConfig writerCount int - writerCache generic.SyncMap[int, *backend.LocalEngineWriter] + writerCache generic.SyncMap[int, backend.EngineWriter] memRoot MemRoot diskRoot DiskRoot rowSeq atomic.Int64 @@ -58,7 +58,7 @@ func NewEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.Engin openedEngine: en, uuid: uuid, writerCount: wCnt, - writerCache: generic.NewSyncMap[int, *backend.LocalEngineWriter](wCnt), + writerCache: generic.NewSyncMap[int, backend.EngineWriter](wCnt), memRoot: memRoot, diskRoot: diskRoot, } @@ -162,7 +162,7 @@ func (ei *EngineInfo) ImportAndClean() error { type WriterContext struct { ctx context.Context unique bool - lWrite *backend.LocalEngineWriter + lWrite backend.EngineWriter } // NewWriterCtx creates a new WriterContext. @@ -238,5 +238,5 @@ func (wCtx *WriterContext) WriteRow(key, idxVal []byte, handle tidbkv.Handle) er kvs[0].RowID = handle.Encoded() } row := kv.MakeRowsFromKvPairs(kvs) - return wCtx.lWrite.WriteRows(wCtx.ctx, nil, row) + return wCtx.lWrite.AppendRows(wCtx.ctx, nil, row) } diff --git a/ddl/ingest/engine_mgr.go b/ddl/ingest/engine_mgr.go index 9be0849549fdc..bccd17aed6a96 100644 --- a/ddl/ingest/engine_mgr.go +++ b/ddl/ingest/engine_mgr.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/generic" "github.com/pingcap/tidb/util/logutil" @@ -55,8 +56,9 @@ func (m *engineManager) Register(bc *BackendContext, jobID, indexID int64, schem return nil, genEngineAllocMemFailedErr(m.MemRoot, bc.jobID, indexID) } + mgr := backend.MakeEngineManager(bc.backend) cfg := generateLocalEngineConfig(jobID, schemaName, tableName) - openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, tableName, int32(indexID)) + openedEn, err := mgr.OpenEngine(bc.ctx, cfg, tableName, int32(indexID)) if err != nil { logutil.BgLogger().Warn(LitErrCreateEngineFail, zap.Int64("job ID", jobID), zap.Int64("index ID", indexID), zap.Error(err)) diff --git a/disttask/loaddata/proto.go b/disttask/loaddata/proto.go index 71af86f070ec1..ebb53f061c048 100644 --- a/disttask/loaddata/proto.go +++ b/disttask/loaddata/proto.go @@ -51,7 +51,7 @@ type MinimalTaskMeta struct { Format Format Dir string Chunk Chunk - Writer *backend.LocalEngineWriter + Writer backend.EngineWriter } // IsMinimalTask implements the MinimalTask interface. diff --git a/executor/importer/chunk_process.go b/executor/importer/chunk_process.go index 5e477c84d3fd1..799c55c3ee529 100644 --- a/executor/importer/chunk_process.go +++ b/executor/importer/chunk_process.go @@ -114,8 +114,8 @@ type chunkProcessor struct { chunkInfo *checkpoints.ChunkCheckpoint logger *zap.Logger kvsCh chan []deliveredRow - dataWriter *backend.LocalEngineWriter - indexWriter *backend.LocalEngineWriter + dataWriter backend.EngineWriter + indexWriter backend.EngineWriter checksum verify.KVChecksum encoder kvEncoder @@ -258,13 +258,13 @@ func (p *chunkProcessor) deliverLoop(ctx context.Context) error { err := func() error { // todo: disk quota related code from lightning, removed temporary - if err := p.dataWriter.WriteRows(ctx, nil, &kvBatch.dataKVs); err != nil { + if err := p.dataWriter.AppendRows(ctx, nil, &kvBatch.dataKVs); err != nil { if !common.IsContextCanceledError(err) { p.logger.Error("write to data engine failed", log.ShortError(err)) } return errors.Trace(err) } - if err := p.indexWriter.WriteRows(ctx, nil, &kvBatch.indexKVs); err != nil { + if err := p.indexWriter.AppendRows(ctx, nil, &kvBatch.indexKVs); err != nil { if !common.IsContextCanceledError(err) { p.logger.Error("write to index engine failed", log.ShortError(err)) } diff --git a/executor/importer/engine_process.go b/executor/importer/engine_process.go index 4330d62767078..479e52150980c 100644 --- a/executor/importer/engine_process.go +++ b/executor/importer/engine_process.go @@ -32,7 +32,7 @@ import ( type engineProcessor struct { engineID int32 fullTableName string - backend backend.Backend + backend *local.Backend tableInfo *checkpoints.TidbTableInfo logger *zap.Logger tableImporter *tableImporter @@ -53,7 +53,8 @@ func (ep *engineProcessor) process(ctx context.Context) (*backend.ClosedEngine, dataEngineCfg.Local.CompactConcurrency = 4 dataEngineCfg.Local.CompactThreshold = local.CompactionUpperThreshold } - dataEngine, err := ep.backend.OpenEngine(ctx, dataEngineCfg, ep.fullTableName, ep.engineID) + mgr := backend.MakeEngineManager(ep.backend) + dataEngine, err := mgr.OpenEngine(ctx, dataEngineCfg, ep.fullTableName, ep.engineID) if err != nil { return nil, err } @@ -89,7 +90,7 @@ func (ep *engineProcessor) localSort(ctx context.Context, dataEngine *backend.Op var ( parser mydump.Parser encoder kvEncoder - dataWriter, indexWriter *backend.LocalEngineWriter + dataWriter, indexWriter backend.EngineWriter ) closer.reset() parser, err = ep.tableImporter.getParser(ctx, chunk) diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index 1a4173e466012..90ca76a1df312 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -147,7 +147,7 @@ func newTableImporter(ctx context.Context, e *LoadDataController) (ti *tableImpo // todo: use a real region size getter regionSizeGetter := &local.TableRegionSizeGetterImpl{} - localBackend, err := local.NewLocalBackend(ctx, tls, backendConfig, regionSizeGetter) + localBackend, err := local.NewBackend(ctx, tls, backendConfig, regionSizeGetter) if err != nil { return nil, err } @@ -177,7 +177,7 @@ func newTableImporter(ctx context.Context, e *LoadDataController) (ti *tableImpo type tableImporter struct { *LoadDataController - backend backend.Backend + backend *local.Backend tableCp *checkpoints.TableCheckpoint tableInfo *checkpoints.TidbTableInfo tableMeta *mydump.MDTableMeta @@ -339,7 +339,8 @@ func (ti *tableImporter) importEngines(ctx context.Context) error { // todo: cleanup all engine data on any error since we don't support checkpoint for now // some return path, didn't make sure all data engine and index engine are cleaned up. // maybe we can add this in upper level to clean the whole local-sort directory - indexEngine, err := ti.backend.OpenEngine(ctx, idxEngineCfg, fullTableName, common.IndexEngineID) + mgr := backend.MakeEngineManager(ti.backend) + indexEngine, err := mgr.OpenEngine(ctx, idxEngineCfg, fullTableName, common.IndexEngineID) if err != nil { return errors.Trace(err) }