From 86810be1922217790f4c365f8a90baf7a8eb4c50 Mon Sep 17 00:00:00 2001 From: zzm Date: Thu, 4 Aug 2022 21:03:01 +0800 Subject: [PATCH] [fix #192] fix unified sorter (#194) * fix unified sorter Signed-off-by: zeminzhou * delete log Signed-off-by: zeminzhou * fix ut Signed-off-by: zeminzhou * fix comment Signed-off-by: zeminzhou * fix lint Signed-off-by: zeminzhou * fix lint Signed-off-by: zeminzhou --- cdc/cdc/model/kv.go | 8 +- cdc/cdc/model/kv_gen.go | 59 +++++++++- cdc/cdc/model/kv_gen_test.go | 3 + cdc/cdc/model/kv_test.go | 2 +- cdc/cdc/model/mounter.go | 7 ++ cdc/cdc/model/mounter_test.go | 118 ++++++++++++++++++++ cdc/cdc/puller/puller.go | 4 + cdc/cdc/puller/puller_test.go | 37 ++++++ cdc/cdc/sorter/unified/file_backend_test.go | 2 +- cdc/cdc/sorter/unified/merger_test.go | 2 +- cdc/cdc/sorter/unified/sorter_test.go | 62 +++++++--- 11 files changed, 282 insertions(+), 22 deletions(-) diff --git a/cdc/cdc/model/kv.go b/cdc/cdc/model/kv.go index 72ad9319..9fa9ebb1 100644 --- a/cdc/cdc/model/kv.go +++ b/cdc/cdc/model/kv.go @@ -87,11 +87,15 @@ type RawKVEntry struct { // Additional debug info RegionID uint64 `msg:"region_id"` KeySpanID uint64 `msg:"keyspan_id"` + // For providing additional sequence number. + // To keep `RawKVEntries` from the same region in order during unstable sorting of `sorter`. + // The sequence number is generated by auto-increment in `puller` node of `processor.pipeline`. + Sequence uint64 `msg:"sequence"` } func (v *RawKVEntry) String() string { - return fmt.Sprintf("OpType: %v, Key: %s, Value: %s, StartTs: %d, CRTs: %d, RegionID: %d", - v.OpType, string(v.Key), string(v.Value), v.StartTs, v.CRTs, v.RegionID) + return fmt.Sprintf("OpType: %v, Key: %s, Value: %s, StartTs: %d, CRTs: %d, RegionID: %d, KeySpanID: %d, Sequence: %d", + v.OpType, string(v.Key), string(v.Value), v.StartTs, v.CRTs, v.RegionID, v.KeySpanID, v.Sequence) } // ApproximateDataSize calculate the approximate size of protobuf binary diff --git a/cdc/cdc/model/kv_gen.go b/cdc/cdc/model/kv_gen.go index 76d7416e..0c5226ae 100644 --- a/cdc/cdc/model/kv_gen.go +++ b/cdc/cdc/model/kv_gen.go @@ -3,6 +3,7 @@ package model // Code generated by github.com/tinylib/msgp DO NOT EDIT. import ( + "fmt" "reflect" "github.com/tinylib/msgp/msgp" @@ -19,6 +20,19 @@ var ( fieldNameExpiredTs = generateFeildName("expired_ts") fieldNameRegionID = generateFeildName("region_id") fieldNameKeySpanID = generateFeildName("keyspan_id") + fieldNameSequence = generateFeildName("sequence") + fieldNames = [][]byte{ + fieldNameOpType, + fieldNameKey, + fieldNameValue, + fieldNameOldValue, + fieldNameStartTs, + fieldNameCRTs, + fieldNameExpiredTs, + fieldNameRegionID, + fieldNameKeySpanID, + fieldNameSequence, + } ) // DecodeMsg implements msgp.Decodable @@ -149,6 +163,12 @@ func (z *RawKVEntry) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "KeySpanID") return } + case "sequence": + z.Sequence, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Sequence") + return + } default: err = dc.Skip() if err != nil { @@ -162,7 +182,7 @@ func (z *RawKVEntry) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *RawKVEntry) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 9 + // map header, size 10 err = en.Append(0x80 + byte(fieldNum)) if err != nil { return @@ -257,13 +277,24 @@ func (z *RawKVEntry) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "KeySpanID") return } + // write "sequence" + err = en.Append(fieldNameSequence...) + if err != nil { + return + } + err = en.WriteUint64(z.Sequence) + if err != nil { + err = msgp.WrapError(err, "Sequence") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *RawKVEntry) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 9 + // map header, size 10 + fmt.Println(0x80 + byte(fieldNum)) o = append(o, 0x80+byte(fieldNum)) // string "op_type" o = append(o, fieldNameOpType...) @@ -292,6 +323,9 @@ func (z *RawKVEntry) MarshalMsg(b []byte) (o []byte, err error) { // string "keyspan_id" o = append(o, fieldNameKeySpanID...) o = msgp.AppendUint64(o, z.KeySpanID) + // string "sequence" + o = append(o, fieldNameSequence...) + o = msgp.AppendUint64(o, z.Sequence) return } @@ -371,6 +405,12 @@ func (z *RawKVEntry) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "KeySpanID") return } + case "sequence": + z.Sequence, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Sequence") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -385,7 +425,20 @@ func (z *RawKVEntry) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *RawKVEntry) Msgsize() (s int) { - s = 1 + 8 + msgp.IntSize + 4 + msgp.BytesPrefixSize + len(z.Key) + 6 + msgp.BytesPrefixSize + len(z.Value) + 10 + msgp.BytesPrefixSize + len(z.OldValue) + 9 + msgp.Uint64Size + 5 + msgp.Uint64Size + 11 + msgp.Uint64Size + 10 + msgp.Uint64Size + 11 + msgp.Uint64Size + s = 1 + for _, fieldName := range fieldNames { + s += len(fieldName) + } + s += msgp.IntSize + // OpType + msgp.BytesPrefixSize + len(z.Key) + // Key + msgp.BytesPrefixSize + len(z.Value) + // Value + msgp.BytesPrefixSize + len(z.OldValue) + // OldVale + msgp.Uint64Size + // StartTs + msgp.Uint64Size + // CRTs + msgp.Uint64Size + // ExpiredTs + msgp.Uint64Size + // RegionID + msgp.Uint64Size + // KeySpanID + msgp.Uint64Size // Sequence return } diff --git a/cdc/cdc/model/kv_gen_test.go b/cdc/cdc/model/kv_gen_test.go index 3f96002c..d8e6d218 100644 --- a/cdc/cdc/model/kv_gen_test.go +++ b/cdc/cdc/model/kv_gen_test.go @@ -38,6 +38,7 @@ func TestMarshalUnmarshalRawKVEntry(t *testing.T) { ExpiredTs: 2, RegionID: 3, KeySpanID: 4, + Sequence: 5, } bts, err := v.MarshalMsg(nil) @@ -61,6 +62,7 @@ func TestMarshalUnmarshalRawKVEntry(t *testing.T) { ExpiredTs: 2, RegionID: 3, KeySpanID: 4, + Sequence: 5, }) left, err = msgp.Skip(bts) @@ -118,6 +120,7 @@ func TestEncodeDecodeRawKVEntry(t *testing.T) { ExpiredTs: 2, RegionID: 3, KeySpanID: 4, + Sequence: 5, } var buf bytes.Buffer diff --git a/cdc/cdc/model/kv_test.go b/cdc/cdc/model/kv_test.go index 96892ab4..88f5d3d3 100644 --- a/cdc/cdc/model/kv_test.go +++ b/cdc/cdc/model/kv_test.go @@ -55,6 +55,6 @@ func TestRawKVEntry(t *testing.T) { Value: []byte("345"), } - require.Equal(t, "OpType: 1, Key: 123, Value: 345, StartTs: 100, CRTs: 101, RegionID: 0", raw.String()) + require.Equal(t, "OpType: 1, Key: 123, Value: 345, StartTs: 100, CRTs: 101, RegionID: 0, KeySpanID: 0, Sequence: 0", raw.String()) require.Equal(t, int64(6), raw.ApproximateDataSize()) } diff --git a/cdc/cdc/model/mounter.go b/cdc/cdc/model/mounter.go index 1a94eb97..21171681 100644 --- a/cdc/cdc/model/mounter.go +++ b/cdc/cdc/model/mounter.go @@ -60,6 +60,11 @@ func (e *PolymorphicEvent) IsResolved() bool { return e.RawKV.OpType == OpTypeResolved } +// Sequence returns the sequence of the event. +func (e *PolymorphicEvent) Sequence() uint64 { + return e.RawKV.Sequence +} + // ComparePolymorphicEvents compares two events by CRTs, Resolved order. // It returns true if and only if i should precede j. func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { @@ -69,6 +74,8 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { } else if j.IsResolved() { return true } + + return i.Sequence() < j.Sequence() } return i.CRTs < j.CRTs } diff --git a/cdc/cdc/model/mounter_test.go b/cdc/cdc/model/mounter_test.go index f7937e6c..978190e2 100644 --- a/cdc/cdc/model/mounter_test.go +++ b/cdc/cdc/model/mounter_test.go @@ -72,3 +72,121 @@ func TestPolymorphicEventPrepare(t *testing.T) { err := polyEvent.WaitPrepare(cctx) require.Equal(t, context.Canceled, err) } + +func TestComparePolymorphicEvents(t *testing.T) { + testCases := []struct { + i *PolymorphicEvent + j *PolymorphicEvent + expected bool + }{ + { + &PolymorphicEvent{ + CRTs: 1, + }, + &PolymorphicEvent{ + CRTs: 2, + }, + true, + }, + { + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypeResolved, + }, + }, + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypePut, + }, + }, + false, + }, + { + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypeResolved, + }, + }, + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypeResolved, + }, + }, + false, + }, + { + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypePut, + }, + }, + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypeResolved, + }, + }, + true, + }, + { + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypePut, + Sequence: 1, + }, + }, + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypePut, + Sequence: 2, + }, + }, + true, + }, + { + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypePut, + Sequence: 2, + }, + }, + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypePut, + Sequence: 1, + }, + }, + false, + }, + { + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypeResolved, + Sequence: 1, + }, + }, + &PolymorphicEvent{ + CRTs: 2, + RawKV: &RawKVEntry{ + OpType: OpTypePut, + Sequence: 2, + }, + }, + false, + }, + } + + for _, tc := range testCases { + require.Equal(t, tc.expected, ComparePolymorphicEvents(tc.i, tc.j)) + } +} diff --git a/cdc/cdc/puller/puller.go b/cdc/cdc/puller/puller.go index 3561fbe1..d6f5ce91 100644 --- a/cdc/cdc/puller/puller.go +++ b/cdc/cdc/puller/puller.go @@ -58,6 +58,7 @@ type pullerImpl struct { resolvedTs uint64 initialized int64 enableOldValue bool + eventSeq uint64 } // NewPuller create a new Puller fetch event start from checkpointTs @@ -95,6 +96,7 @@ func NewPuller( resolvedTs: checkpointTs, initialized: 0, enableOldValue: enableOldValue, + eventSeq: 0, } return p } @@ -185,6 +187,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { log.Debug("[TRACE] revcive region feed event", zap.Stringer("event", e)) if e.Val != nil { + e.Val.Sequence = p.eventSeq + p.eventSeq += 1 metricTxnCollectCounterKv.Inc() if err := output(e.Val); err != nil { return errors.Trace(err) diff --git a/cdc/cdc/puller/puller_test.go b/cdc/cdc/puller/puller_test.go index 252ada44..85c4e101 100644 --- a/cdc/cdc/puller/puller_test.go +++ b/cdc/cdc/puller/puller_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "sync" + "testing" "github.com/pingcap/check" "github.com/pingcap/errors" @@ -39,6 +40,10 @@ type pullerSuite struct{} var _ = check.Suite(&pullerSuite{}) +func TestSuite(t *testing.T) { + check.TestingT(t) +} + type mockPdClientForPullerTest struct { pd.Client clusterID uint64 @@ -225,3 +230,35 @@ func (s *pullerSuite) TestPullerRawKV(c *check.C) { cancel() wg.Wait() } + +func (s *pullerSuite) TestAssignSequence(c *check.C) { + defer testleak.AfterTest(c)() + spans := []regionspan.Span{ + {Start: []byte("t_a"), End: []byte("t_z")}, + } + checkpointTs := uint64(999) + plr, cancel, wg, store := s.newPullerForTest(c, spans, checkpointTs) + go func() { + for i := 1; i <= 100; i++ { + plr.cli.Returns(model.RegionFeedEvent{ + Val: &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte("t_b"), + Value: []byte("test-value"), + CRTs: checkpointTs + uint64(i), + }, + }) + } + }() + + seq := uint64(0) + for seq < uint64(100) { + ev := <-plr.Output() + c.Assert(ev.Sequence, check.Equals, seq) + seq += 1 + } + + store.Close() + cancel() + wg.Wait() +} diff --git a/cdc/cdc/sorter/unified/file_backend_test.go b/cdc/cdc/sorter/unified/file_backend_test.go index 1153baf9..f56f6dbb 100644 --- a/cdc/cdc/sorter/unified/file_backend_test.go +++ b/cdc/cdc/sorter/unified/file_backend_test.go @@ -54,7 +54,7 @@ func (s *fileBackendSuite) TestNoSpace(c *check.C) { w, err := fb.writer() c.Assert(err, check.IsNil) - err = w.writeNext(model.NewPolymorphicEvent(generateMockRawKV(0))) + err = w.writeNext(model.NewPolymorphicEvent(generateMockRawKV(0, 0, uint64(0))[0])) if err == nil { // Due to write buffering, `writeNext` might not return an error when the filesystem is full. err = w.flushAndClose() diff --git a/cdc/cdc/sorter/unified/merger_test.go b/cdc/cdc/sorter/unified/merger_test.go index 178f8603..1cd8dca1 100644 --- a/cdc/cdc/sorter/unified/merger_test.go +++ b/cdc/cdc/sorter/unified/merger_test.go @@ -71,7 +71,7 @@ func (b *mockFlushTaskBuilder) generateRowChanges(tsRangeBegin, tsRangeEnd uint6 density := float64(tsRangeEnd-tsRangeBegin) / float64(count) for fTs := float64(tsRangeBegin); fTs < float64(tsRangeEnd); fTs += density { ts := uint64(fTs) - kvEntry := generateMockRawKV(ts) + kvEntry := generateMockRawKV(0, 0, ts)[0] _ = b.writer.writeNext(model.NewPolymorphicEvent(kvEntry)) b.totalCount++ } diff --git a/cdc/cdc/sorter/unified/sorter_test.go b/cdc/cdc/sorter/unified/sorter_test.go index a6f2658f..741cce94 100644 --- a/cdc/cdc/sorter/unified/sorter_test.go +++ b/cdc/cdc/sorter/unified/sorter_test.go @@ -14,7 +14,9 @@ package unified import ( + "bytes" "context" + "fmt" "math" _ "net/http/pprof" "os" @@ -45,15 +47,28 @@ var _ = check.SerialSuites(&sorterSuite{}) func Test(t *testing.T) { check.TestingT(t) } -func generateMockRawKV(ts uint64) *model.RawKVEntry { - return &model.RawKVEntry{ - OpType: model.OpTypePut, - Key: []byte{}, - Value: []byte{}, - OldValue: nil, - StartTs: ts - 5, - CRTs: ts, - RegionID: 0, +func generateMockRawKV(i, j int, ts uint64) [2]*model.RawKVEntry { + return [2]*model.RawKVEntry{ + { + OpType: model.OpTypePut, + Key: []byte(fmt.Sprintf("key%d-%d", i, j)), + Value: []byte("value1"), + OldValue: nil, + StartTs: ts, + CRTs: ts, + RegionID: 0, + Sequence: 1, + }, + { + OpType: model.OpTypeDelete, + Key: []byte(fmt.Sprintf("key%d-%d", i, j)), + Value: []byte("value2"), + OldValue: nil, + StartTs: ts, + CRTs: ts, + RegionID: 0, + Sequence: 2, + }, } } @@ -160,12 +175,16 @@ func testSorter(ctx context.Context, c *check.C, sorter sorter.EventSorter, coun default: } - sorter.AddEntry(ctx, model.NewPolymorphicEvent(generateMockRawKV(uint64(j)<<5))) + rawKVs := generateMockRawKV(finalI, j, uint64(j)<<5) + sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawKVs[0])) + sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawKVs[1])) if j%10000 == 0 { atomic.StoreUint64(&producerProgress[finalI], uint64(j)<<5) } } - sorter.AddEntry(ctx, model.NewPolymorphicEvent(generateMockRawKV(uint64(count+1)<<5))) + rawKVs := generateMockRawKV(finalI, count+1, uint64(count+1)<<5) + sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawKVs[0])) + sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawKVs[1])) atomic.StoreUint64(&producerProgress[finalI], uint64(count+1)<<5) return nil }) @@ -197,8 +216,11 @@ func testSorter(ctx context.Context, c *check.C, sorter sorter.EventSorter, coun // launch the consumer errg.Go(func() error { - counter := 0 - lastTs := uint64(0) + var ( + counter int + lastTs uint64 + lastEvent *model.PolymorphicEvent + ) ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { @@ -210,6 +232,16 @@ func testSorter(ctx context.Context, c *check.C, sorter sorter.EventSorter, coun if event.CRTs < lastTs { panic("regressed") } + if lastEvent != nil && + bytes.Equal(lastEvent.RawKV.Key, event.RawKV.Key) { + if !bytes.Equal(lastEvent.RawKV.Value, []byte("value1")) { + panic("lastEvent value isn't equal to value1") + } + if !bytes.Equal(event.RawKV.Value, []byte("value2")) { + panic("event value isn't equal to value2") + } + } + lastEvent = event lastTs = event.CRTs counter += 1 if counter%10000 == 0 { @@ -451,7 +483,9 @@ func (s *sorterSuite) TestSortClosedAddEntry(c *check.C) { ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second*10) defer cancel1() for i := 0; i < 10000; i++ { - sorter.AddEntry(ctx1, model.NewPolymorphicEvent(generateMockRawKV(uint64(i)))) + rawKVs := generateMockRawKV(0, 0, uint64(i)) + sorter.AddEntry(ctx1, model.NewPolymorphicEvent(rawKVs[0])) + sorter.AddEntry(ctx1, model.NewPolymorphicEvent(rawKVs[1])) } select {