Skip to content

Commit

Permalink
[fix #183] fix memory sort (#185)
Browse files Browse the repository at this point in the history
* update fix memory sort

Signed-off-by: zeminzhou <[email protected]>

* fix sort: if two evets.crts are equal, the delete event will be sort first

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix sort compare func

Signed-off-by: zeminzhou <[email protected]>

* fix lint

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* test for unified sorter

Signed-off-by: zeminzhou <[email protected]>

* fix error msg

Signed-off-by: zeminzhou <[email protected]>

* recover unified sorter

Signed-off-by: zeminzhou <[email protected]>
  • Loading branch information
zeminzhou authored Aug 3, 2022
1 parent 9582202 commit 769754e
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 77 deletions.
19 changes: 19 additions & 0 deletions cdc/cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,25 @@ func (e *PolymorphicEvent) RegionID() uint64 {
return e.RawKV.RegionID
}

// IsResolved returns true if the event is resolved. Note that this function can
// only be called when `RawKV != nil`.
func (e *PolymorphicEvent) IsResolved() bool {
return e.RawKV.OpType == OpTypeResolved
}

// ComparePolymorphicEvents compares two events by CRTs, Resolved order.
// It returns true if and only if i should precede j.
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
if i.CRTs == j.CRTs {
if i.IsResolved() {
return false
} else if j.IsResolved() {
return true
}
}
return i.CRTs < j.CRTs
}

// SetUpFinishedChan creates an internal channel to support PrepareFinished and WaitPrepare
func (e *PolymorphicEvent) SetUpFinishedChan() {
if e.finished == nil {
Expand Down
13 changes: 2 additions & 11 deletions cdc/cdc/sorter/memory/entry_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (es *EntrySorter) Run(ctx context.Context) error {
}
toSort = append(toSort, resEvents...)
startTime := time.Now()
sort.Slice(toSort, func(i, j int) bool {
sort.SliceStable(toSort, func(i, j int) bool {
return eventLess(toSort[i], toSort[j])
})
metricEntrySorterSortDuration.Observe(time.Since(startTime).Seconds())
Expand Down Expand Up @@ -159,16 +159,7 @@ func (es *EntrySorter) Output() <-chan *model.PolymorphicEvent {
}

func eventLess(i *model.PolymorphicEvent, j *model.PolymorphicEvent) bool {
if i.CRTs == j.CRTs {
if i.RawKV.OpType == model.OpTypeDelete {
return true
}

if j.RawKV.OpType == model.OpTypeResolved {
return true
}
}
return i.CRTs < j.CRTs
return model.ComparePolymorphicEvents(i, j)
}

func mergeEvents(kvsA []*model.PolymorphicEvent, kvsB []*model.PolymorphicEvent, output func(*model.PolymorphicEvent)) {
Expand Down
179 changes: 124 additions & 55 deletions cdc/cdc/sorter/memory/entry_sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package memory

import (
"bytes"
"context"
"fmt"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -45,10 +47,10 @@ func (s *mockEntrySorterSuite) TestEntrySorter(c *check.C) {
}{
{
input: []*model.RawKVEntry{
{CRTs: 1, OpType: model.OpTypePut},
{CRTs: 2, OpType: model.OpTypePut},
{CRTs: 4, OpType: model.OpTypeDelete},
{CRTs: 2, OpType: model.OpTypeDelete},
{Key: []byte("key1"), CRTs: 1, OpType: model.OpTypePut},
{Key: []byte("key2"), CRTs: 2, OpType: model.OpTypePut},
{Key: []byte("key1"), CRTs: 4, OpType: model.OpTypeDelete},
{Key: []byte("key2"), CRTs: 2, OpType: model.OpTypeDelete},
},
resolvedTs: 0,
expect: []*model.RawKVEntry{
Expand All @@ -57,17 +59,17 @@ func (s *mockEntrySorterSuite) TestEntrySorter(c *check.C) {
},
{
input: []*model.RawKVEntry{
{CRTs: 3, OpType: model.OpTypePut},
{CRTs: 2, OpType: model.OpTypePut},
{CRTs: 5, OpType: model.OpTypePut},
{Key: []byte("key3"), CRTs: 3, OpType: model.OpTypePut},
{Key: []byte("key4"), CRTs: 2, OpType: model.OpTypePut},
{Key: []byte("key5"), CRTs: 5, OpType: model.OpTypePut},
},
resolvedTs: 3,
expect: []*model.RawKVEntry{
{CRTs: 1, OpType: model.OpTypePut},
{CRTs: 2, OpType: model.OpTypeDelete},
{CRTs: 2, OpType: model.OpTypePut},
{CRTs: 2, OpType: model.OpTypePut},
{CRTs: 3, OpType: model.OpTypePut},
{Key: []byte("key1"), CRTs: 1, OpType: model.OpTypePut},
{Key: []byte("key2"), CRTs: 2, OpType: model.OpTypePut},
{Key: []byte("key2"), CRTs: 2, OpType: model.OpTypeDelete},
{Key: []byte("key4"), CRTs: 2, OpType: model.OpTypePut},
{Key: []byte("key3"), CRTs: 3, OpType: model.OpTypePut},
{CRTs: 3, OpType: model.OpTypeResolved},
},
},
Expand All @@ -78,29 +80,29 @@ func (s *mockEntrySorterSuite) TestEntrySorter(c *check.C) {
},
{
input: []*model.RawKVEntry{
{CRTs: 7, OpType: model.OpTypePut},
{Key: []byte("key6"), CRTs: 7, OpType: model.OpTypePut},
},
resolvedTs: 6,
expect: []*model.RawKVEntry{
{CRTs: 4, OpType: model.OpTypeDelete},
{CRTs: 5, OpType: model.OpTypePut},
{Key: []byte("key1"), CRTs: 4, OpType: model.OpTypeDelete},
{Key: []byte("key5"), CRTs: 5, OpType: model.OpTypePut},
{CRTs: 6, OpType: model.OpTypeResolved},
},
},
{
input: []*model.RawKVEntry{{CRTs: 7, OpType: model.OpTypeDelete}},
input: []*model.RawKVEntry{{Key: []byte("key3"), CRTs: 7, OpType: model.OpTypeDelete}},
resolvedTs: 6,
expect: []*model.RawKVEntry{
{CRTs: 6, OpType: model.OpTypeResolved},
},
},
{
input: []*model.RawKVEntry{{CRTs: 7, OpType: model.OpTypeDelete}},
input: []*model.RawKVEntry{{Key: []byte("key4"), CRTs: 7, OpType: model.OpTypeDelete}},
resolvedTs: 8,
expect: []*model.RawKVEntry{
{CRTs: 7, OpType: model.OpTypeDelete},
{CRTs: 7, OpType: model.OpTypeDelete},
{CRTs: 7, OpType: model.OpTypePut},
{Key: []byte("key6"), CRTs: 7, OpType: model.OpTypePut},
{Key: []byte("key3"), CRTs: 7, OpType: model.OpTypeDelete},
{Key: []byte("key4"), CRTs: 7, OpType: model.OpTypeDelete},
{CRTs: 8, OpType: model.OpTypeResolved},
},
},
Expand Down Expand Up @@ -144,10 +146,10 @@ func (s *mockEntrySorterSuite) TestEntrySorterNonBlocking(c *check.C) {
}{
{
input: []*model.RawKVEntry{
{CRTs: 1, OpType: model.OpTypePut},
{CRTs: 2, OpType: model.OpTypePut},
{CRTs: 4, OpType: model.OpTypeDelete},
{CRTs: 2, OpType: model.OpTypeDelete},
{Key: []byte("key1"), CRTs: 1, OpType: model.OpTypePut},
{Key: []byte("key2"), CRTs: 2, OpType: model.OpTypePut},
{Key: []byte("key1"), CRTs: 4, OpType: model.OpTypeDelete},
{Key: []byte("key2"), CRTs: 2, OpType: model.OpTypeDelete},
},
resolvedTs: 0,
expect: []*model.RawKVEntry{
Expand All @@ -156,17 +158,17 @@ func (s *mockEntrySorterSuite) TestEntrySorterNonBlocking(c *check.C) {
},
{
input: []*model.RawKVEntry{
{CRTs: 3, OpType: model.OpTypePut},
{CRTs: 2, OpType: model.OpTypePut},
{CRTs: 5, OpType: model.OpTypePut},
{Key: []byte("key3"), CRTs: 3, OpType: model.OpTypePut},
{Key: []byte("key4"), CRTs: 2, OpType: model.OpTypePut},
{Key: []byte("key5"), CRTs: 5, OpType: model.OpTypePut},
},
resolvedTs: 3,
expect: []*model.RawKVEntry{
{CRTs: 1, OpType: model.OpTypePut},
{CRTs: 2, OpType: model.OpTypeDelete},
{CRTs: 2, OpType: model.OpTypePut},
{CRTs: 2, OpType: model.OpTypePut},
{CRTs: 3, OpType: model.OpTypePut},
{Key: []byte("key1"), CRTs: 1, OpType: model.OpTypePut},
{Key: []byte("key2"), CRTs: 2, OpType: model.OpTypePut},
{Key: []byte("key2"), CRTs: 2, OpType: model.OpTypeDelete},
{Key: []byte("key4"), CRTs: 2, OpType: model.OpTypePut},
{Key: []byte("key3"), CRTs: 3, OpType: model.OpTypePut},
{CRTs: 3, OpType: model.OpTypeResolved},
},
},
Expand All @@ -177,29 +179,29 @@ func (s *mockEntrySorterSuite) TestEntrySorterNonBlocking(c *check.C) {
},
{
input: []*model.RawKVEntry{
{CRTs: 7, OpType: model.OpTypePut},
{Key: []byte("key6"), CRTs: 7, OpType: model.OpTypePut},
},
resolvedTs: 6,
expect: []*model.RawKVEntry{
{CRTs: 4, OpType: model.OpTypeDelete},
{CRTs: 5, OpType: model.OpTypePut},
{Key: []byte("key1"), CRTs: 4, OpType: model.OpTypeDelete},
{Key: []byte("key5"), CRTs: 5, OpType: model.OpTypePut},
{CRTs: 6, OpType: model.OpTypeResolved},
},
},
{
input: []*model.RawKVEntry{{CRTs: 7, OpType: model.OpTypeDelete}},
input: []*model.RawKVEntry{{Key: []byte("key3"), CRTs: 7, OpType: model.OpTypeDelete}},
resolvedTs: 6,
expect: []*model.RawKVEntry{
{CRTs: 6, OpType: model.OpTypeResolved},
},
},
{
input: []*model.RawKVEntry{{CRTs: 7, OpType: model.OpTypeDelete}},
input: []*model.RawKVEntry{{Key: []byte("key4"), CRTs: 7, OpType: model.OpTypeDelete}},
resolvedTs: 8,
expect: []*model.RawKVEntry{
{CRTs: 7, OpType: model.OpTypeDelete},
{CRTs: 7, OpType: model.OpTypeDelete},
{CRTs: 7, OpType: model.OpTypePut},
{Key: []byte("key6"), CRTs: 7, OpType: model.OpTypePut},
{Key: []byte("key3"), CRTs: 7, OpType: model.OpTypeDelete},
{Key: []byte("key4"), CRTs: 7, OpType: model.OpTypeDelete},
{CRTs: 8, OpType: model.OpTypeResolved},
},
},
Expand Down Expand Up @@ -256,36 +258,58 @@ func (s *mockEntrySorterSuite) TestEntrySorterRandomly(c *check.C) {
go func() {
defer wg.Done()
for resolvedTs := uint64(1); resolvedTs <= maxTs; resolvedTs += 400 {
var opType model.OpType
var (
opType1 model.OpType
opType2 model.OpType
)
if rand.Intn(2) == 0 {
opType = model.OpTypePut
opType1 = model.OpTypePut
opType2 = model.OpTypePut
} else {
opType = model.OpTypeDelete
opType1 = model.OpTypePut
opType2 = model.OpTypeDelete
}
for i := 0; i < 1000; i++ {
CRTs := make([]uint64, 500)
for i := 0; i < 500; i++ {
CRTs[i] = uint64(int64(resolvedTs) + rand.Int63n(int64(maxTs-resolvedTs)))
entry := &model.RawKVEntry{
CRTs: uint64(int64(resolvedTs) + rand.Int63n(int64(maxTs-resolvedTs))),
OpType: opType,
Key: []byte(fmt.Sprintf("key%d-%d", resolvedTs, i)),
Value: []byte("value1"),
CRTs: CRTs[i],
OpType: opType1,
}
es.AddEntry(ctx, model.NewPolymorphicEvent(entry))
}
for i := 0; i < 500; i++ {
entry := &model.RawKVEntry{
Key: []byte(fmt.Sprintf("key%d-%d", resolvedTs, i)),
Value: []byte("value2"),
CRTs: CRTs[i],
OpType: opType2,
}
es.AddEntry(ctx, model.NewPolymorphicEvent(entry))
}
es.AddEntry(ctx, model.NewResolvedPolymorphicEvent(0, resolvedTs, 0))
}
es.AddEntry(ctx, model.NewResolvedPolymorphicEvent(0, maxTs, 0))
}()
var lastTs uint64
var resolvedTs uint64
lastOpType := model.OpTypePut
var (
lastTs uint64
lastEntry *model.PolymorphicEvent
resolvedTs uint64
)
for entry := range es.Output() {
c.Assert(entry.CRTs, check.GreaterEqual, lastTs)
c.Assert(entry.CRTs, check.Greater, resolvedTs)
if lastOpType == model.OpTypePut && entry.RawKV.OpType == model.OpTypeDelete {
c.Assert(entry.CRTs, check.Greater, lastTs)
}
lastTs = entry.CRTs
lastOpType = entry.RawKV.OpType
if entry.RawKV.OpType == model.OpTypeResolved {
resolvedTs = entry.CRTs
} else {
if lastEntry != nil && bytes.Equal(lastEntry.RawKV.Key, entry.RawKV.Key) {
c.Assert(lastEntry.RawKV.Value, check.BytesEquals, []byte("value1"))
c.Assert(entry.RawKV.Value, check.BytesEquals, []byte("value2"))
}
lastEntry = entry
}
if resolvedTs == maxTs {
break
Expand Down Expand Up @@ -324,7 +348,7 @@ func (s *mockEntrySorterSuite) TestEventLess(c *check.C) {
OpType: model.OpTypeDelete,
},
},
true,
false,
},
{
&model.PolymorphicEvent{
Expand All @@ -339,7 +363,7 @@ func (s *mockEntrySorterSuite) TestEventLess(c *check.C) {
OpType: model.OpTypeResolved,
},
},
true,
false,
},
{
&model.PolymorphicEvent{
Expand All @@ -356,6 +380,51 @@ func (s *mockEntrySorterSuite) TestEventLess(c *check.C) {
},
false,
},
{
&model.PolymorphicEvent{
CRTs: 2,
RawKV: &model.RawKVEntry{
OpType: model.OpTypeDelete,
},
},
&model.PolymorphicEvent{
CRTs: 2,
RawKV: &model.RawKVEntry{
OpType: model.OpTypeResolved,
},
},
true,
},
{
&model.PolymorphicEvent{
CRTs: 2,
RawKV: &model.RawKVEntry{
OpType: model.OpTypePut,
},
},
&model.PolymorphicEvent{
CRTs: 2,
RawKV: &model.RawKVEntry{
OpType: model.OpTypeResolved,
},
},
true,
},
{
&model.PolymorphicEvent{
CRTs: 2,
RawKV: &model.RawKVEntry{
OpType: model.OpTypeResolved,
},
},
&model.PolymorphicEvent{
CRTs: 2,
RawKV: &model.RawKVEntry{
OpType: model.OpTypePut,
},
},
false,
},
{
&model.PolymorphicEvent{
CRTs: 3,
Expand Down
10 changes: 1 addition & 9 deletions cdc/cdc/sorter/unified/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,7 @@ type sortHeap []*sortItem

func (h sortHeap) Len() int { return len(h) }
func (h sortHeap) Less(i, j int) bool {
if h[i].entry.CRTs == h[j].entry.CRTs {
if h[j].entry.RawKV.OpType == model.OpTypeResolved && h[i].entry.RawKV.OpType != model.OpTypeResolved {
return true
}
if h[i].entry.RawKV.OpType == model.OpTypeDelete && h[j].entry.RawKV.OpType != model.OpTypeDelete {
return true
}
}
return h[i].entry.CRTs < h[j].entry.CRTs
return model.ComparePolymorphicEvents(h[i].entry, h[j].entry)
}
func (h sortHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *sortHeap) Push(x interface{}) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/sorter/unified/heap_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (h *heapSorter) init(ctx context.Context, onError func(err error)) {
poolHandle := heapSorterPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error {
event := eventI.(*model.PolymorphicEvent)
heap.Push(&h.heap, &sortItem{entry: event})
isResolvedEvent := event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved
isResolvedEvent := event.RawKV != nil && event.IsResolved()

if isResolvedEvent {
if event.RawKV.CRTs < state.maxResolved {
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/sorter/unified/unified_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (s *Sorter) Run(ctx context.Context) error {
case <-subctx.Done():
return subctx.Err()
case event := <-s.inputCh:
if event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved {
if event.RawKV != nil && event.IsResolved() {
// broadcast resolved events
for _, sorter := range heapSorters {
select {
Expand Down

0 comments on commit 769754e

Please sign in to comment.