From 85d11978aeef1c0370702a693dbe749d2830cf2c Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 22 Sep 2022 12:06:40 +0800 Subject: [PATCH 01/19] add log rate limit for table actor --- cdc/processor/pipeline/table_actor.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index cefe3797d0c..7ff82f3d0d6 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -38,6 +38,7 @@ import ( uberatomic "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) var ( @@ -417,16 +418,21 @@ func (t *tableActor) CheckpointTs() model.Ts { return t.sinkNode.CheckpointTs() } +// if the actor system is slow, too many warn log will be printed add rate limit +var updateBarrierTsLogRateLimiter = rate.NewLimiter(rate.Every(time.Millisecond*500), 1) + // UpdateBarrierTs updates the barrier ts in this table pipeline func (t *tableActor) UpdateBarrierTs(ts model.Ts) { msg := pmessage.BarrierMessage(ts) err := t.router.Send(t.actorID, message.ValueMessage(msg)) if err != nil { - log.Warn("send fails", - zap.Any("msg", msg), - zap.String("tableName", t.tableName), - zap.Int64("tableID", t.tableID), - zap.Error(err)) + if updateBarrierTsLogRateLimiter.Allow() { + log.Warn("send fails", + zap.Any("msg", msg), + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + } } } From f9e91070314639f52c251885fe75a96f66508a25 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Wed, 28 Sep 2022 17:28:14 +0800 Subject: [PATCH 02/19] add rate limiter for resolve lock --- cdc/kv/region_worker.go | 31 +++++++++++++++++++------------ pkg/txnutil/lock_resolver.go | 2 +- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 550ba151246..7efe7cb0137 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -35,6 +35,7 @@ import ( "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) var ( @@ -298,6 +299,9 @@ type rtsUpdateEvent struct { resolvedTs uint64 } +// too many repeated logs when TiDB OOM +var resolveLockLogRateLimiter = rate.NewLimiter(rate.Every(time.Millisecond*500), 10) + func (w *regionWorker) resolveLock(ctx context.Context) error { // tikv resolved update interval is 1s, use half of the resolve lock interval // as lock penalty. @@ -376,22 +380,25 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { w.rtsManager.Insert(rts) continue } - log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock", - zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID), - zap.String("addr", w.storeAddr), - zap.Uint64("regionID", rts.regionID), - zap.Stringer("span", state.getRegionSpan()), - zap.Duration("duration", sinceLastResolvedTs), - zap.Duration("lastEvent", sinceLastEvent), - zap.Uint64("resolvedTs", lastResolvedTs), - ) + if resolveLockLogRateLimiter.Allow() { + log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.String("addr", w.storeAddr), + zap.Uint64("regionID", rts.regionID), + zap.Stringer("span", state.getRegionSpan()), + zap.Duration("duration", sinceLastResolvedTs), + zap.Duration("lastEvent", sinceLastEvent), + zap.Uint64("resolvedTs", lastResolvedTs), + ) + } err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion) if err != nil { log.Warn("failed to resolve lock", - zap.Uint64("regionID", rts.regionID), zap.Error(err), + zap.Uint64("regionID", rts.regionID), zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID)) + zap.String("changefeed", w.session.client.changefeed.ID), + zap.Error(err)) continue } rts.ts.penalty = 0 diff --git a/pkg/txnutil/lock_resolver.go b/pkg/txnutil/lock_resolver.go index c0b0aa49884..caa8bebed09 100644 --- a/pkg/txnutil/lock_resolver.go +++ b/pkg/txnutil/lock_resolver.go @@ -131,7 +131,7 @@ func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint } bo = tikv.NewGcResolveLockMaxBackoffer(ctx) } - log.Info("resolve lock successfully", + log.Debug("resolve lock successfully", zap.Uint64("regionID", regionID), zap.Int("lockCount", lockCount), zap.Uint64("maxVersion", maxVersion), From 19a00daded1de4d4b17804e0b92bf36327c48eba Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 29 Sep 2022 13:14:42 +0800 Subject: [PATCH 03/19] improve frontier --- cdc/kv/client.go | 12 ++-- cdc/kv/region_worker.go | 10 ++-- cdc/model/kv.go | 11 +++- cdc/puller/frontier/frontier.go | 42 ++++++++++---- cdc/puller/frontier/frontier_bench_test.go | 4 +- cdc/puller/frontier/frontier_test.go | 66 +++++++++++----------- cdc/puller/frontier/list.go | 16 ++++-- cdc/puller/frontier/list_test.go | 50 ++++++++-------- cdc/puller/puller.go | 56 +++++++++--------- 9 files changed, 153 insertions(+), 114 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 7f1ebdc6903..fac2d2f9a9b 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -878,10 +878,14 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { // After this resolved ts event is sent, we don't need to send one more // resolved ts event when the region starts to work. resolvedEv := model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: sri.span, - ResolvedTs: sri.ts, - }}, + Resolved: &model.ResolvedSpan{ + Spans: []model.ResolvedComparableSpan{ + { + Span: sri.span, + Region: sri.verID.GetID(), + }, + }, + ResolvedTs: sri.ts}, } select { case s.eventCh <- resolvedEv: diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 7efe7cb0137..69ec6e60ab9 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -785,7 +785,7 @@ func (w *regionWorker) handleResolvedTs( revents *resolvedTsEvent, ) error { resolvedTs := revents.resolvedTs - resolvedSpans := make([]*model.ResolvedSpan, 0, len(revents.regions)) + resolvedSpans := make([]model.ResolvedComparableSpan, 0, len(revents.regions)) regions := make([]uint64, 0, len(revents.regions)) for _, state := range revents.regions { @@ -805,9 +805,9 @@ func (w *regionWorker) handleResolvedTs( continue } // emit a checkpointTs - resolvedSpans = append(resolvedSpans, &model.ResolvedSpan{ - Span: state.sri.span, - ResolvedTs: resolvedTs, + resolvedSpans = append(resolvedSpans, model.ResolvedComparableSpan{ + Span: state.sri.span, + Region: regionID, }) } if len(resolvedSpans) == 0 { @@ -828,7 +828,7 @@ func (w *regionWorker) handleResolvedTs( state.lock.Unlock() } // emit a checkpointTs - revent := model.RegionFeedEvent{Resolved: resolvedSpans} + revent := model.RegionFeedEvent{Resolved: &model.ResolvedSpan{ResolvedTs: resolvedTs, Spans: resolvedSpans}} select { case w.outputCh <- revent: w.metrics.metricSendEventResolvedCounter.Add(float64(len(resolvedSpans))) diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 00919cf6fb9..b3d673460dc 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -38,7 +38,7 @@ const ( //msgp:ignore RegionFeedEvent type RegionFeedEvent struct { Val *RawKVEntry - Resolved []*ResolvedSpan + Resolved *ResolvedSpan // Additional debug info, not used RegionID uint64 @@ -60,13 +60,18 @@ func (e *RegionFeedEvent) GetValue() interface{} { // //msgp:ignore ResolvedSpan type ResolvedSpan struct { - Span regionspan.ComparableSpan + Spans []ResolvedComparableSpan ResolvedTs uint64 } +type ResolvedComparableSpan struct { + Span regionspan.ComparableSpan + Region uint64 +} + // String implements fmt.Stringer interface. func (rs *ResolvedSpan) String() string { - return fmt.Sprintf("span: %s, resolved-ts: %d", rs.Span, rs.ResolvedTs) + return fmt.Sprintf("span: %v, resolved-ts: %d", rs.Spans, rs.ResolvedTs) } // RawKVEntry notify the KV operator diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 08d51266397..a1cfa9a8269 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -24,7 +24,7 @@ import ( // Frontier checks resolved event of spans and moves the global resolved ts ahead type Frontier interface { - Forward(span regionspan.ComparableSpan, ts uint64) + Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) Frontier() uint64 String() string } @@ -33,6 +33,8 @@ type Frontier interface { type spanFrontier struct { spanList skipList minTsHeap fibonacciHeap + result []*skipListNode + nodes map[uint64]*skipListNode } // NewFrontier creates Frontier from the given spans. @@ -42,6 +44,8 @@ type spanFrontier struct { func NewFrontier(checkpointTs uint64, spans ...regionspan.ComparableSpan) Frontier { s := &spanFrontier{ spanList: *newSpanList(), + result: make(seekResult, maxHeight), + nodes: map[uint64]*skipListNode{}, } firstSpan := true for _, span := range spans { @@ -51,7 +55,7 @@ func NewFrontier(checkpointTs uint64, spans ...regionspan.ComparableSpan) Fronti firstSpan = false continue } - s.insert(span, checkpointTs) + s.insert(0, span, checkpointTs) } return s @@ -63,20 +67,35 @@ func (s *spanFrontier) Frontier() uint64 { } // Forward advances the timestamp for a span. -func (s *spanFrontier) Forward(span regionspan.ComparableSpan, ts uint64) { - s.insert(span, ts) +func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) { + if n, ok := s.nodes[regionID]; ok { + if bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { + s.minTsHeap.UpdateKey(n.Value(), ts) + return + } + } + s.insert(regionID, span, ts) } -func (s *spanFrontier) insert(span regionspan.ComparableSpan, ts uint64) { - seekRes := s.spanList.Seek(span.Start) +func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, ts uint64) { + for i := 0; i < len(s.result); i++ { + s.result[i] = nil + } + seekRes := s.spanList.Seek(span.Start, s.result) + for _, n := range seekRes { + if n == nil { + break + } + if n.regionID > 0 { + delete(s.nodes, n.regionID) + } + } // if there is no change in the region span // We just need to update the ts corresponding to the span in list next := seekRes.Node().Next() if next != nil { - cmpStart := bytes.Compare(seekRes.Node().Key(), span.Start) - cmpEnd := bytes.Compare(next.Key(), span.End) - if cmpStart == 0 && cmpEnd == 0 { + if bytes.Equal(seekRes.Node().Key(), span.Start) && bytes.Equal(next.Key(), span.End) { s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) return } @@ -107,7 +126,10 @@ func (s *spanFrontier) insert(span regionspan.ComparableSpan, ts uint64) { } } if shouldInsertStartNode { - s.spanList.InsertNextToNode(seekRes, span.Start, s.minTsHeap.Insert(ts)) + n := s.spanList.InsertNextToNode(seekRes, span.Start, s.minTsHeap.Insert(ts)) + n.regionID = regionID + n.end = span.End + s.nodes[regionID] = n seekRes.Next() } s.spanList.InsertNextToNode(seekRes, span.End, s.minTsHeap.Insert(lastNodeTs)) diff --git a/cdc/puller/frontier/frontier_bench_test.go b/cdc/puller/frontier/frontier_bench_test.go index 158122129b1..33daca21436 100644 --- a/cdc/puller/frontier/frontier_bench_test.go +++ b/cdc/puller/frontier/frontier_bench_test.go @@ -54,7 +54,7 @@ func BenchmarkSpanFrontier(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - f.Forward(spans[i%n], uint64(i)) + f.Forward(0, spans[i%n], uint64(i)) } }) } @@ -96,7 +96,7 @@ func BenchmarkSpanFrontierOverlap(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - f.Forward(forward[i%n], uint64(i)) + f.Forward(0, forward[i%n], uint64(i)) } }) } diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index 6d753db2b67..d458b583a9b 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -44,7 +44,7 @@ func TestSpanFrontier(t *testing.T) { checkFrontier(t, f) f.Forward( - regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, + 0, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, 100, ) require.Equal(t, uint64(5), f.Frontier()) @@ -52,7 +52,7 @@ func TestSpanFrontier(t *testing.T) { checkFrontier(t, f) f.Forward( - regionspan.ComparableSpan{Start: []byte("g"), End: []byte("h")}, + 0, regionspan.ComparableSpan{Start: []byte("g"), End: []byte("h")}, 200, ) require.Equal(t, uint64(5), f.Frontier()) @@ -61,7 +61,7 @@ func TestSpanFrontier(t *testing.T) { // Forward the tracked span space. f.Forward( - regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, + 0, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 1, ) require.Equal(t, uint64(1), f.Frontier()) @@ -70,7 +70,7 @@ func TestSpanFrontier(t *testing.T) { // // Forward it again f.Forward( - regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, + 0, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 2, ) require.Equal(t, uint64(2), f.Frontier()) @@ -79,7 +79,7 @@ func TestSpanFrontier(t *testing.T) { // // Forward to smaller ts f.Forward( - regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, + 0, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 1, ) require.Equal(t, uint64(1), f.Frontier()) @@ -87,70 +87,70 @@ func TestSpanFrontier(t *testing.T) { checkFrontier(t, f) // // Forward b-c - f.Forward(spBC, 3) + f.Forward(0, spBC, 3) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, `[a @ 1] [b @ 3] [c @ 1] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward b-c more to be 4 - f.Forward(spBC, 4) + f.Forward(0, spBC, 4) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, `[a @ 1] [b @ 4] [c @ 1] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward all to at least 3 - f.Forward(spAD, 3) + f.Forward(0, spAD, 3) require.Equal(t, uint64(3), f.Frontier()) require.Equal(t, `[a @ 3] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward AB and CD to be 5, keep BC at 4 - f.Forward(spAB, 5) + f.Forward(0, spAB, 5) require.Equal(t, uint64(3), f.Frontier()) require.Equal(t, `[a @ 5] [b @ 3] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) - f.Forward(spCD, 5) + f.Forward(0, spCD, 5) require.Equal(t, uint64(3), f.Frontier()) require.Equal(t, `[a @ 5] [b @ 3] [c @ 5] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Catch BC to be 5 too - f.Forward(spBC, 5) + f.Forward(0, spBC, 5) require.Equal(t, uint64(5), f.Frontier()) require.Equal(t, `[a @ 5] [b @ 5] [c @ 5] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward all to be 6 - f.Forward(spAD, 6) + f.Forward(0, spAD, 6) require.Equal(t, uint64(6), f.Frontier()) require.Equal(t, `[a @ 6] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward ac to 7 - f.Forward(spAC, 7) + f.Forward(0, spAC, 7) require.Equal(t, uint64(6), f.Frontier()) require.Equal(t, `[a @ 7] [c @ 6] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward bd to 8 - f.Forward(spBD, 8) + f.Forward(0, spBD, 8) require.Equal(t, uint64(7), f.Frontier()) require.Equal(t, `[a @ 7] [b @ 8] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) // Forward ab to 8 - f.Forward(spAB, 8) + f.Forward(0, spAB, 8) require.Equal(t, uint64(8), f.Frontier()) require.Equal(t, `[a @ 8] [b @ 8] [d @ 100] [e @ Max] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) - f.Forward(regionspan.ComparableSpan{Start: []byte("1"), End: []byte("g")}, 9) + f.Forward(0, regionspan.ComparableSpan{Start: []byte("1"), End: []byte("g")}, 9) require.Equal(t, uint64(9), f.Frontier()) require.Equal(t, `[1 @ 9] [g @ 200] [h @ Max] `, f.String()) checkFrontier(t, f) - f.Forward(regionspan.ComparableSpan{Start: []byte("g"), End: []byte("i")}, 10) + f.Forward(0, regionspan.ComparableSpan{Start: []byte("g"), End: []byte("i")}, 10) require.Equal(t, uint64(9), f.Frontier()) require.Equal(t, `[1 @ 9] [g @ 10] [i @ Max] `, f.String()) checkFrontier(t, f) @@ -170,9 +170,9 @@ func TestSpanFrontierFallback(t *testing.T) { spDE := regionspan.ComparableSpan{Start: keyD, End: keyE} f := NewFrontier(20, spAB).(*spanFrontier) - f.Forward(spBC, 20) - f.Forward(spCD, 10) - f.Forward(spDE, 20) + f.Forward(0, spBC, 20) + f.Forward(0, spCD, 10) + f.Forward(0, spDE, 20) // [A, B) [B, C) [C, D) [D, E) // 20 20 10 20 @@ -187,7 +187,7 @@ func TestSpanFrontierFallback(t *testing.T) { // [A, B) [B, C) [C, D) [D, E) // 20 10 10 20 // [B, C) does not forward, because of merge into [A, C) immediately - f.Forward(spCD, 20) + f.Forward(0, spCD, 20) require.Equal(t, uint64(20), f.Frontier()) // the frontier stoes [A, B) and [B, C) but they are not correct exactly require.Equal(t, `[a @ 20] [b @ 20] [c @ 20] [d @ 20] [e @ Max] `, f.String()) @@ -216,22 +216,22 @@ func TestMinMax(t *testing.T) { require.Equal(t, "[ @ 0] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) - f.Forward(spMinMax, 1) + f.Forward(0, spMinMax, 1) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, "[ @ 1] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) - f.Forward(spMinMid, 2) + f.Forward(0, spMinMid, 2) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, "[ @ 2] [m @ 1] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) - f.Forward(spMidMax, 2) + f.Forward(0, spMidMax, 2) require.Equal(t, uint64(2), f.Frontier()) require.Equal(t, "[ @ 2] [m @ 2] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) - f.Forward(spMinMax, 3) + f.Forward(0, spMinMax, 3) require.Equal(t, uint64(3), f.Frontier()) require.Equal(t, "[ @ 3] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) @@ -262,41 +262,41 @@ func TestSpanFrontierDisjoinSpans(t *testing.T) { checkFrontier(t, f) // Advance the tracked spans - f.Forward(spAB, 1) + f.Forward(0, spAB, 1) require.Equal(t, uint64(0), f.Frontier()) require.Equal(t, `[a @ 1] [b @ Max] [c @ 0] [e @ Max] `, f.String()) checkFrontier(t, f) - f.Forward(spCE, 1) + f.Forward(0, spCE, 1) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, `[a @ 1] [b @ Max] [c @ 1] [e @ Max] `, f.String()) checkFrontier(t, f) // Advance d-e split c-e to c-d and d-e - f.Forward(spDE, 2) + f.Forward(0, spDE, 2) require.Equal(t, uint64(1), f.Frontier()) require.Equal(t, `[a @ 1] [b @ Max] [c @ 1] [d @ 2] [e @ Max] `, f.String()) checkFrontier(t, f) // Advance a-d cover a-b and c-d - f.Forward(spAD, 3) + f.Forward(0, spAD, 3) require.Equal(t, uint64(2), f.Frontier()) require.Equal(t, `[a @ 3] [d @ 2] [e @ Max] `, f.String()) checkFrontier(t, f) // Advance one cover all 3 span - f.Forward(spAE, 4) + f.Forward(0, spAE, 4) require.Equal(t, uint64(4), f.Frontier()) require.Equal(t, `[a @ 4] [e @ Max] `, f.String()) checkFrontier(t, f) // Advance all with a larger span - f.Forward(sp1F, 5) + f.Forward(0, sp1F, 5) require.Equal(t, uint64(5), f.Frontier()) require.Equal(t, `[1 @ 5] [f @ Max] `, f.String()) checkFrontier(t, f) // Advance span smaller than all tracked spans - f.Forward(sp12, 6) + f.Forward(0, sp12, 6) require.Equal(t, uint64(5), f.Frontier()) require.Equal(t, `[1 @ 6] [2 @ 5] [f @ Max] `, f.String()) checkFrontier(t, f) @@ -328,7 +328,7 @@ func TestSpanFrontierRandomly(t *testing.T) { ts := rand.Uint64() - f.Forward(span, ts) + f.Forward(0, span, ts) checkFrontier(t, f) } } diff --git a/cdc/puller/frontier/list.go b/cdc/puller/frontier/list.go index 812c5778e1e..51e85f8f55a 100644 --- a/cdc/puller/frontier/list.go +++ b/cdc/puller/frontier/list.go @@ -31,6 +31,9 @@ type skipListNode struct { key []byte value *fibonacciHeapNode + end []byte + regionID uint64 + nexts []*skipListNode } @@ -39,6 +42,10 @@ func (s *skipListNode) Key() []byte { return s.key } +func (s *skipListNode) End() []byte { + return s.end +} + // Value is the value of the node func (s *skipListNode) Value() *fibonacciHeapNode { return s.value @@ -92,10 +99,10 @@ func fastrand() uint32 // Seek returns the seek result // the seek result is a slice of nodes, // Each element in the slice represents the nearest(left) node to the target value at each level of the skip list. -func (l *skipList) Seek(key []byte) seekResult { +func (l *skipList) Seek(key []byte, result []*skipListNode) seekResult { head := &l.head current := head - result := make(seekResult, maxHeight) + //result := make(seekResult, maxHeight) LevelLoop: for level := l.height - 1; level >= 0; level-- { @@ -124,7 +131,7 @@ LevelLoop: } // InsertNextToNode insert the specified node after the seek result -func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonacciHeapNode) { +func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonacciHeapNode) *skipListNode { if seekR.Node() != nil && !nextTo(seekR.Node(), key) { log.Panic("the InsertNextToNode function can only append node to the seek result.") } @@ -147,11 +154,12 @@ func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonac n.nexts[level] = prev.nexts[level] prev.nexts[level] = n } + return n } // Insert inserts the specified node func (l *skipList) Insert(key []byte, value *fibonacciHeapNode) { - seekR := l.Seek(key) + seekR := l.Seek(key, make(seekResult, maxHeight)) l.InsertNextToNode(seekR, key, value) } diff --git a/cdc/puller/frontier/list_test.go b/cdc/puller/frontier/list_test.go index e575ae2a725..39dfb996c5e 100644 --- a/cdc/puller/frontier/list_test.go +++ b/cdc/puller/frontier/list_test.go @@ -40,7 +40,7 @@ func TestInsertAndRemove(t *testing.T) { // check all the keys are exist in list for _, k := range keys { - a := list.Seek(k).Node().Key() + a := list.Seek(k, make(seekResult, maxHeight)).Node().Key() cmp := bytes.Compare(a, k) require.Equal(t, 0, cmp) } @@ -48,7 +48,7 @@ func TestInsertAndRemove(t *testing.T) { for i := 0; i < 10000; i++ { indexToRemove := rand.Intn(10000) - seekRes := list.Seek(keys[indexToRemove]) + seekRes := list.Seek(keys[indexToRemove], make(seekResult, maxHeight)) if seekRes.Node().Next() == nil { break } @@ -56,7 +56,7 @@ func TestInsertAndRemove(t *testing.T) { list.Remove(seekRes, seekRes.Node().Next()) // check the node is already removed - a := list.Seek(removedKey).Node().Key() + a := list.Seek(removedKey, make(seekResult, maxHeight)).Node().Key() cmp := bytes.Compare(a, removedKey) require.LessOrEqual(t, cmp, 0) } @@ -105,53 +105,53 @@ func TestSeek(t *testing.T) { list := newSpanList() - require.Nil(t, list.Seek(keyA).Node()) + require.Nil(t, list.Seek(keyA, make(seekResult, maxHeight)).Node()) // insert keyA to keyH insertIntoList(list, keyC, keyF, keyE, keyH, keyG, keyD, keyA, keyB) // Point to the first node, if seek key is smaller than the first key in list. - require.Nil(t, list.Seek(key1).Node().Key()) + require.Nil(t, list.Seek(key1, make(seekResult, maxHeight)).Node().Key()) // Point to the last node with key smaller than seek key. - require.Equal(t, keyH, list.Seek(keyH).Node().key) + require.Equal(t, keyH, list.Seek(keyH, make(seekResult, maxHeight)).Node().key) // Point to itself. - require.Equal(t, keyG, list.Seek(keyG).Node().key) + require.Equal(t, keyG, list.Seek(keyG, make(seekResult, maxHeight)).Node().key) // Ensure there is no problem to seek a larger key. - require.Equal(t, keyH, list.Seek(keyZ).Node().key) - - require.Equal(t, keyA, list.Seek([]byte("b0")).Node().key) - require.Equal(t, keyB, list.Seek([]byte("c0")).Node().key) - require.Equal(t, keyC, list.Seek([]byte("d0")).Node().key) - require.Equal(t, keyD, list.Seek([]byte("e0")).Node().key) - require.Equal(t, keyE, list.Seek([]byte("f0")).Node().key) - require.Equal(t, keyF, list.Seek([]byte("g0")).Node().key) - require.Equal(t, keyG, list.Seek([]byte("h0")).Node().key) - require.Equal(t, keyH, list.Seek([]byte("i0")).Node().key) + require.Equal(t, keyH, list.Seek(keyZ, make(seekResult, maxHeight)).Node().key) + + require.Equal(t, keyA, list.Seek([]byte("b0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyB, list.Seek([]byte("c0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyC, list.Seek([]byte("d0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyD, list.Seek([]byte("e0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyE, list.Seek([]byte("f0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyF, list.Seek([]byte("g0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyG, list.Seek([]byte("h0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyH, list.Seek([]byte("i0"), make(seekResult, maxHeight)).Node().key) require.Equal(t, "[a5] [b5] [c5] [d5] [e5] [f5] [g5] [h5] ", list.String()) checkList(t, list) // remove c5 - seekRes := list.Seek([]byte("c0")) + seekRes := list.Seek([]byte("c0"), make(seekResult, maxHeight)) list.Remove(seekRes, seekRes.Node().Next()) - require.Equal(t, keyB, list.Seek([]byte("c0")).Node().key) - require.Equal(t, keyB, list.Seek([]byte("d0")).Node().key) - require.Equal(t, keyD, list.Seek([]byte("e0")).Node().key) + require.Equal(t, keyB, list.Seek([]byte("c0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyB, list.Seek([]byte("d0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyD, list.Seek([]byte("e0"), make(seekResult, maxHeight)).Node().key) require.Equal(t, "[a5] [b5] [d5] [e5] [f5] [g5] [h5] ", list.String()) checkList(t, list) // remove d5 list.Remove(seekRes, seekRes.Node().Next()) - require.Equal(t, keyB, list.Seek([]byte("d0")).Node().key) - require.Equal(t, keyB, list.Seek([]byte("e0")).Node().key) - require.Equal(t, keyE, list.Seek([]byte("f0")).Node().key) + require.Equal(t, keyB, list.Seek([]byte("d0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyB, list.Seek([]byte("e0"), make(seekResult, maxHeight)).Node().key) + require.Equal(t, keyE, list.Seek([]byte("f0"), make(seekResult, maxHeight)).Node().key) require.Equal(t, "[a5] [b5] [e5] [f5] [g5] [h5] ", list.String()) checkList(t, list) // remove the first node - seekRes = list.Seek([]byte("10")) + seekRes = list.Seek([]byte("10"), make(seekResult, maxHeight)) list.Remove(seekRes, seekRes.Node().Next()) require.Equal(t, "[b5] [e5] [f5] [g5] [h5] ", list.String()) checkList(t, list) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 60951313211..1b2c6bade18 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -195,8 +195,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { } if e.Resolved != nil { - metricTxnCollectCounterResolved.Add(float64(len(e.Resolved))) - for _, resolvedSpan := range e.Resolved { + metricTxnCollectCounterResolved.Add(float64(len(e.Resolved.Spans))) + for _, resolvedSpan := range e.Resolved.Spans { if !regionspan.IsSubSpan(resolvedSpan.Span, p.spans...) { log.Panic("the resolved span is not in the total span", zap.String("namespace", p.changefeed.Namespace), @@ -208,34 +208,34 @@ func (p *pullerImpl) Run(ctx context.Context) error { ) } // Forward is called in a single thread - p.tsTracker.Forward(resolvedSpan.Span, resolvedSpan.ResolvedTs) - resolvedTs := p.tsTracker.Frontier() - if resolvedTs > 0 && !initialized { - initialized = true - - spans := make([]string, 0, len(p.spans)) - for i := range p.spans { - spans = append(spans, p.spans[i].String()) - } - log.Info("puller is initialized", - zap.String("namespace", p.changefeed.Namespace), - zap.String("changefeed", p.changefeed.ID), - zap.Int64("tableID", p.tableID), - zap.String("tableName", p.tableName), - zap.Uint64("resolvedTs", resolvedTs), - zap.Duration("duration", time.Since(start)), - zap.Strings("spans", spans)) - } - if !initialized || resolvedTs == lastResolvedTs { - continue - } - lastResolvedTs = resolvedTs - err := output(&model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved, RegionID: e.RegionID}) - if err != nil { - return errors.Trace(err) + p.tsTracker.Forward(resolvedSpan.Region, resolvedSpan.Span, e.Resolved.ResolvedTs) + } + resolvedTs := p.tsTracker.Frontier() + if resolvedTs > 0 && !initialized { + initialized = true + + spans := make([]string, 0, len(p.spans)) + for i := range p.spans { + spans = append(spans, p.spans[i].String()) } - atomic.StoreUint64(&p.resolvedTs, resolvedTs) + log.Info("puller is initialized", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("resolvedTs", resolvedTs), + zap.Duration("duration", time.Since(start)), + zap.Strings("spans", spans)) + } + if !initialized || resolvedTs == lastResolvedTs { + continue + } + lastResolvedTs = resolvedTs + err := output(&model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved, RegionID: e.RegionID}) + if err != nil { + return errors.Trace(err) } + atomic.StoreUint64(&p.resolvedTs, resolvedTs) } } }) From c33dc39641881ea4fd15d16f999ef0f806951523 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 29 Sep 2022 13:23:53 +0800 Subject: [PATCH 04/19] improve frontier --- cdc/puller/frontier/frontier.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index a1cfa9a8269..1d43e27ea7a 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -68,7 +68,7 @@ func (s *spanFrontier) Frontier() uint64 { // Forward advances the timestamp for a span. func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) { - if n, ok := s.nodes[regionID]; ok { + if n, ok := s.nodes[regionID]; ok && n.regionID > 0 && n.end != nil { if bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { s.minTsHeap.UpdateKey(n.Value(), ts) return @@ -82,15 +82,6 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t s.result[i] = nil } seekRes := s.spanList.Seek(span.Start, s.result) - for _, n := range seekRes { - if n == nil { - break - } - if n.regionID > 0 { - delete(s.nodes, n.regionID) - } - } - // if there is no change in the region span // We just need to update the ts corresponding to the span in list next := seekRes.Node().Next() @@ -100,6 +91,14 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t return } } + for _, n := range seekRes { + if n == nil { + break + } + if n.regionID > 0 { + delete(s.nodes, n.regionID) + } + } // regions are merged or split, overwrite span into list node := seekRes.Node() From 9ef6307f2fb38eb92e2975fd21931480e9556fb0 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 29 Sep 2022 13:57:18 +0800 Subject: [PATCH 05/19] improve frontier --- cdc/puller/frontier/frontier.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 1d43e27ea7a..085f880ed14 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -88,6 +88,8 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t if next != nil { if bytes.Equal(seekRes.Node().Key(), span.Start) && bytes.Equal(next.Key(), span.End) { s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) + s.nodes[regionID] = seekRes.Node() + s.nodes[regionID].regionID = regionID return } } From 69a013cfc0cf696053b348120ea2a9df571ecc98 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 29 Sep 2022 14:51:56 +0800 Subject: [PATCH 06/19] add metrics --- cdc/puller/frontier/frontier.go | 16 +++++++++++----- cdc/puller/metrics.go | 8 ++++++++ cdc/puller/puller.go | 8 +++++++- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 085f880ed14..ef570bb98ec 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/pingcap/tiflow/pkg/regionspan" + "github.com/prometheus/client_golang/prometheus" ) // Frontier checks resolved event of spans and moves the global resolved ts ahead @@ -31,21 +32,24 @@ type Frontier interface { // spanFrontier tracks the minimum timestamp of a set of spans. type spanFrontier struct { - spanList skipList - minTsHeap fibonacciHeap - result []*skipListNode - nodes map[uint64]*skipListNode + spanList skipList + minTsHeap fibonacciHeap + result []*skipListNode + nodes map[uint64]*skipListNode + metricResolvedRegionCachedCounterResolved prometheus.Counter } // NewFrontier creates Frontier from the given spans. // spanFrontier don't support use Nil as the maximum key of End range // So we use set it as util.UpperBoundKey, the means the real use case *should not* have an // End key bigger than util.UpperBoundKey -func NewFrontier(checkpointTs uint64, spans ...regionspan.ComparableSpan) Frontier { +func NewFrontier(checkpointTs uint64, metricResolvedRegionCachedCounterResolved prometheus.Counter, spans ...regionspan.ComparableSpan) Frontier { s := &spanFrontier{ spanList: *newSpanList(), result: make(seekResult, maxHeight), nodes: map[uint64]*skipListNode{}, + + metricResolvedRegionCachedCounterResolved: metricResolvedRegionCachedCounterResolved, } firstSpan := true for _, span := range spans { @@ -71,6 +75,7 @@ func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, if n, ok := s.nodes[regionID]; ok && n.regionID > 0 && n.end != nil { if bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { s.minTsHeap.UpdateKey(n.Value(), ts) + s.metricResolvedRegionCachedCounterResolved.Inc() return } } @@ -90,6 +95,7 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) s.nodes[regionID] = seekRes.Node() s.nodes[regionID].regionID = regionID + s.nodes[regionID].end = next.key return } } diff --git a/cdc/puller/metrics.go b/cdc/puller/metrics.go index b05e039312a..dd48121384b 100644 --- a/cdc/puller/metrics.go +++ b/cdc/puller/metrics.go @@ -25,6 +25,13 @@ var ( Name: "txn_collect_event_count", Help: "The number of events received from txn collector", }, []string{"namespace", "changefeed", "type"}) + regionCollectCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "puller", + Name: "region_resolved_cached_count", + Help: "The number of events received from txn collector", + }, []string{"namespace", "changefeed", "type"}) pullerResolvedTsGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -67,6 +74,7 @@ var ( // InitMetrics registers all metrics in this file func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(txnCollectCounter) + registry.MustRegister(regionCollectCounter) registry.MustRegister(pullerResolvedTsGauge) registry.MustRegister(memBufferSizeGauge) registry.MustRegister(outputChanSizeHistogram) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 1b2c6bade18..04008d8a1f1 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -88,7 +88,13 @@ func New(ctx context.Context, // To make puller level resolved ts initialization distinguishable, we set // the initial ts for frontier to 0. Once the puller level resolved ts // initialized, the ts should advance to a non-zero value. - tsTracker := frontier.NewFrontier(0, comparableSpans...) + pullerType := "dml" + if len(spans) > 1 { + pullerType = "ddl" + } + metricTxnCollectCounterResolved := regionCollectCounter. + WithLabelValues(changefeed.Namespace, changefeed.ID, pullerType) + tsTracker := frontier.NewFrontier(0, metricTxnCollectCounterResolved, comparableSpans...) kvCli := kv.NewCDCKVClient( ctx, pdCli, grpcPool, regionCache, pdClock, cfg, changefeed, tableID, tableName) p := &pullerImpl{ From d8376204889bc6d871457fa3118ff56ca0ad257e Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 29 Sep 2022 16:27:32 +0800 Subject: [PATCH 07/19] add ut --- cdc/puller/frontier/frontier.go | 10 ++-- cdc/puller/frontier/frontier_bench_test.go | 4 +- cdc/puller/frontier/frontier_test.go | 53 ++++++++++++++++++++-- 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index ef570bb98ec..13b0cc5410e 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -133,11 +133,11 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t } } if shouldInsertStartNode { - n := s.spanList.InsertNextToNode(seekRes, span.Start, s.minTsHeap.Insert(ts)) - n.regionID = regionID - n.end = span.End - s.nodes[regionID] = n - seekRes.Next() + s.spanList.InsertNextToNode(seekRes, span.Start, s.minTsHeap.Insert(ts)) + //n.regionID = regionID + //n.end = span.End + //s.nodes[regionID] = n + //seekRes.Next() } s.spanList.InsertNextToNode(seekRes, span.End, s.minTsHeap.Insert(lastNodeTs)) } diff --git a/cdc/puller/frontier/frontier_bench_test.go b/cdc/puller/frontier/frontier_bench_test.go index 33daca21436..4aa69965d02 100644 --- a/cdc/puller/frontier/frontier_bench_test.go +++ b/cdc/puller/frontier/frontier_bench_test.go @@ -49,7 +49,7 @@ func BenchmarkSpanFrontier(b *testing.B) { spans = append(spans, span) } - f := NewFrontier(0, spans...) + f := NewFrontier(0, c, spans...) b.ResetTimer() @@ -91,7 +91,7 @@ func BenchmarkSpanFrontierOverlap(b *testing.B) { }) } - f := NewFrontier(0, spans...) + f := NewFrontier(0, c, spans...) b.ResetTimer() diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index d458b583a9b..3b24cb20892 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/pingcap/tiflow/pkg/regionspan" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -37,7 +38,7 @@ func TestSpanFrontier(t *testing.T) { spBD := regionspan.ComparableSpan{Start: keyB, End: keyD} spCD := regionspan.ComparableSpan{Start: keyC, End: keyD} - f := NewFrontier(5, spAD).(*spanFrontier) + f := NewFrontier(5, c, spAD).(*spanFrontier) require.Equal(t, uint64(5), f.Frontier()) require.Equal(t, `[a @ 5] [d @ Max] `, f.String()) @@ -169,7 +170,7 @@ func TestSpanFrontierFallback(t *testing.T) { spCD := regionspan.ComparableSpan{Start: keyC, End: keyD} spDE := regionspan.ComparableSpan{Start: keyD, End: keyE} - f := NewFrontier(20, spAB).(*spanFrontier) + f := NewFrontier(20, c, spAB).(*spanFrontier) f.Forward(0, spBC, 20) f.Forward(0, spCD, 10) f.Forward(0, spDE, 20) @@ -211,7 +212,7 @@ func TestMinMax(t *testing.T) { spMinMax := regionspan.ComparableSpan{Start: keyMin, End: keyMax} spMinMax = spMinMax.Hack() - f := NewFrontier(0, spMinMax) + f := NewFrontier(0, c, spMinMax) require.Equal(t, uint64(0), f.Frontier()) require.Equal(t, "[ @ 0] [\xff\xff\xff\xff\xff @ Max] ", f.String()) checkFrontier(t, f) @@ -256,7 +257,7 @@ func TestSpanFrontierDisjoinSpans(t *testing.T) { sp12 := regionspan.ComparableSpan{Start: key1, End: key2} sp1F := regionspan.ComparableSpan{Start: key1, End: keyF} - f := NewFrontier(0, spAB, spCE) + f := NewFrontier(0, c, spAB, spCE) require.Equal(t, uint64(0), f.Frontier()) require.Equal(t, `[a @ 0] [b @ Max] [c @ 0] [e @ Max] `, f.String()) checkFrontier(t, f) @@ -302,12 +303,14 @@ func TestSpanFrontierDisjoinSpans(t *testing.T) { checkFrontier(t, f) } +var c = prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"}).WithLabelValues("a") + func TestSpanFrontierRandomly(t *testing.T) { t.Parallel() var keyMin []byte var keyMax []byte spMinMax := regionspan.ComparableSpan{Start: keyMin, End: keyMax} - f := NewFrontier(0, spMinMax) + f := NewFrontier(0, c, spMinMax) var spans []regionspan.ComparableSpan for len(spans) < 500000 { @@ -350,3 +353,43 @@ func checkFrontier(t *testing.T, f Frontier) { require.Equal(t, tsInHeap, tsInList) require.Equal(t, tsInList[0], f.Frontier()) } + +func TestMinMaxWithRegion(t *testing.T) { + t.Parallel() + + ab := regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")} + bc := regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")} + cd := regionspan.ComparableSpan{Start: []byte("c"), End: []byte("d")} + de := regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")} + ef := regionspan.ComparableSpan{Start: []byte("e"), End: []byte("f")} + af := regionspan.ComparableSpan{Start: []byte("a"), End: []byte("f")} + + f := NewFrontier(0, c, af) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(1, ab, 1) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(2, bc, 1) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(3, cd, 1) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(4, de, 1) + require.Equal(t, uint64(0), f.Frontier()) + f.Forward(5, ef, 1) + require.Equal(t, uint64(1), f.Frontier()) + f.Forward(6, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 6) + require.Equal(t, uint64(1), f.Frontier()) + f.Forward(7, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("f")}, 2) + require.Equal(t, uint64(2), f.Frontier()) + f.Forward(7, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("f")}, 3) + require.Equal(t, uint64(3), f.Frontier()) + f.Forward(7, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("f")}, 4) + require.Equal(t, uint64(4), f.Frontier()) + f.Forward(8, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, 4) + require.Equal(t, uint64(4), f.Frontier()) + f.Forward(9, regionspan.ComparableSpan{Start: []byte("e"), End: []byte("f")}, 4) + require.Equal(t, uint64(4), f.Frontier()) + f.Forward(9, regionspan.ComparableSpan{Start: []byte("e"), End: []byte("f")}, 7) + require.Equal(t, uint64(4), f.Frontier()) + f.Forward(8, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, 5) + require.Equal(t, uint64(5), f.Frontier()) +} From 39e2938d09ccb81a0a31c4fe8cb535f6b0b1e38f Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 29 Sep 2022 16:34:49 +0800 Subject: [PATCH 08/19] add ut --- cdc/puller/frontier/frontier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 13b0cc5410e..e341ccdfcdf 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -73,7 +73,7 @@ func (s *spanFrontier) Frontier() uint64 { // Forward advances the timestamp for a span. func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) { if n, ok := s.nodes[regionID]; ok && n.regionID > 0 && n.end != nil { - if bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { + if regionID == n.regionID && bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { s.minTsHeap.UpdateKey(n.Value(), ts) s.metricResolvedRegionCachedCounterResolved.Inc() return From 253c3c0a5c52958b13616486e7fe4ca2b4ba0edd Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 29 Sep 2022 17:08:29 +0800 Subject: [PATCH 09/19] add ut --- cdc/puller/frontier/frontier.go | 14 ++------------ cdc/puller/frontier/list.go | 3 +-- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index e341ccdfcdf..34def96afa7 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -99,23 +99,17 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t return } } - for _, n := range seekRes { - if n == nil { - break - } - if n.regionID > 0 { - delete(s.nodes, n.regionID) - } - } // regions are merged or split, overwrite span into list node := seekRes.Node() + delete(s.nodes, node.regionID) lastNodeTs := uint64(math.MaxUint64) shouldInsertStartNode := true if node.Value() != nil { lastNodeTs = node.Value().key } for ; node != nil; node = node.Next() { + delete(s.nodes, node.regionID) cmpStart := bytes.Compare(node.Key(), span.Start) if cmpStart < 0 { continue @@ -134,10 +128,6 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t } if shouldInsertStartNode { s.spanList.InsertNextToNode(seekRes, span.Start, s.minTsHeap.Insert(ts)) - //n.regionID = regionID - //n.end = span.End - //s.nodes[regionID] = n - //seekRes.Next() } s.spanList.InsertNextToNode(seekRes, span.End, s.minTsHeap.Insert(lastNodeTs)) } diff --git a/cdc/puller/frontier/list.go b/cdc/puller/frontier/list.go index 51e85f8f55a..679fd9d1d30 100644 --- a/cdc/puller/frontier/list.go +++ b/cdc/puller/frontier/list.go @@ -131,7 +131,7 @@ LevelLoop: } // InsertNextToNode insert the specified node after the seek result -func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonacciHeapNode) *skipListNode { +func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonacciHeapNode) { if seekR.Node() != nil && !nextTo(seekR.Node(), key) { log.Panic("the InsertNextToNode function can only append node to the seek result.") } @@ -154,7 +154,6 @@ func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonac n.nexts[level] = prev.nexts[level] prev.nexts[level] = n } - return n } // Insert inserts the specified node From 89338c25fc6218f4265f124c5e5d83046ac65021 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 29 Sep 2022 17:12:40 +0800 Subject: [PATCH 10/19] add ut --- cdc/puller/frontier/frontier.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 34def96afa7..3e82463a33e 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -128,6 +128,7 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t } if shouldInsertStartNode { s.spanList.InsertNextToNode(seekRes, span.Start, s.minTsHeap.Insert(ts)) + seekRes.Next() } s.spanList.InsertNextToNode(seekRes, span.End, s.minTsHeap.Insert(lastNodeTs)) } From 0a0e268e7c6519d4f6d938092296f3542e1feb61 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 29 Sep 2022 17:17:36 +0800 Subject: [PATCH 11/19] add ut --- cdc/puller/frontier/frontier.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 3e82463a33e..1ba71d700a6 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -93,9 +93,11 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t if next != nil { if bytes.Equal(seekRes.Node().Key(), span.Start) && bytes.Equal(next.Key(), span.End) { s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) - s.nodes[regionID] = seekRes.Node() - s.nodes[regionID].regionID = regionID - s.nodes[regionID].end = next.key + if regionID > 0 { + s.nodes[regionID] = seekRes.Node() + s.nodes[regionID].regionID = regionID + s.nodes[regionID].end = next.key + } return } } From 02ec537ff938efcf61d093dd4e44568973cc24c2 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 30 Sep 2022 17:18:36 +0800 Subject: [PATCH 12/19] fix lint --- cdc/kv/client.go | 5 +- cdc/kv/region_worker.go | 4 +- cdc/model/kv.go | 13 +-- cdc/model/kv_gen.go | 132 +++++++++++++++++++++++++++ cdc/model/kv_gen_test.go | 113 +++++++++++++++++++++++ cdc/puller/frontier/frontier.go | 48 ++++++---- cdc/puller/frontier/frontier_test.go | 2 +- cdc/puller/frontier/list.go | 13 ++- cdc/puller/metrics.go | 6 +- cdc/puller/puller.go | 4 +- pkg/txnutil/lock_resolver.go | 2 +- 11 files changed, 298 insertions(+), 44 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index fac2d2f9a9b..c4faa3e6b17 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -879,13 +879,14 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { // resolved ts event when the region starts to work. resolvedEv := model.RegionFeedEvent{ Resolved: &model.ResolvedSpan{ - Spans: []model.ResolvedComparableSpan{ + Spans: []model.RegionComparableSpan{ { Span: sri.span, Region: sri.verID.GetID(), }, }, - ResolvedTs: sri.ts}, + ResolvedTs: sri.ts, + }, } select { case s.eventCh <- resolvedEv: diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 7afc5bf136e..0ed86f97b82 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -785,7 +785,7 @@ func (w *regionWorker) handleResolvedTs( revents *resolvedTsEvent, ) error { resolvedTs := revents.resolvedTs - resolvedSpans := make([]model.ResolvedComparableSpan, 0, len(revents.regions)) + resolvedSpans := make([]model.RegionComparableSpan, 0, len(revents.regions)) regions := make([]uint64, 0, len(revents.regions)) for _, state := range revents.regions { @@ -805,7 +805,7 @@ func (w *regionWorker) handleResolvedTs( continue } // emit a checkpointTs - resolvedSpans = append(resolvedSpans, model.ResolvedComparableSpan{ + resolvedSpans = append(resolvedSpans, model.RegionComparableSpan{ Span: state.sri.span, Region: regionID, }) diff --git a/cdc/model/kv.go b/cdc/model/kv.go index b3d673460dc..59186b7706e 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -60,20 +60,21 @@ func (e *RegionFeedEvent) GetValue() interface{} { // //msgp:ignore ResolvedSpan type ResolvedSpan struct { - Spans []ResolvedComparableSpan + Spans []RegionComparableSpan ResolvedTs uint64 } -type ResolvedComparableSpan struct { - Span regionspan.ComparableSpan - Region uint64 -} - // String implements fmt.Stringer interface. func (rs *ResolvedSpan) String() string { return fmt.Sprintf("span: %v, resolved-ts: %d", rs.Spans, rs.ResolvedTs) } +// RegionComparableSpan contains a comparable span and a region id of that span +type RegionComparableSpan struct { + Span regionspan.ComparableSpan + Region uint64 +} + // RawKVEntry notify the KV operator type RawKVEntry struct { OpType OpType `msg:"op_type"` diff --git a/cdc/model/kv_gen.go b/cdc/model/kv_gen.go index d78e50d3604..cf0600e2f32 100644 --- a/cdc/model/kv_gen.go +++ b/cdc/model/kv_gen.go @@ -318,3 +318,135 @@ func (z *RawKVEntry) Msgsize() (s int) { s = 1 + 8 + msgp.IntSize + 4 + msgp.BytesPrefixSize + len(z.Key) + 6 + msgp.BytesPrefixSize + len(z.Value) + 10 + msgp.BytesPrefixSize + len(z.OldValue) + 9 + msgp.Uint64Size + 5 + msgp.Uint64Size + 10 + msgp.Uint64Size return } + +// DecodeMsg implements msgp.Decodable +func (z *RegionComparableSpan) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Span": + err = z.Span.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Span") + return + } + case "Region": + z.Region, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Region") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *RegionComparableSpan) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "Span" + err = en.Append(0x82, 0xa4, 0x53, 0x70, 0x61, 0x6e) + if err != nil { + return + } + err = z.Span.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Span") + return + } + // write "Region" + err = en.Append(0xa6, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteUint64(z.Region) + if err != nil { + err = msgp.WrapError(err, "Region") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *RegionComparableSpan) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "Span" + o = append(o, 0x82, 0xa4, 0x53, 0x70, 0x61, 0x6e) + o, err = z.Span.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Span") + return + } + // string "Region" + o = append(o, 0xa6, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e) + o = msgp.AppendUint64(o, z.Region) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *RegionComparableSpan) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Span": + bts, err = z.Span.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Span") + return + } + case "Region": + z.Region, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Region") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *RegionComparableSpan) Msgsize() (s int) { + s = 1 + 5 + z.Span.Msgsize() + 7 + msgp.Uint64Size + return +} diff --git a/cdc/model/kv_gen_test.go b/cdc/model/kv_gen_test.go index dbf9aefd376..e59512c360c 100644 --- a/cdc/model/kv_gen_test.go +++ b/cdc/model/kv_gen_test.go @@ -121,3 +121,116 @@ func BenchmarkDecodeRawKVEntry(b *testing.B) { } } } + +func TestMarshalUnmarshalRegionComparableSpan(t *testing.T) { + v := RegionComparableSpan{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgRegionComparableSpan(b *testing.B) { + v := RegionComparableSpan{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgRegionComparableSpan(b *testing.B) { + v := RegionComparableSpan{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalRegionComparableSpan(b *testing.B) { + v := RegionComparableSpan{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeRegionComparableSpan(t *testing.T) { + v := RegionComparableSpan{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeRegionComparableSpan Msgsize() is inaccurate") + } + + vn := RegionComparableSpan{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeRegionComparableSpan(b *testing.B) { + v := RegionComparableSpan{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeRegionComparableSpan(b *testing.B) { + v := RegionComparableSpan{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 1ba71d700a6..dc3144af2f0 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -32,24 +32,29 @@ type Frontier interface { // spanFrontier tracks the minimum timestamp of a set of spans. type spanFrontier struct { - spanList skipList - minTsHeap fibonacciHeap - result []*skipListNode - nodes map[uint64]*skipListNode - metricResolvedRegionCachedCounterResolved prometheus.Counter + spanList skipList + minTsHeap fibonacciHeap + + seekTempResult []*skipListNode + + cachedRegions map[uint64]*skipListNode + metricResolvedRegionCachedCounter prometheus.Counter } // NewFrontier creates Frontier from the given spans. // spanFrontier don't support use Nil as the maximum key of End range // So we use set it as util.UpperBoundKey, the means the real use case *should not* have an // End key bigger than util.UpperBoundKey -func NewFrontier(checkpointTs uint64, metricResolvedRegionCachedCounterResolved prometheus.Counter, spans ...regionspan.ComparableSpan) Frontier { +func NewFrontier(checkpointTs uint64, + metricResolvedRegionCachedCounter prometheus.Counter, + spans ...regionspan.ComparableSpan, +) Frontier { s := &spanFrontier{ - spanList: *newSpanList(), - result: make(seekResult, maxHeight), - nodes: map[uint64]*skipListNode{}, + spanList: *newSpanList(), + seekTempResult: make(seekResult, maxHeight), + cachedRegions: map[uint64]*skipListNode{}, - metricResolvedRegionCachedCounterResolved: metricResolvedRegionCachedCounterResolved, + metricResolvedRegionCachedCounter: metricResolvedRegionCachedCounter, } firstSpan := true for _, span := range spans { @@ -72,10 +77,12 @@ func (s *spanFrontier) Frontier() uint64 { // Forward advances the timestamp for a span. func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) { - if n, ok := s.nodes[regionID]; ok && n.regionID > 0 && n.end != nil { + // it's the fast part to detect if the region is split or merged, + // if not we can update the minTsHeap with use new ts directly + if n, ok := s.cachedRegions[regionID]; ok && n.regionID > 0 && n.end != nil { if regionID == n.regionID && bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { s.minTsHeap.UpdateKey(n.Value(), ts) - s.metricResolvedRegionCachedCounterResolved.Inc() + s.metricResolvedRegionCachedCounter.Inc() return } } @@ -83,10 +90,11 @@ func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, } func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, ts uint64) { - for i := 0; i < len(s.result); i++ { - s.result[i] = nil + // clear the seek result + for i := 0; i < len(s.seekTempResult); i++ { + s.seekTempResult[i] = nil } - seekRes := s.spanList.Seek(span.Start, s.result) + seekRes := s.spanList.Seek(span.Start, s.seekTempResult) // if there is no change in the region span // We just need to update the ts corresponding to the span in list next := seekRes.Node().Next() @@ -94,9 +102,9 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t if bytes.Equal(seekRes.Node().Key(), span.Start) && bytes.Equal(next.Key(), span.End) { s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) if regionID > 0 { - s.nodes[regionID] = seekRes.Node() - s.nodes[regionID].regionID = regionID - s.nodes[regionID].end = next.key + s.cachedRegions[regionID] = seekRes.Node() + s.cachedRegions[regionID].regionID = regionID + s.cachedRegions[regionID].end = next.key } return } @@ -104,14 +112,14 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t // regions are merged or split, overwrite span into list node := seekRes.Node() - delete(s.nodes, node.regionID) + delete(s.cachedRegions, node.regionID) lastNodeTs := uint64(math.MaxUint64) shouldInsertStartNode := true if node.Value() != nil { lastNodeTs = node.Value().key } for ; node != nil; node = node.Next() { - delete(s.nodes, node.regionID) + delete(s.cachedRegions, node.regionID) cmpStart := bytes.Compare(node.Key(), span.Start) if cmpStart < 0 { continue diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index 3b24cb20892..70f8903915b 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -354,7 +354,7 @@ func checkFrontier(t *testing.T, f Frontier) { require.Equal(t, tsInList[0], f.Frontier()) } -func TestMinMaxWithRegion(t *testing.T) { +func TestMinMaxWithRegionSplitMerge(t *testing.T) { t.Parallel() ab := regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")} diff --git a/cdc/puller/frontier/list.go b/cdc/puller/frontier/list.go index 679fd9d1d30..ce8c0713f99 100644 --- a/cdc/puller/frontier/list.go +++ b/cdc/puller/frontier/list.go @@ -58,7 +58,7 @@ func (s *skipListNode) Next() *skipListNode { type seekResult []*skipListNode -// Next points to the next seek result +// Next points to the next seek seekTempResult func (s seekResult) Next() { next := s.Node().Next() for i := range next.nexts { @@ -66,7 +66,7 @@ func (s seekResult) Next() { } } -// Node returns the node point by the seek result +// Node returns the node point by the seek seekTempResult func (s seekResult) Node() *skipListNode { if len(s) == 0 { return nil @@ -96,13 +96,12 @@ func (l *skipList) randomHeight() int { //go:linkname fastrand runtime.fastrand func fastrand() uint32 -// Seek returns the seek result -// the seek result is a slice of nodes, +// Seek returns the seek seekTempResult +// the seek seekTempResult is a slice of nodes, // Each element in the slice represents the nearest(left) node to the target value at each level of the skip list. func (l *skipList) Seek(key []byte, result []*skipListNode) seekResult { head := &l.head current := head - //result := make(seekResult, maxHeight) LevelLoop: for level := l.height - 1; level >= 0; level-- { @@ -130,7 +129,7 @@ LevelLoop: return result } -// InsertNextToNode insert the specified node after the seek result +// InsertNextToNode insert the specified node after the seek seekTempResult func (l *skipList) InsertNextToNode(seekR seekResult, key []byte, value *fibonacciHeapNode) { if seekR.Node() != nil && !nextTo(seekR.Node(), key) { log.Panic("the InsertNextToNode function can only append node to the seek result.") @@ -162,7 +161,7 @@ func (l *skipList) Insert(key []byte, value *fibonacciHeapNode) { l.InsertNextToNode(seekR, key, value) } -// Remove removes the specified node after the seek result +// Remove removes the specified node after the seek seekTempResult func (l *skipList) Remove(seekR seekResult, toRemove *skipListNode) { seekCurrent := seekR.Node() if seekCurrent == nil || seekCurrent.Next() != toRemove { diff --git a/cdc/puller/metrics.go b/cdc/puller/metrics.go index dd48121384b..20502bf472f 100644 --- a/cdc/puller/metrics.go +++ b/cdc/puller/metrics.go @@ -25,12 +25,12 @@ var ( Name: "txn_collect_event_count", Help: "The number of events received from txn collector", }, []string{"namespace", "changefeed", "type"}) - regionCollectCounter = prometheus.NewCounterVec( + cachedRegionCollectCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "ticdc", Subsystem: "puller", Name: "region_resolved_cached_count", - Help: "The number of events received from txn collector", + Help: "The number of regions cached when forward resolved ts", }, []string{"namespace", "changefeed", "type"}) pullerResolvedTsGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -74,7 +74,7 @@ var ( // InitMetrics registers all metrics in this file func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(txnCollectCounter) - registry.MustRegister(regionCollectCounter) + registry.MustRegister(cachedRegionCollectCounter) registry.MustRegister(pullerResolvedTsGauge) registry.MustRegister(memBufferSizeGauge) registry.MustRegister(outputChanSizeHistogram) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 04008d8a1f1..40c1a4954d9 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -92,9 +92,9 @@ func New(ctx context.Context, if len(spans) > 1 { pullerType = "ddl" } - metricTxnCollectCounterResolved := regionCollectCounter. + metricCachedRegionCollectCounter := cachedRegionCollectCounter. WithLabelValues(changefeed.Namespace, changefeed.ID, pullerType) - tsTracker := frontier.NewFrontier(0, metricTxnCollectCounterResolved, comparableSpans...) + tsTracker := frontier.NewFrontier(0, metricCachedRegionCollectCounter, comparableSpans...) kvCli := kv.NewCDCKVClient( ctx, pdCli, grpcPool, regionCache, pdClock, cfg, changefeed, tableID, tableName) p := &pullerImpl{ diff --git a/pkg/txnutil/lock_resolver.go b/pkg/txnutil/lock_resolver.go index caa8bebed09..c0b0aa49884 100644 --- a/pkg/txnutil/lock_resolver.go +++ b/pkg/txnutil/lock_resolver.go @@ -131,7 +131,7 @@ func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint } bo = tikv.NewGcResolveLockMaxBackoffer(ctx) } - log.Debug("resolve lock successfully", + log.Info("resolve lock successfully", zap.Uint64("regionID", regionID), zap.Int("lockCount", lockCount), zap.Uint64("maxVersion", maxVersion), From 0c3b4b0684d8089485839f67e13a95f815ef3064 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 30 Sep 2022 17:33:56 +0800 Subject: [PATCH 13/19] fix lint --- cdc/model/kv_gen.go | 132 -------------------------------------------- 1 file changed, 132 deletions(-) diff --git a/cdc/model/kv_gen.go b/cdc/model/kv_gen.go index cf0600e2f32..d78e50d3604 100644 --- a/cdc/model/kv_gen.go +++ b/cdc/model/kv_gen.go @@ -318,135 +318,3 @@ func (z *RawKVEntry) Msgsize() (s int) { s = 1 + 8 + msgp.IntSize + 4 + msgp.BytesPrefixSize + len(z.Key) + 6 + msgp.BytesPrefixSize + len(z.Value) + 10 + msgp.BytesPrefixSize + len(z.OldValue) + 9 + msgp.Uint64Size + 5 + msgp.Uint64Size + 10 + msgp.Uint64Size return } - -// DecodeMsg implements msgp.Decodable -func (z *RegionComparableSpan) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Span": - err = z.Span.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "Span") - return - } - case "Region": - z.Region, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "Region") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z *RegionComparableSpan) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 - // write "Span" - err = en.Append(0x82, 0xa4, 0x53, 0x70, 0x61, 0x6e) - if err != nil { - return - } - err = z.Span.EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "Span") - return - } - // write "Region" - err = en.Append(0xa6, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e) - if err != nil { - return - } - err = en.WriteUint64(z.Region) - if err != nil { - err = msgp.WrapError(err, "Region") - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z *RegionComparableSpan) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 2 - // string "Span" - o = append(o, 0x82, 0xa4, 0x53, 0x70, 0x61, 0x6e) - o, err = z.Span.MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "Span") - return - } - // string "Region" - o = append(o, 0xa6, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e) - o = msgp.AppendUint64(o, z.Region) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *RegionComparableSpan) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Span": - bts, err = z.Span.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "Span") - return - } - case "Region": - z.Region, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Region") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *RegionComparableSpan) Msgsize() (s int) { - s = 1 + 5 + z.Span.Msgsize() + 7 + msgp.Uint64Size - return -} From 98f0790287b0f40689d470e45cede87437968133 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 6 Oct 2022 23:51:26 +0800 Subject: [PATCH 14/19] fix lint --- cdc/kv/client.go | 2 +- cdc/kv/client_test.go | 187 +++++++++++++++++++++----------------- cdc/kv/region_worker.go | 2 +- cdc/model/kv.go | 10 +- cdc/model/kv_gen.go | 132 +++++++++++++++++++++++++++ cdc/model/kv_test.go | 11 ++- cdc/puller/puller_test.go | 27 +++--- 7 files changed, 263 insertions(+), 108 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index c4faa3e6b17..4c8694300a0 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -878,7 +878,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { // After this resolved ts event is sent, we don't need to send one more // resolved ts event when the region starts to work. resolvedEv := model.RegionFeedEvent{ - Resolved: &model.ResolvedSpan{ + Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{ { Span: sri.span, diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index f58030e177d..f7586447013 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -351,7 +351,7 @@ func TestConnectOfflineTiKV(t *testing.T) { } checkEvent := func(event model.RegionFeedEvent, ts uint64) { - require.Equal(t, ts, event.Resolved[0].ResolvedTs) + require.Equal(t, ts, event.Resolved.ResolvedTs) } initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) @@ -605,7 +605,7 @@ consumePreResolvedTs: select { case event = <-eventCh: require.NotNil(t, event.Resolved) - require.Equal(t, uint64(100), event.Resolved[0].ResolvedTs) + require.Equal(t, uint64(100), event.Resolved.ResolvedTs) case <-time.After(time.Second): break consumePreResolvedTs } @@ -641,7 +641,7 @@ consumePreResolvedTs: require.FailNow(t, "reconnection not succeed in 3 seconds") } require.NotNil(t, event.Resolved) - require.Equal(t, uint64(120), event.Resolved[0].ResolvedTs) + require.Equal(t, uint64(120), event.Resolved.ResolvedTs) cancel() } @@ -1080,10 +1080,11 @@ func testHandleFeedEvent(t *testing.T) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 100, + }, }, { Val: &model.RawKVEntry{ @@ -1151,25 +1152,29 @@ func testHandleFeedEvent(t *testing.T) { RegionID: 3, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 135, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 135, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 145, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 145, + }, }, } multipleExpected := model.RegionFeedEvent{ - Resolved: make([]*model.ResolvedSpan, multiSize), - } - for i := range multipleExpected.Resolved { - multipleExpected.Resolved[i] = &model.ResolvedSpan{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Resolved: &model.ResolvedSpans{ + Spans: make([]model.RegionComparableSpan, multiSize), ResolvedTs: 160, + }, + } + for i := range multipleExpected.Resolved.Spans { + multipleExpected.Resolved.Spans[i] = model.RegionComparableSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, } } @@ -1327,7 +1332,7 @@ func TestStreamSendWithError(t *testing.T) { select { case event := <-eventCh: require.NotNil(t, event.Resolved) - require.Equal(t, 1, len(event.Resolved)) + require.Equal(t, 1, len(event.Resolved.Spans)) require.NotNil(t, 0, event.RegionID) case <-time.After(time.Second): require.Fail(t, fmt.Sprintf("expected events are not receive, received: %v", initRegions)) @@ -1431,16 +1436,18 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 120, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 120, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 120, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 120, + }, }, } @@ -1449,7 +1456,7 @@ eventLoop: for { select { case ev := <-eventCh: - if ev.Resolved[0].ResolvedTs != uint64(100) { + if ev.Resolved.ResolvedTs != uint64(100) { events = append(events, ev) } case <-time.After(time.Second): @@ -1617,7 +1624,7 @@ ReceiveLoop: break ReceiveLoop } received = append(received, event) - if event.Resolved[0].ResolvedTs == 130 { + if event.Resolved.ResolvedTs == 130 { break ReceiveLoop } case <-time.After(time.Second): @@ -1626,7 +1633,7 @@ ReceiveLoop: } var lastResolvedTs uint64 for _, e := range received { - if lastResolvedTs > e.Resolved[0].ResolvedTs { + if lastResolvedTs > e.Resolved.ResolvedTs { require.Fail(t, fmt.Sprintf("the resolvedTs is back off %#v", resolved)) } } @@ -1753,7 +1760,7 @@ func TestIncompatibleTiKV(t *testing.T) { ch1 <- initialized select { case event := <-eventCh: - require.Equal(t, 1, len(event.Resolved)) + require.Equal(t, 1, len(event.Resolved.Spans)) case <-time.After(time.Second): require.Fail(t, "expected events are not receive") } @@ -1823,7 +1830,7 @@ func TestNoPendingRegionError(t *testing.T) { ch1 <- initialized ev := <-eventCh require.NotNil(t, ev.Resolved) - require.Equal(t, uint64(100), ev.Resolved[0].ResolvedTs) + require.Equal(t, uint64(100), ev.Resolved.ResolvedTs) resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { @@ -1835,7 +1842,7 @@ func TestNoPendingRegionError(t *testing.T) { ch1 <- resolved ev = <-eventCh require.NotNil(t, ev.Resolved) - require.Equal(t, uint64(200), ev.Resolved[0].ResolvedTs) + require.Equal(t, uint64(200), ev.Resolved.ResolvedTs) cancel() } @@ -1910,22 +1917,25 @@ func TestDropStaleRequest(t *testing.T) { }} expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 100, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 120, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 120, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 130, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 130, + }, }, } @@ -2010,16 +2020,18 @@ func TestResolveLock(t *testing.T) { }} expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 100, + }, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: tso, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: tso, + }, }, } ch1 <- resolved @@ -2342,17 +2354,19 @@ func testEventAfterFeedStop(t *testing.T) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 100, + }, RegionID: regionID, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 100, + }, RegionID: regionID, }, { @@ -2367,10 +2381,11 @@ func testEventAfterFeedStop(t *testing.T) { RegionID: 3, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 120, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 120, + }, RegionID: regionID, }, } @@ -2542,10 +2557,11 @@ func TestOutOfRegionRangeEvent(t *testing.T) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 100, + }, }, { Val: &model.RawKVEntry{ @@ -2570,10 +2586,11 @@ func TestOutOfRegionRangeEvent(t *testing.T) { RegionID: 3, }, { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 145, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 145, + }, }, } @@ -3025,17 +3042,18 @@ func testKVClientForceReconnect(t *testing.T) { ch2 <- resolved expected := model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - ResolvedTs: 135, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, + }}, ResolvedTs: 135, + }, } eventLoop: for { select { case ev := <-eventCh: - if ev.Resolved != nil && ev.Resolved[0].ResolvedTs == uint64(100) { + if ev.Resolved != nil && ev.Resolved.ResolvedTs == uint64(100) { continue } require.Equal(t, expected, ev) @@ -3263,10 +3281,11 @@ func TestEvTimeUpdate(t *testing.T) { expected := []model.RegionFeedEvent{ { - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 100, - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 10, + }, }, { Val: &model.RawKVEntry{ diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 0ed86f97b82..73db668b318 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -828,7 +828,7 @@ func (w *regionWorker) handleResolvedTs( state.lock.Unlock() } // emit a checkpointTs - revent := model.RegionFeedEvent{Resolved: &model.ResolvedSpan{ResolvedTs: resolvedTs, Spans: resolvedSpans}} + revent := model.RegionFeedEvent{Resolved: &model.ResolvedSpans{ResolvedTs: resolvedTs, Spans: resolvedSpans}} select { case w.outputCh <- revent: w.metrics.metricSendEventResolvedCounter.Add(float64(len(resolvedSpans))) diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 59186b7706e..4f5150348d4 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -38,7 +38,7 @@ const ( //msgp:ignore RegionFeedEvent type RegionFeedEvent struct { Val *RawKVEntry - Resolved *ResolvedSpan + Resolved *ResolvedSpans // Additional debug info, not used RegionID uint64 @@ -55,17 +55,17 @@ func (e *RegionFeedEvent) GetValue() interface{} { } } -// ResolvedSpan guarantees all the KV value event +// ResolvedSpans guarantees all the KV value event // with commit ts less than ResolvedTs has been emitted. // -//msgp:ignore ResolvedSpan -type ResolvedSpan struct { +//msgp:ignore ResolvedSpans +type ResolvedSpans struct { Spans []RegionComparableSpan ResolvedTs uint64 } // String implements fmt.Stringer interface. -func (rs *ResolvedSpan) String() string { +func (rs *ResolvedSpans) String() string { return fmt.Sprintf("span: %v, resolved-ts: %d", rs.Spans, rs.ResolvedTs) } diff --git a/cdc/model/kv_gen.go b/cdc/model/kv_gen.go index d78e50d3604..cf0600e2f32 100644 --- a/cdc/model/kv_gen.go +++ b/cdc/model/kv_gen.go @@ -318,3 +318,135 @@ func (z *RawKVEntry) Msgsize() (s int) { s = 1 + 8 + msgp.IntSize + 4 + msgp.BytesPrefixSize + len(z.Key) + 6 + msgp.BytesPrefixSize + len(z.Value) + 10 + msgp.BytesPrefixSize + len(z.OldValue) + 9 + msgp.Uint64Size + 5 + msgp.Uint64Size + 10 + msgp.Uint64Size return } + +// DecodeMsg implements msgp.Decodable +func (z *RegionComparableSpan) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Span": + err = z.Span.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Span") + return + } + case "Region": + z.Region, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Region") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *RegionComparableSpan) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "Span" + err = en.Append(0x82, 0xa4, 0x53, 0x70, 0x61, 0x6e) + if err != nil { + return + } + err = z.Span.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Span") + return + } + // write "Region" + err = en.Append(0xa6, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteUint64(z.Region) + if err != nil { + err = msgp.WrapError(err, "Region") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *RegionComparableSpan) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "Span" + o = append(o, 0x82, 0xa4, 0x53, 0x70, 0x61, 0x6e) + o, err = z.Span.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Span") + return + } + // string "Region" + o = append(o, 0xa6, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e) + o = msgp.AppendUint64(o, z.Region) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *RegionComparableSpan) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Span": + bts, err = z.Span.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Span") + return + } + case "Region": + z.Region, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Region") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *RegionComparableSpan) Msgsize() (s int) { + s = 1 + 5 + z.Span.Msgsize() + 7 + msgp.Uint64Size + return +} diff --git a/cdc/model/kv_test.go b/cdc/model/kv_test.go index bef23dd13e5..54a482b0a33 100644 --- a/cdc/model/kv_test.go +++ b/cdc/model/kv_test.go @@ -27,9 +27,10 @@ func TestRegionFeedEvent(t *testing.T) { CRTs: 1, OpType: OpTypePut, } - resolved := &ResolvedSpan{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - ResolvedTs: 111, + resolved := &ResolvedSpans{ + Spans: []RegionComparableSpan{{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + }}, ResolvedTs: 111, } ev := &RegionFeedEvent{} @@ -38,8 +39,8 @@ func TestRegionFeedEvent(t *testing.T) { ev = &RegionFeedEvent{Val: raw} require.Equal(t, raw, ev.GetValue()) - ev = &RegionFeedEvent{Resolved: []*ResolvedSpan{resolved}} - require.Equal(t, resolved, ev.GetValue().([]*ResolvedSpan)[0]) + ev = &RegionFeedEvent{Resolved: resolved} + require.Equal(t, resolved, ev.GetValue().([]*ResolvedSpans)[0]) require.Equal(t, "span: [61, 62), resolved-ts: 111", resolved.String()) } diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 6b9a65c2049..347d7864ae5 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -158,22 +158,25 @@ func TestPullerResolvedForward(t *testing.T) { plr, cancel, wg, store := newPullerForTest(t, spans, checkpointTs) plr.cli.Returns(model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_a"), End: []byte("t_c")}), - ResolvedTs: uint64(1001), - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_a"), End: []byte("t_c")}), + }}, ResolvedTs: uint64(1001), + }, }) plr.cli.Returns(model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_c"), End: []byte("t_d")}), - ResolvedTs: uint64(1002), - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_c"), End: []byte("t_d")}), + }}, ResolvedTs: uint64(1002), + }, }) plr.cli.Returns(model.RegionFeedEvent{ - Resolved: []*model.ResolvedSpan{{ - Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_d"), End: []byte("t_e")}), - ResolvedTs: uint64(1000), - }}, + Resolved: &model.ResolvedSpans{ + Spans: []model.RegionComparableSpan{{ + Span: regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_d"), End: []byte("t_e")}), + }}, ResolvedTs: uint64(1000), + }, }) ev := <-plr.Output() require.Equal(t, model.OpTypeResolved, ev.OpType) From 603fe1c443e373fb4a5012484e2f9245c5a59917 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 7 Oct 2022 00:06:38 +0800 Subject: [PATCH 15/19] fix lint --- cdc/model/kv.go | 2 + cdc/model/kv_gen.go | 132 -------------------------------------------- 2 files changed, 2 insertions(+), 132 deletions(-) diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 4f5150348d4..ede7d6a9651 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -70,6 +70,8 @@ func (rs *ResolvedSpans) String() string { } // RegionComparableSpan contains a comparable span and a region id of that span +// +//msgp:ignore ResolvedSpans type RegionComparableSpan struct { Span regionspan.ComparableSpan Region uint64 diff --git a/cdc/model/kv_gen.go b/cdc/model/kv_gen.go index cf0600e2f32..d78e50d3604 100644 --- a/cdc/model/kv_gen.go +++ b/cdc/model/kv_gen.go @@ -318,135 +318,3 @@ func (z *RawKVEntry) Msgsize() (s int) { s = 1 + 8 + msgp.IntSize + 4 + msgp.BytesPrefixSize + len(z.Key) + 6 + msgp.BytesPrefixSize + len(z.Value) + 10 + msgp.BytesPrefixSize + len(z.OldValue) + 9 + msgp.Uint64Size + 5 + msgp.Uint64Size + 10 + msgp.Uint64Size return } - -// DecodeMsg implements msgp.Decodable -func (z *RegionComparableSpan) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Span": - err = z.Span.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "Span") - return - } - case "Region": - z.Region, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "Region") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z *RegionComparableSpan) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 - // write "Span" - err = en.Append(0x82, 0xa4, 0x53, 0x70, 0x61, 0x6e) - if err != nil { - return - } - err = z.Span.EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "Span") - return - } - // write "Region" - err = en.Append(0xa6, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e) - if err != nil { - return - } - err = en.WriteUint64(z.Region) - if err != nil { - err = msgp.WrapError(err, "Region") - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z *RegionComparableSpan) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 2 - // string "Span" - o = append(o, 0x82, 0xa4, 0x53, 0x70, 0x61, 0x6e) - o, err = z.Span.MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "Span") - return - } - // string "Region" - o = append(o, 0xa6, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e) - o = msgp.AppendUint64(o, z.Region) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *RegionComparableSpan) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Span": - bts, err = z.Span.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "Span") - return - } - case "Region": - z.Region, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Region") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *RegionComparableSpan) Msgsize() (s int) { - s = 1 + 5 + z.Span.Msgsize() + 7 + msgp.Uint64Size - return -} From 813b72d97b53320ef2740155243e3de9104e7a8d Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 7 Oct 2022 00:53:29 +0800 Subject: [PATCH 16/19] fix lint --- cdc/model/kv.go | 2 +- cdc/model/kv_gen_test.go | 113 --------------------------------------- 2 files changed, 1 insertion(+), 114 deletions(-) diff --git a/cdc/model/kv.go b/cdc/model/kv.go index ede7d6a9651..566cad317f9 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -71,7 +71,7 @@ func (rs *ResolvedSpans) String() string { // RegionComparableSpan contains a comparable span and a region id of that span // -//msgp:ignore ResolvedSpans +//msgp:ignore RegionComparableSpan type RegionComparableSpan struct { Span regionspan.ComparableSpan Region uint64 diff --git a/cdc/model/kv_gen_test.go b/cdc/model/kv_gen_test.go index e59512c360c..dbf9aefd376 100644 --- a/cdc/model/kv_gen_test.go +++ b/cdc/model/kv_gen_test.go @@ -121,116 +121,3 @@ func BenchmarkDecodeRawKVEntry(b *testing.B) { } } } - -func TestMarshalUnmarshalRegionComparableSpan(t *testing.T) { - v := RegionComparableSpan{} - bts, err := v.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - left, err := v.UnmarshalMsg(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) - } - - left, err = msgp.Skip(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after Skip(): %q", len(left), left) - } -} - -func BenchmarkMarshalMsgRegionComparableSpan(b *testing.B) { - v := RegionComparableSpan{} - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.MarshalMsg(nil) - } -} - -func BenchmarkAppendMsgRegionComparableSpan(b *testing.B) { - v := RegionComparableSpan{} - bts := make([]byte, 0, v.Msgsize()) - bts, _ = v.MarshalMsg(bts[0:0]) - b.SetBytes(int64(len(bts))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - bts, _ = v.MarshalMsg(bts[0:0]) - } -} - -func BenchmarkUnmarshalRegionComparableSpan(b *testing.B) { - v := RegionComparableSpan{} - bts, _ := v.MarshalMsg(nil) - b.ReportAllocs() - b.SetBytes(int64(len(bts))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := v.UnmarshalMsg(bts) - if err != nil { - b.Fatal(err) - } - } -} - -func TestEncodeDecodeRegionComparableSpan(t *testing.T) { - v := RegionComparableSpan{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - - m := v.Msgsize() - if buf.Len() > m { - t.Log("WARNING: TestEncodeDecodeRegionComparableSpan Msgsize() is inaccurate") - } - - vn := RegionComparableSpan{} - err := msgp.Decode(&buf, &vn) - if err != nil { - t.Error(err) - } - - buf.Reset() - msgp.Encode(&buf, &v) - err = msgp.NewReader(&buf).Skip() - if err != nil { - t.Error(err) - } -} - -func BenchmarkEncodeRegionComparableSpan(b *testing.B) { - v := RegionComparableSpan{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - en := msgp.NewWriter(msgp.Nowhere) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.EncodeMsg(en) - } - en.Flush() -} - -func BenchmarkDecodeRegionComparableSpan(b *testing.B) { - v := RegionComparableSpan{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - rd := msgp.NewEndlessReader(buf.Bytes(), b) - dc := msgp.NewReader(rd) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - err := v.DecodeMsg(dc) - if err != nil { - b.Fatal(err) - } - } -} From 68bf5da4a23bdf795a5cf6779425b9c9fb768e24 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 7 Oct 2022 19:29:41 +0800 Subject: [PATCH 17/19] fix ut --- cdc/kv/client_test.go | 56 ++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index f7586447013..3185a10dbad 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -1082,7 +1082,8 @@ func testHandleFeedEvent(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, }}, ResolvedTs: 100, }, }, @@ -1154,14 +1155,16 @@ func testHandleFeedEvent(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, }}, ResolvedTs: 135, }, }, { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, }}, ResolvedTs: 145, }, }, @@ -1174,7 +1177,8 @@ func testHandleFeedEvent(t *testing.T) { } for i := range multipleExpected.Resolved.Spans { multipleExpected.Resolved.Spans[i] = model.RegionComparableSpan{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, } } @@ -1438,14 +1442,16 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: 120, }, }, { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: 120, }, }, @@ -1919,21 +1925,24 @@ func TestDropStaleRequest(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: 100, }, }, { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: 120, }, }, { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: 130, }, }, @@ -2022,14 +2031,16 @@ func TestResolveLock(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: 100, }, }, { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: tso, }, }, @@ -2356,7 +2367,8 @@ func testEventAfterFeedStop(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: 100, }, RegionID: regionID, @@ -2364,7 +2376,8 @@ func testEventAfterFeedStop(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: 100, }, RegionID: regionID, @@ -2383,7 +2396,8 @@ func testEventAfterFeedStop(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: regionID, }}, ResolvedTs: 120, }, RegionID: regionID, @@ -2559,7 +2573,8 @@ func TestOutOfRegionRangeEvent(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, }}, ResolvedTs: 100, }, }, @@ -2588,7 +2603,8 @@ func TestOutOfRegionRangeEvent(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, }}, ResolvedTs: 145, }, }, @@ -3044,7 +3060,8 @@ func testKVClientForceReconnect(t *testing.T) { expected := model.RegionFeedEvent{ Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, + Region: regionID3, }}, ResolvedTs: 135, }, } @@ -3283,8 +3300,9 @@ func TestEvTimeUpdate(t *testing.T) { { Resolved: &model.ResolvedSpans{ Spans: []model.RegionComparableSpan{{ - Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - }}, ResolvedTs: 10, + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + Region: 3, + }}, ResolvedTs: 100, }, }, { From 9e365db81d1bfa770b627794835cacc9eac80837 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Sat, 8 Oct 2022 10:29:10 +0800 Subject: [PATCH 18/19] fix ut --- cdc/model/kv_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/model/kv_test.go b/cdc/model/kv_test.go index 54a482b0a33..567247f6ecc 100644 --- a/cdc/model/kv_test.go +++ b/cdc/model/kv_test.go @@ -40,9 +40,9 @@ func TestRegionFeedEvent(t *testing.T) { require.Equal(t, raw, ev.GetValue()) ev = &RegionFeedEvent{Resolved: resolved} - require.Equal(t, resolved, ev.GetValue().([]*ResolvedSpans)[0]) + require.Equal(t, resolved, ev.GetValue().(*ResolvedSpans)) - require.Equal(t, "span: [61, 62), resolved-ts: 111", resolved.String()) + require.Equal(t, "span: [{[61, 62) 0}], resolved-ts: 111", resolved.String()) } func TestRawKVEntry(t *testing.T) { From 9f7b12bf2c68e8ee87f445e9736a3ae79df559c9 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Sun, 9 Oct 2022 16:54:25 +0800 Subject: [PATCH 19/19] address comments --- cdc/puller/frontier/frontier.go | 18 +++++++++++------- cdc/puller/metrics.go | 8 ++++---- cdc/puller/puller.go | 4 ++-- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index dc3144af2f0..33ef73d9444 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -23,6 +23,10 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// fakeRegionID when the frontier is initializing, there is no region ID +// use fakeRegionID ,so this span will be cached +const fakeRegionID = 0 + // Frontier checks resolved event of spans and moves the global resolved ts ahead type Frontier interface { Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) @@ -38,7 +42,7 @@ type spanFrontier struct { seekTempResult []*skipListNode cachedRegions map[uint64]*skipListNode - metricResolvedRegionCachedCounter prometheus.Counter + metricResolvedRegionMissedCounter prometheus.Counter } // NewFrontier creates Frontier from the given spans. @@ -46,7 +50,7 @@ type spanFrontier struct { // So we use set it as util.UpperBoundKey, the means the real use case *should not* have an // End key bigger than util.UpperBoundKey func NewFrontier(checkpointTs uint64, - metricResolvedRegionCachedCounter prometheus.Counter, + metricResolvedRegionMissedCounter prometheus.Counter, spans ...regionspan.ComparableSpan, ) Frontier { s := &spanFrontier{ @@ -54,7 +58,7 @@ func NewFrontier(checkpointTs uint64, seekTempResult: make(seekResult, maxHeight), cachedRegions: map[uint64]*skipListNode{}, - metricResolvedRegionCachedCounter: metricResolvedRegionCachedCounter, + metricResolvedRegionMissedCounter: metricResolvedRegionMissedCounter, } firstSpan := true for _, span := range spans { @@ -79,13 +83,13 @@ func (s *spanFrontier) Frontier() uint64 { func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) { // it's the fast part to detect if the region is split or merged, // if not we can update the minTsHeap with use new ts directly - if n, ok := s.cachedRegions[regionID]; ok && n.regionID > 0 && n.end != nil { - if regionID == n.regionID && bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { + if n, ok := s.cachedRegions[regionID]; ok && n.regionID != fakeRegionID && n.end != nil { + if bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { s.minTsHeap.UpdateKey(n.Value(), ts) - s.metricResolvedRegionCachedCounter.Inc() return } } + s.metricResolvedRegionMissedCounter.Inc() s.insert(regionID, span, ts) } @@ -101,7 +105,7 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t if next != nil { if bytes.Equal(seekRes.Node().Key(), span.Start) && bytes.Equal(next.Key(), span.End) { s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) - if regionID > 0 { + if regionID != fakeRegionID { s.cachedRegions[regionID] = seekRes.Node() s.cachedRegions[regionID].regionID = regionID s.cachedRegions[regionID].end = next.key diff --git a/cdc/puller/metrics.go b/cdc/puller/metrics.go index 20502bf472f..c91e1a0f480 100644 --- a/cdc/puller/metrics.go +++ b/cdc/puller/metrics.go @@ -25,12 +25,12 @@ var ( Name: "txn_collect_event_count", Help: "The number of events received from txn collector", }, []string{"namespace", "changefeed", "type"}) - cachedRegionCollectCounter = prometheus.NewCounterVec( + missedRegionCollectCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "ticdc", Subsystem: "puller", - Name: "region_resolved_cached_count", - Help: "The number of regions cached when forward resolved ts", + Name: "region_resolved_missed_count", + Help: "The number of regions not cached when forward resolved ts", }, []string{"namespace", "changefeed", "type"}) pullerResolvedTsGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -74,7 +74,7 @@ var ( // InitMetrics registers all metrics in this file func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(txnCollectCounter) - registry.MustRegister(cachedRegionCollectCounter) + registry.MustRegister(missedRegionCollectCounter) registry.MustRegister(pullerResolvedTsGauge) registry.MustRegister(memBufferSizeGauge) registry.MustRegister(outputChanSizeHistogram) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 40c1a4954d9..a0267c82897 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -92,9 +92,9 @@ func New(ctx context.Context, if len(spans) > 1 { pullerType = "ddl" } - metricCachedRegionCollectCounter := cachedRegionCollectCounter. + metricMissedRegionCollectCounter := missedRegionCollectCounter. WithLabelValues(changefeed.Namespace, changefeed.ID, pullerType) - tsTracker := frontier.NewFrontier(0, metricCachedRegionCollectCounter, comparableSpans...) + tsTracker := frontier.NewFrontier(0, metricMissedRegionCollectCounter, comparableSpans...) kvCli := kv.NewCDCKVClient( ctx, pdCli, grpcPool, regionCache, pdClock, cfg, changefeed, tableID, tableName) p := &pullerImpl{