Skip to content

Commit

Permalink
lightning: refactor to reuse in load data part 5 (#42856)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
D3Hunter authored Apr 11, 2023
1 parent 9949a54 commit d250a3f
Show file tree
Hide file tree
Showing 34 changed files with 567 additions and 698 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,9 @@ mock_s3iface:
@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

mock_lightning:
@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter,TargetInfoGetter > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage > br/pkg/mock/mocklocal/local.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_golang_x_exp//slices",
"@org_uber_go_zap//:zap",
],
)
Expand Down
229 changes: 53 additions & 176 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,12 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/parser/model"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

const (
importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
)

/*
Usual workflow:
1. Create a `Backend` for the whole process.
2. For each table,
i. Split into multiple "batches" consisting of data files with roughly equal total size.
ii. For each batch,
a. Create an `OpenedEngine` via `backend.OpenEngine()`
b. For each chunk, deliver data into the engine via `engine.WriteRows()`
c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()`
d. Import data via `engine.Import()`
e. Cleanup via `engine.Cleanup()`
3. Close the connection via `backend.Close()`
*/

func makeTag(tableName string, engineID int32) string {
return fmt.Sprintf("%s:%d", tableName, engineID)
}
Expand Down Expand Up @@ -99,7 +72,10 @@ type EngineFileSize struct {
// LocalWriterConfig defines the configuration to open a LocalWriter
type LocalWriterConfig struct {
// is the chunk KV written to this LocalWriter sent in order
// only needed for local backend, can omit for tidb backend
IsKVSorted bool
// only needed for tidb backend, can omit for local backend
TableName string
}

// EngineConfig defines configuration used for open engine
Expand Down Expand Up @@ -145,10 +121,21 @@ type TargetInfoGetter interface {
CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
}

// AbstractBackend is the abstract interface behind Backend.
// Backend defines the interface for a backend.
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
type AbstractBackend interface {
// Usual workflow:
// 1. Create a `Backend` for the whole process.
// 2. For each table,
// i. Split into multiple "batches" consisting of data files with roughly equal total size.
// ii. For each batch,
// a. Create an `OpenedEngine` via `backend.OpenEngine()`
// b. For each chunk, deliver data into the engine via `engine.WriteRows()`
// c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()`
// d. Import data via `engine.Import()`
// e. Cleanup via `engine.Cleanup()`
// 3. Close the connection via `backend.Close()`
type Backend interface {
// Close the connection to the backend.
Close()

Expand Down Expand Up @@ -183,28 +170,22 @@ type AbstractBackend interface {
// (e.g. preparing to resolve a disk quota violation).
FlushAllEngines(ctx context.Context) error

// EngineFileSizes obtains the size occupied locally of all engines managed
// by this backend. This method is used to compute disk quota.
// It can return nil if the content are all stored remotely.
EngineFileSizes() []EngineFileSize

// ResetEngine clears all written KV pairs in this opened engine.
ResetEngine(ctx context.Context, engineUUID uuid.UUID) error

// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)

// TotalMemoryConsume counts total memory usage. This is only used for local backend.
TotalMemoryConsume() int64
}

// Backend is the delivery target for Lightning
type Backend struct {
abstract AbstractBackend
// EngineManager is the manager of engines.
// this is a wrapper of Backend, which provides some common methods for managing engines.
// and it has no states, can be created on demand
type EngineManager struct {
backend Backend
}

type engine struct {
backend AbstractBackend
backend Backend
logger log.Logger
uuid uuid.UUID
// id of the engine, used to generate uuid and stored in checkpoint
Expand All @@ -221,106 +202,17 @@ type OpenedEngine struct {
config *EngineConfig
}

// // import_ the data written to the engine into the target.
// import_(ctx context.Context) error

// // cleanup deletes the imported data.
// cleanup(ctx context.Context) error

// ClosedEngine represents a closed engine, allowing ingestion into the target.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type ClosedEngine struct {
engine
}

// LocalEngineWriter is a thread-local writer for writing rows into a single engine.
type LocalEngineWriter struct {
writer EngineWriter
tableName string
}

// MakeBackend creates a new Backend from an AbstractBackend.
func MakeBackend(ab AbstractBackend) Backend {
return Backend{abstract: ab}
}

// Close the connection to the backend.
func (be Backend) Close() {
be.abstract.Close()
}

// ShouldPostProcess returns whether KV-specific post-processing should be
func (be Backend) ShouldPostProcess() bool {
return be.abstract.ShouldPostProcess()
}

// FlushAll flushes all opened engines.
func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}

// TotalMemoryConsume returns the total memory consumed by the backend.
func (be Backend) TotalMemoryConsume() int64 {
return be.abstract.TotalMemoryConsume()
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
func (be Backend) CheckDiskQuota(quota int64) (
largeEngines []uuid.UUID,
inProgressLargeEngines int,
totalDiskSize int64,
totalMemSize int64,
) {
sizes := be.abstract.EngineFileSizes()
slices.SortFunc(sizes, func(i, j EngineFileSize) bool {
if i.IsImporting != j.IsImporting {
return i.IsImporting
}
return i.DiskSize+i.MemSize < j.DiskSize+j.MemSize
})
for _, size := range sizes {
totalDiskSize += size.DiskSize
totalMemSize += size.MemSize
if totalDiskSize+totalMemSize > quota {
if size.IsImporting {
inProgressLargeEngines++
} else {
largeEngines = append(largeEngines, size.UUID)
}
}
}
return
}

// UnsafeImportAndReset forces the backend to import the content of an engine
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
closedEngine := ClosedEngine{
engine: engine{
backend: be.abstract,
logger: makeLogger(log.FromContext(ctx), "<import-and-reset>", engineUUID),
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
// MakeEngineManager creates a new Backend from an Backend.
func MakeEngineManager(ab Backend) EngineManager {
return EngineManager{backend: ab}
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
logger := makeLogger(log.FromContext(ctx), tag, engineUUID)

if err := be.abstract.OpenEngine(ctx, config, engineUUID); err != nil {
if err := be.backend.OpenEngine(ctx, config, engineUUID); err != nil {
return nil, err
}

Expand All @@ -346,7 +238,7 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam

return &OpenedEngine{
engine: engine{
backend: be.abstract,
backend: be.backend,
logger: logger,
uuid: engineUUID,
id: engineID,
Expand All @@ -356,11 +248,6 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam
}, nil
}

// Inner returns the underlying abstract backend.
func (be Backend) Inner() AbstractBackend {
return be.abstract
}

// Close the opened engine to prepare it for importing.
func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) {
closedEngine, err := engine.unsafeClose(ctx, engine.config)
Expand All @@ -378,40 +265,16 @@ func (engine *OpenedEngine) Flush(ctx context.Context) error {
}

// LocalWriter returns a writer that writes to the local backend.
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (*LocalEngineWriter, error) {
w, err := engine.backend.LocalWriter(ctx, cfg, engine.uuid)
if err != nil {
return nil, err
}
return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil
}

// TotalMemoryConsume returns the total memory consumed by the engine.
func (engine *OpenedEngine) TotalMemoryConsume() int64 {
return engine.engine.backend.TotalMemoryConsume()
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows encode.Rows) error {
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
}

// Close closes the engine and returns the status of the engine.
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
return w.writer.Close(ctx)
}

// IsSynced returns whether the engine is synced.
func (w *LocalEngineWriter) IsSynced() bool {
return w.writer.IsSynced()
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (EngineWriter, error) {
return engine.backend.LocalWriter(ctx, cfg, engine.uuid)
}

// UnsafeCloseEngine closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) {
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID)
}
Expand All @@ -421,9 +284,9 @@ func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tabl
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error) {
func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error) {
return engine{
backend: be.abstract,
backend: be.backend,
logger: makeLogger(log.FromContext(ctx), tag, engineUUID),
uuid: engineUUID,
id: id,
Expand All @@ -445,6 +308,25 @@ func (en engine) GetID() int32 {
return en.id
}

// ClosedEngine represents a closed engine, allowing ingestion into the target.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type ClosedEngine struct {
engine
}

// NewClosedEngine creates a new ClosedEngine.
func NewClosedEngine(backend Backend, logger log.Logger, uuid uuid.UUID, id int32) *ClosedEngine {
return &ClosedEngine{
engine: engine{
backend: backend,
logger: logger,
uuid: uuid,
id: id,
},
}
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
var err error
Expand Down Expand Up @@ -483,12 +365,7 @@ type ChunkFlushStatus interface {

// EngineWriter is the interface for writing data to an engine.
type EngineWriter interface {
AppendRows(
ctx context.Context,
tableName string,
columnNames []string,
rows encode.Rows,
) error
AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error
IsSynced() bool
Close(ctx context.Context) (ChunkFlushStatus, error)
}
Expand Down
Loading

0 comments on commit d250a3f

Please sign in to comment.