Skip to content

Commit

Permalink
[fix #192] fix unified sorter (#194)
Browse files Browse the repository at this point in the history
* fix unified sorter

Signed-off-by: zeminzhou <[email protected]>

* delete log

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix comment

Signed-off-by: zeminzhou <[email protected]>

* fix lint

Signed-off-by: zeminzhou <[email protected]>

* fix lint

Signed-off-by: zeminzhou <[email protected]>
zeminzhou authored Aug 4, 2022
1 parent 769754e commit 86810be
Showing 11 changed files with 282 additions and 22 deletions.
8 changes: 6 additions & 2 deletions cdc/cdc/model/kv.go
Original file line number Diff line number Diff line change
@@ -87,11 +87,15 @@ type RawKVEntry struct {
// Additional debug info
RegionID uint64 `msg:"region_id"`
KeySpanID uint64 `msg:"keyspan_id"`
// Sequence is an additional, per-event sequence number.
// It keeps `RawKVEntry`s from the same region in order when the `sorter` sorts them unstably.
// The sequence number is generated by auto-increment in the `puller` node of `processor.pipeline`.
Sequence uint64 `msg:"sequence"`
}

func (v *RawKVEntry) String() string {
return fmt.Sprintf("OpType: %v, Key: %s, Value: %s, StartTs: %d, CRTs: %d, RegionID: %d",
v.OpType, string(v.Key), string(v.Value), v.StartTs, v.CRTs, v.RegionID)
return fmt.Sprintf("OpType: %v, Key: %s, Value: %s, StartTs: %d, CRTs: %d, RegionID: %d, KeySpanID: %d, Sequence: %d",
v.OpType, string(v.Key), string(v.Value), v.StartTs, v.CRTs, v.RegionID, v.KeySpanID, v.Sequence)
}

// ApproximateDataSize calculate the approximate size of protobuf binary
59 changes: 56 additions & 3 deletions cdc/cdc/model/kv_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cdc/cdc/model/kv_gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cdc/cdc/model/kv_test.go
Original file line number Diff line number Diff line change
@@ -55,6 +55,6 @@ func TestRawKVEntry(t *testing.T) {
Value: []byte("345"),
}

require.Equal(t, "OpType: 1, Key: 123, Value: 345, StartTs: 100, CRTs: 101, RegionID: 0", raw.String())
require.Equal(t, "OpType: 1, Key: 123, Value: 345, StartTs: 100, CRTs: 101, RegionID: 0, KeySpanID: 0, Sequence: 0", raw.String())
require.Equal(t, int64(6), raw.ApproximateDataSize())
}
7 changes: 7 additions & 0 deletions cdc/cdc/model/mounter.go
Original file line number Diff line number Diff line change
@@ -60,6 +60,11 @@ func (e *PolymorphicEvent) IsResolved() bool {
return e.RawKV.OpType == OpTypeResolved
}

// Sequence reports the sequence number stored on the event's underlying RawKV entry.
// RawKV is dereferenced without a nil check.
func (e *PolymorphicEvent) Sequence() uint64 {
	raw := e.RawKV
	return raw.Sequence
}

// ComparePolymorphicEvents compares two events by CRTs, then Resolved order, then Sequence.
// It returns true if and only if i should precede j.
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
@@ -69,6 +74,8 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
} else if j.IsResolved() {
return true
}

return i.Sequence() < j.Sequence()
}
return i.CRTs < j.CRTs
}
118 changes: 118 additions & 0 deletions cdc/cdc/model/mounter_test.go
Original file line number Diff line number Diff line change
@@ -72,3 +72,121 @@ func TestPolymorphicEventPrepare(t *testing.T) {
err := polyEvent.WaitPrepare(cctx)
require.Equal(t, context.Canceled, err)
}

// TestComparePolymorphicEvents is a table-driven test of ComparePolymorphicEvents.
// Each entry is run as a named subtest so that a failure identifies the case
// that broke instead of only reporting a bare boolean mismatch.
func TestComparePolymorphicEvents(t *testing.T) {
	testCases := []struct {
		name     string
		i        *PolymorphicEvent
		j        *PolymorphicEvent
		expected bool
	}{
		{
			// RawKV is intentionally left nil here: the CRTs differ.
			"different CRTs orders by CRTs",
			&PolymorphicEvent{
				CRTs: 1,
			},
			&PolymorphicEvent{
				CRTs: 2,
			},
			true,
		},
		{
			"same CRTs resolved vs put",
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType: OpTypeResolved,
				},
			},
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType: OpTypePut,
				},
			},
			false,
		},
		{
			"same CRTs both resolved",
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType: OpTypeResolved,
				},
			},
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType: OpTypeResolved,
				},
			},
			false,
		},
		{
			"same CRTs put vs resolved",
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType: OpTypePut,
				},
			},
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType: OpTypeResolved,
				},
			},
			true,
		},
		{
			"same CRTs puts with ascending sequence",
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType:   OpTypePut,
					Sequence: 1,
				},
			},
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType:   OpTypePut,
					Sequence: 2,
				},
			},
			true,
		},
		{
			"same CRTs puts with descending sequence",
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType:   OpTypePut,
					Sequence: 2,
				},
			},
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType:   OpTypePut,
					Sequence: 1,
				},
			},
			false,
		},
		{
			// The resolved event has the smaller Sequence, yet the expected
			// result is still false.
			"same CRTs resolved vs put with smaller sequence",
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType:   OpTypeResolved,
					Sequence: 1,
				},
			},
			&PolymorphicEvent{
				CRTs: 2,
				RawKV: &RawKVEntry{
					OpType:   OpTypePut,
					Sequence: 2,
				},
			},
			false,
		},
	}

	for _, tc := range testCases {
		// Subtests run synchronously (no t.Parallel), so capturing tc is safe.
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.expected, ComparePolymorphicEvents(tc.i, tc.j))
		})
	}
}
4 changes: 4 additions & 0 deletions cdc/cdc/puller/puller.go
Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@ type pullerImpl struct {
resolvedTs uint64
initialized int64
enableOldValue bool
eventSeq uint64
}

// NewPuller create a new Puller fetch event start from checkpointTs
@@ -95,6 +96,7 @@ func NewPuller(
resolvedTs: checkpointTs,
initialized: 0,
enableOldValue: enableOldValue,
eventSeq: 0,
}
return p
}
@@ -185,6 +187,8 @@ func (p *pullerImpl) Run(ctx context.Context) error {
log.Debug("[TRACE] revcive region feed event", zap.Stringer("event", e))

if e.Val != nil {
e.Val.Sequence = p.eventSeq
p.eventSeq += 1
metricTxnCollectCounterKv.Inc()
if err := output(e.Val); err != nil {
return errors.Trace(err)
37 changes: 37 additions & 0 deletions cdc/cdc/puller/puller_test.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sync"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/errors"
@@ -39,6 +40,10 @@ type pullerSuite struct{}

var _ = check.Suite(&pullerSuite{})

// TestSuite is the `go test` entry point for this package. It hands t to
// check.TestingT, which runs the suites registered through check.Suite
// (such as pullerSuite above).
func TestSuite(t *testing.T) {
check.TestingT(t)
}

type mockPdClientForPullerTest struct {
pd.Client
clusterID uint64
@@ -225,3 +230,35 @@ func (s *pullerSuite) TestPullerRawKV(c *check.C) {
cancel()
wg.Wait()
}

// TestAssignSequence feeds 100 put events into the test puller's client and
// asserts that the entries read from plr.Output() carry Sequence values
// 0, 1, ..., 99 in that order.
func (s *pullerSuite) TestAssignSequence(c *check.C) {
	defer testleak.AfterTest(c)()

	const eventCount = 100
	checkpointTs := uint64(999)
	spans := []regionspan.Span{
		{Start: []byte("t_a"), End: []byte("t_z")},
	}
	plr, cancel, wg, store := s.newPullerForTest(c, spans, checkpointTs)

	// Producer: push the events from a separate goroutine while the test
	// goroutine below consumes the puller output.
	go func() {
		for n := uint64(1); n <= eventCount; n++ {
			entry := &model.RawKVEntry{
				OpType: model.OpTypePut,
				Key:    []byte("t_b"),
				Value:  []byte("test-value"),
				CRTs:   checkpointTs + n,
			}
			plr.cli.Returns(model.RegionFeedEvent{Val: entry})
		}
	}()

	// Consumer: the k-th received entry must have Sequence == k (zero-based).
	for want := uint64(0); want < eventCount; want++ {
		got := <-plr.Output()
		c.Assert(got.Sequence, check.Equals, want)
	}

	store.Close()
	cancel()
	wg.Wait()
}
2 changes: 1 addition & 1 deletion cdc/cdc/sorter/unified/file_backend_test.go
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ func (s *fileBackendSuite) TestNoSpace(c *check.C) {
w, err := fb.writer()
c.Assert(err, check.IsNil)

err = w.writeNext(model.NewPolymorphicEvent(generateMockRawKV(0)))
err = w.writeNext(model.NewPolymorphicEvent(generateMockRawKV(0, 0, uint64(0))[0]))
if err == nil {
// Due to write buffering, `writeNext` might not return an error when the filesystem is full.
err = w.flushAndClose()
2 changes: 1 addition & 1 deletion cdc/cdc/sorter/unified/merger_test.go
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ func (b *mockFlushTaskBuilder) generateRowChanges(tsRangeBegin, tsRangeEnd uint6
density := float64(tsRangeEnd-tsRangeBegin) / float64(count)
for fTs := float64(tsRangeBegin); fTs < float64(tsRangeEnd); fTs += density {
ts := uint64(fTs)
kvEntry := generateMockRawKV(ts)
kvEntry := generateMockRawKV(0, 0, ts)[0]
_ = b.writer.writeNext(model.NewPolymorphicEvent(kvEntry))
b.totalCount++
}
62 changes: 48 additions & 14 deletions cdc/cdc/sorter/unified/sorter_test.go
Original file line number Diff line number Diff line change
@@ -14,7 +14,9 @@
package unified

import (
"bytes"
"context"
"fmt"
"math"
_ "net/http/pprof"
"os"
@@ -45,15 +47,28 @@ var _ = check.SerialSuites(&sorterSuite{})

// Test is the `go test` entry point for this package; it hands t to check.TestingT.
func Test(t *testing.T) { check.TestingT(t) }

func generateMockRawKV(ts uint64) *model.RawKVEntry {
return &model.RawKVEntry{
OpType: model.OpTypePut,
Key: []byte{},
Value: []byte{},
OldValue: nil,
StartTs: ts - 5,
CRTs: ts,
RegionID: 0,
// generateMockRawKV builds two mock entries that share the key "key<i>-<j>"
// and use ts as both StartTs and CRTs. Element 0 is a put with value "value1"
// and Sequence 1; element 1 is a delete with value "value2" and Sequence 2.
// OldValue and RegionID are left at their zero values.
func generateMockRawKV(i, j int, ts uint64) [2]*model.RawKVEntry {
	key := fmt.Sprintf("key%d-%d", i, j)

	// Each entry gets its own copy of the key bytes so the two do not alias.
	put := &model.RawKVEntry{
		OpType:   model.OpTypePut,
		Key:      []byte(key),
		Value:    []byte("value1"),
		StartTs:  ts,
		CRTs:     ts,
		Sequence: 1,
	}
	del := &model.RawKVEntry{
		OpType:   model.OpTypeDelete,
		Key:      []byte(key),
		Value:    []byte("value2"),
		StartTs:  ts,
		CRTs:     ts,
		Sequence: 2,
	}

	return [2]*model.RawKVEntry{put, del}
}

@@ -160,12 +175,16 @@ func testSorter(ctx context.Context, c *check.C, sorter sorter.EventSorter, coun
default:
}

sorter.AddEntry(ctx, model.NewPolymorphicEvent(generateMockRawKV(uint64(j)<<5)))
rawKVs := generateMockRawKV(finalI, j, uint64(j)<<5)
sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawKVs[0]))
sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawKVs[1]))
if j%10000 == 0 {
atomic.StoreUint64(&producerProgress[finalI], uint64(j)<<5)
}
}
sorter.AddEntry(ctx, model.NewPolymorphicEvent(generateMockRawKV(uint64(count+1)<<5)))
rawKVs := generateMockRawKV(finalI, count+1, uint64(count+1)<<5)
sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawKVs[0]))
sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawKVs[1]))
atomic.StoreUint64(&producerProgress[finalI], uint64(count+1)<<5)
return nil
})
@@ -197,8 +216,11 @@ func testSorter(ctx context.Context, c *check.C, sorter sorter.EventSorter, coun

// launch the consumer
errg.Go(func() error {
counter := 0
lastTs := uint64(0)
var (
counter int
lastTs uint64
lastEvent *model.PolymorphicEvent
)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
@@ -210,6 +232,16 @@ func testSorter(ctx context.Context, c *check.C, sorter sorter.EventSorter, coun
if event.CRTs < lastTs {
panic("regressed")
}
if lastEvent != nil &&
bytes.Equal(lastEvent.RawKV.Key, event.RawKV.Key) {
if !bytes.Equal(lastEvent.RawKV.Value, []byte("value1")) {
panic("lastEvent value isn't equal to value1")
}
if !bytes.Equal(event.RawKV.Value, []byte("value2")) {
panic("event value isn't equal to value2")
}
}
lastEvent = event
lastTs = event.CRTs
counter += 1
if counter%10000 == 0 {
@@ -451,7 +483,9 @@ func (s *sorterSuite) TestSortClosedAddEntry(c *check.C) {
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second*10)
defer cancel1()
for i := 0; i < 10000; i++ {
sorter.AddEntry(ctx1, model.NewPolymorphicEvent(generateMockRawKV(uint64(i))))
rawKVs := generateMockRawKV(0, 0, uint64(i))
sorter.AddEntry(ctx1, model.NewPolymorphicEvent(rawKVs[0]))
sorter.AddEntry(ctx1, model.NewPolymorphicEvent(rawKVs[1]))
}

select {

0 comments on commit 86810be

Please sign in to comment.