Skip to content

Commit

Permalink
external: onefile writer should have onClose, and add KV count to Wri…
Browse files Browse the repository at this point in the history
…teSummary (#50955)

ref #50832
  • Loading branch information
lance6716 authored Feb 4, 2024
1 parent 69917c0 commit ba77807
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 51 deletions.
22 changes: 1 addition & 21 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
Expand Down Expand Up @@ -119,19 +118,16 @@ func mergeOverlappingFilesInternal(
SetWriterBatchCount(writeBatchCount).
SetPropKeysDistance(propKeysDist).
SetPropSizeDistance(propSizeDist).
SetOnCloseFunc(onClose).
BuildOneFile(store, newFilePrefix, writerID)
err = writer.Init(ctx, partSize)
if err != nil {
return nil
}
var minKey, maxKey tidbkv.Key

// currently use same goroutine to do read and write. The main advantage is
// there's no KV copy and iter can reuse the buffer.
for iter.Next() {
if len(minKey) == 0 {
minKey = tidbkv.Key(iter.Key()).Clone()
}
err = writer.WriteRow(ctx, iter.Key(), iter.Value())
if err != nil {
return err
Expand All @@ -141,22 +137,6 @@ func mergeOverlappingFilesInternal(
if err != nil {
return err
}
maxKey = tidbkv.Key(iter.Key()).Clone()

var stat MultipleFilesStat
stat.Filenames = append(stat.Filenames,
[2]string{writer.dataFile, writer.statFile})
stat.build([]tidbkv.Key{minKey}, []tidbkv.Key{maxKey})
if onClose != nil {
onClose(&WriterSummary{
WriterID: writer.writerID,
Seq: 0,
Min: minKey,
Max: maxKey,
TotalSize: writer.totalSize,
MultipleFilesStats: []MultipleFilesStat{stat},
})
}

return writer.Close(ctx)
}
26 changes: 1 addition & 25 deletions br/pkg/lightning/backend/external/merge_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,7 @@ func MergeOverlappingFilesV2(
SetPropKeysDistance(propKeysDist).
SetPropSizeDistance(propSizeDist).
SetOnCloseFunc(onClose).
BuildOneFile(
store,
newFilePrefix,
writerID)
BuildOneFile(store, newFilePrefix, writerID)
defer func() {
err = splitter.Close()
if err != nil {
Expand All @@ -102,7 +99,6 @@ func MergeOverlappingFilesV2(
loaded := &memKVsAndBuffers{}
curStart := kv.Key(startKey).Clone()
var curEnd kv.Key
var maxKey, minKey kv.Key

for {
endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err1 := splitter.SplitOneRangesGroup()
Expand Down Expand Up @@ -159,10 +155,6 @@ func MergeOverlappingFilesV2(
zap.Duration("write time", writeTime),
zap.Int("key len", len(loaded.keys)))

if len(minKey) == 0 {
minKey = kv.Key(loaded.keys[0]).Clone()
}
maxKey = kv.Key(loaded.keys[len(loaded.keys)-1]).Clone()
curStart = curEnd.Clone()
loaded.keys = nil
loaded.values = nil
Expand All @@ -172,21 +164,5 @@ func MergeOverlappingFilesV2(
break
}
}

var stat MultipleFilesStat
stat.Filenames = append(stat.Filenames,
[2]string{writer.dataFile, writer.statFile})

stat.build([]kv.Key{minKey}, []kv.Key{curEnd})
if onClose != nil {
onClose(&WriterSummary{
WriterID: writer.writerID,
Seq: 0,
Min: minKey,
Max: maxKey,
TotalSize: writer.totalSize,
MultipleFilesStats: []MultipleFilesStat{stat},
})
}
return
}
29 changes: 28 additions & 1 deletion br/pkg/lightning/backend/external/onefile_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"context"
"encoding/binary"
"path/filepath"
"slices"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
Expand All @@ -37,6 +39,7 @@ type OneFileWriter struct {

// Statistic information per writer.
totalSize uint64
totalCnt uint64
rc *rangePropertiesCollector

// file information.
Expand All @@ -47,7 +50,11 @@ type OneFileWriter struct {
dataWriter storage.ExternalFileWriter
statWriter storage.ExternalFileWriter

closed bool
onClose OnCloseFunc
closed bool

minKey []byte
maxKey []byte

logger *zap.Logger
}
Expand Down Expand Up @@ -86,6 +93,9 @@ func (w *OneFileWriter) Init(ctx context.Context, partSize int64) (err error) {

// WriteRow implements ingest.Writer.
func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) error {
if w.minKey == nil {
w.minKey = slices.Clone(idxKey)
}
// 1. encode data and write to kvStore.
keyLen := len(idxKey)
length := len(idxKey) + len(idxVal) + lengthBytes*2
Expand All @@ -111,11 +121,13 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err
binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen))
binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(idxVal)))
copy(buf[lengthBytes*2:], idxKey)
w.maxKey = buf[lengthBytes*2 : lengthBytes*2+keyLen]
copy(buf[lengthBytes*2+keyLen:], idxVal)
err := w.kvStore.addEncodedData(buf[:length])
if err != nil {
return err
}
w.totalCnt += 1
w.totalSize += uint64(keyLen + len(idxVal))
return nil
}
Expand All @@ -132,6 +144,21 @@ func (w *OneFileWriter) Close(ctx context.Context) error {
w.logger.Info("close one file writer",
zap.String("writerID", w.writerID))

maxKey := slices.Clone(w.maxKey)
var stat MultipleFilesStat
stat.Filenames = append(stat.Filenames,
[2]string{w.dataFile, w.statFile})
stat.build([]tidbkv.Key{w.minKey}, []tidbkv.Key{maxKey})
w.onClose(&WriterSummary{
WriterID: w.writerID,
Seq: 0,
Min: w.minKey,
Max: maxKey,
TotalSize: w.totalSize,
TotalCnt: w.totalCnt,
MultipleFilesStats: []MultipleFilesStat{stat},
})
w.totalCnt = 0
w.totalSize = 0
w.closed = true
return nil
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/onefile_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func TestOnefileWriterManyRows(t *testing.T) {
require.Equal(t, expected.Filenames, resSummary.MultipleFilesStats[0].Filenames)
require.Equal(t, expected.MaxOverlappingNum, resSummary.MultipleFilesStats[0].MaxOverlappingNum)
require.EqualValues(t, expectedTotalSize, resSummary.TotalSize)
require.EqualValues(t, kvCnt, resSummary.TotalCnt)
}

func TestOnefilePropOffset(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/external/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ type SortedKVMeta struct {
StartKey []byte `json:"start-key"`
EndKey []byte `json:"end-key"` // exclusive
TotalKVSize uint64 `json:"total-kv-size"`
TotalKVCnt uint64 `json:"total-kv-cnt"`
MultipleFilesStats []MultipleFilesStat `json:"multiple-files-stats"`
}

Expand All @@ -257,6 +258,7 @@ func NewSortedKVMeta(summary *WriterSummary) *SortedKVMeta {
StartKey: summary.Min.Clone(),
EndKey: summary.Max.Clone().Next(),
TotalKVSize: summary.TotalSize,
TotalKVCnt: summary.TotalCnt,
MultipleFilesStats: summary.MultipleFilesStats,
}
}
Expand All @@ -274,6 +276,7 @@ func (m *SortedKVMeta) Merge(other *SortedKVMeta) {
m.StartKey = BytesMin(m.StartKey, other.StartKey)
m.EndKey = BytesMax(m.EndKey, other.EndKey)
m.TotalKVSize += other.TotalKVSize
m.TotalKVCnt += other.TotalKVCnt

m.MultipleFilesStats = append(m.MultipleFilesStats, other.MultipleFilesStats...)
}
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type WriterSummary struct {
Min tidbkv.Key
Max tidbkv.Key
TotalSize uint64
TotalCnt uint64
MultipleFilesStats []MultipleFilesStat
}

Expand Down Expand Up @@ -211,8 +212,6 @@ func (b *WriterBuilder) Build(

// BuildOneFile builds a new one file Writer. The writer will create only one
// file under the prefix of "{prefix}/{writerID}".
//
// BuildOneFile will ignore SetOnCloseFunc.
func (b *WriterBuilder) BuildOneFile(
store storage.ExternalStorage,
prefix string,
Expand All @@ -233,6 +232,7 @@ func (b *WriterBuilder) BuildOneFile(
filenamePrefix: filenamePrefix,
writerID: writerID,
kvStore: nil,
onClose: b.onClose,
closed: false,
}
return ret
Expand Down Expand Up @@ -341,6 +341,7 @@ type Writer struct {
minKey tidbkv.Key
maxKey tidbkv.Key
totalSize uint64
totalCnt uint64
}

// WriteRow implements ingest.Writer.
Expand Down Expand Up @@ -372,6 +373,7 @@ func (w *Writer) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tid
w.kvLocations = append(w.kvLocations, loc)
w.kvSize += int64(encodedKeyLen + len(idxVal))
w.batchSize += uint64(length)
w.totalCnt += 1
return nil
}

Expand Down Expand Up @@ -409,6 +411,7 @@ func (w *Writer) Close(ctx context.Context) error {
Min: w.minKey,
Max: w.maxKey,
TotalSize: w.totalSize,
TotalCnt: w.totalCnt,
MultipleFilesStats: w.multiFileStats,
})
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//br/pkg/lightning/metric",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/verification",
"//br/pkg/logutil",
"//br/pkg/storage",
"//br/pkg/utils",
"//pkg/disttask/framework/handle",
Expand Down
14 changes: 12 additions & 2 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
brlogutil "github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
Expand Down Expand Up @@ -308,8 +309,8 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S

logger.Info("merge sort partSize", zap.String("size", units.BytesSize(float64(m.partSize))))

return external.MergeOverlappingFiles(
ctx,
err = external.MergeOverlappingFiles(
logutil.WithFields(ctx, zap.String("kv-group", sm.KVGroup), zap.Int64("subtask-id", subtask.ID)),
sm.DataFiles,
m.controller.GlobalSortStore,
m.partSize,
Expand All @@ -323,6 +324,15 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S
onClose,
m.taskMeta.Plan.ThreadCnt,
false)
logger.Info(
"merge sort finished",
zap.String("kv-group", sm.KVGroup),
zap.Uint64("total-kv-size", m.subtaskSortedKVMeta.TotalKVSize),
zap.Uint64("total-kv-count", m.subtaskSortedKVMeta.TotalKVCnt),
brlogutil.Key("start-key", m.subtaskSortedKVMeta.StartKey),
brlogutil.Key("end-key", m.subtaskSortedKVMeta.EndKey),
)
return err
}

func (m *mergeSortStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error {
Expand Down

0 comments on commit ba77807

Please sign in to comment.