Skip to content

Commit

Permalink
ddl: flush data in local engine serially (pingcap#43524) (pingcap#43571)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 9, 2023
1 parent b3436ba commit 6041250
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 38 deletions.
4 changes: 4 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -1595,6 +1596,7 @@ type addIndexIngestWorker struct {
writer ingest.Writer
copReqSenderPool *copReqSenderPool
checkpointMgr *ingest.CheckpointManager
flushLock *sync.RWMutex

resultCh chan *backfillResult
jobID int64
Expand Down Expand Up @@ -1652,6 +1654,8 @@ func writeChunkToLocal(writer ingest.Writer,
handleDataBuf := make([]types.Datum, len(copCtx.handleOutputOffsets))
count := 0
var lastHandle kv.Handle
unlock := writer.LockForWrite()
defer unlock()
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
idxDataBuf, handleDataBuf = idxDataBuf[:0], handleDataBuf[:0]
idxDataBuf = extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, idxDataBuf)
Expand Down
50 changes: 29 additions & 21 deletions ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,38 +133,46 @@ func (bc *litBackendCtx) Flush(indexID int64, force bool) (flushed, imported boo
return false, false, dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found")
}

bc.diskRoot.UpdateUsage()
shouldImport := bc.diskRoot.ShouldImport()
shouldFlush := force ||
shouldImport ||
time.Since(bc.timeOfLastFlush.Load()) >= bc.updateInterval
shouldFlush, shouldImport := bc.ShouldSync(force)
if !shouldFlush {
return false, false, nil
}
if !ei.flushing.CompareAndSwap(false, true) {
return false, false, nil
}
defer ei.flushing.Store(false)
ei.flushLock.Lock()
defer ei.flushLock.Unlock()

bc.timeOfLastFlush.Store(time.Now())
err = ei.Flush()
if err != nil {
return false, false, err
}
bc.timeOfLastFlush.Store(time.Now())

if force || shouldImport {
release := ei.acquireFlushLock()
if release == nil {
return true, false, nil
}
defer release()
logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID),
if !shouldImport {
return true, false, nil
}
logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys))
if err != nil {
logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("index ID", indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys))
if err != nil {
logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("index ID", indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
return true, false, err
}
return true, true, nil
return true, false, err
}
return true, true, nil
}

func (bc *litBackendCtx) ShouldSync(force bool) (shouldFlush bool, shouldImport bool) {
if force {
return true, true
}
return true, false, nil
bc.diskRoot.UpdateUsage()
shouldImport = bc.diskRoot.ShouldImport()
shouldFlush = shouldImport ||
time.Since(bc.timeOfLastFlush.Load()) >= bc.updateInterval
return shouldFlush, shouldImport
}

// Done returns true if the lightning backfill is done.
Expand Down
16 changes: 15 additions & 1 deletion ddl/ingest/disk_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,32 @@ func (d *diskRootImpl) ShouldImport() bool {
d.mu.RLock()
defer d.mu.RUnlock()
if d.bcUsed > variable.DDLDiskQuota.Load() {
logutil.BgLogger().Info("[ddl-ingest] disk usage is over quota",
zap.Uint64("quota", variable.DDLDiskQuota.Load()),
zap.String("usage", d.usageInfo()))
return true
}
if d.used == 0 && d.capacity == 0 {
return false
}
return float64(d.used) >= float64(d.capacity)*capacityThreshold
if float64(d.used) >= float64(d.capacity)*capacityThreshold {
logutil.BgLogger().Warn("[ddl-ingest] available disk space is less than 10%, "+
"this may degrade the performance, "+
"please make sure the disk available space is larger than @@tidb_ddl_disk_quota before adding index",
zap.String("usage", d.usageInfo()))
return true
}
return false
}

// UsageInfo implements DiskRoot interface.
func (d *diskRootImpl) UsageInfo() string {
d.mu.RLock()
defer d.mu.RUnlock()
return d.usageInfo()
}

func (d *diskRootImpl) usageInfo() string {
return fmt.Sprintf("disk usage: %d/%d, backend usage: %d", d.used, d.capacity, d.bcUsed)
}

Expand Down
30 changes: 15 additions & 15 deletions ddl/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ingest
import (
"context"
"strconv"
"sync"
"sync/atomic"

"github.com/google/uuid"
Expand All @@ -41,6 +42,7 @@ type Engine interface {
// Writer is the interface for the writer that can be used to write key-value pairs.
type Writer interface {
WriteRow(key, idxVal []byte, handle tidbkv.Handle) error
LockForWrite() (unlock func())
}

// engineInfo is the engine for one index reorg task, each task will create several new writers under the
Expand All @@ -55,14 +57,13 @@ type engineInfo struct {
writerCount int
writerCache generic.SyncMap[int, backend.EngineWriter]
memRoot MemRoot
diskRoot DiskRoot
rowSeq atomic.Int64
flushLock *sync.RWMutex
flushing atomic.Bool
}

// newEngineInfo create a new engineInfo struct.
func newEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.EngineConfig,
en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot, diskRoot DiskRoot) *engineInfo {
en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot) *engineInfo {
return &engineInfo{
ctx: ctx,
jobID: jobID,
Expand All @@ -73,7 +74,7 @@ func newEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.Engin
writerCount: wCnt,
writerCache: generic.NewSyncMap[int, backend.EngineWriter](wCnt),
memRoot: memRoot,
diskRoot: diskRoot,
flushLock: &sync.RWMutex{},
}
}

Expand All @@ -88,17 +89,6 @@ func (ei *engineInfo) Flush() error {
return nil
}

// acquireFlushLock acquires the flush lock of the engine.
func (ei *engineInfo) acquireFlushLock() (release func()) {
ok := ei.flushing.CompareAndSwap(false, true)
if !ok {
return nil
}
return func() {
ei.flushing.Store(false)
}
}

// Clean closes the engine and removes the local intermediate files.
func (ei *engineInfo) Clean() {
if ei.openedEngine == nil {
Expand Down Expand Up @@ -169,6 +159,7 @@ type writerContext struct {
ctx context.Context
unique bool
lWrite backend.EngineWriter
fLock *sync.RWMutex
}

// CreateWriter creates a new writerContext.
Expand Down Expand Up @@ -215,6 +206,7 @@ func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*writerContex
ctx: ei.ctx,
unique: unique,
lWrite: lWrite,
fLock: ei.flushLock,
}
return wc, nil
}
Expand Down Expand Up @@ -246,3 +238,11 @@ func (wCtx *writerContext) WriteRow(key, idxVal []byte, handle tidbkv.Handle) er
row := kv.MakeRowsFromKvPairs(kvs)
return wCtx.lWrite.AppendRows(wCtx.ctx, nil, row)
}

// LockForWrite locks the local writer for write.
func (wCtx *writerContext) LockForWrite() (unlock func()) {
wCtx.fLock.RLock()
return func() {
wCtx.fLock.RUnlock()
}
}
2 changes: 1 addition & 1 deletion ddl/ingest/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (bc *litBackendCtx) Register(jobID, indexID int64, schemaName, tableName st
return nil, errors.Trace(err)
}
id := openedEn.GetEngineUUID()
en = newEngineInfo(bc.ctx, jobID, indexID, cfg, openedEn, id, 1, bc.MemRoot, bc.DiskRoot)
en = newEngineInfo(bc.ctx, jobID, indexID, cfg, openedEn, id, 1, bc.MemRoot)
bc.Store(indexID, en)
bc.MemRoot.Consume(StructSizeEngineInfo)
bc.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), engineCacheSize)
Expand Down
5 changes: 5 additions & 0 deletions ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,8 @@ func (m *MockWriter) WriteRow(key, idxVal []byte, _ kv.Handle) error {
}
return txn.Set(key, idxVal)
}

// LockForWrite implements Writer.LockForWrite interface.
func (*MockWriter) LockForWrite() func() {
return func() {}
}

0 comments on commit 6041250

Please sign in to comment.