consumer(cdc): fix unstable test move_table #5086

Merged

Changes from 19 of 32 commits

Commits
d82a747
enable move_table case for test.
3AceShowHand Mar 31, 2022
ca35928
Merge branch 'master' into fix-kafka-consumer-commitTs-regression
3AceShowHand Mar 31, 2022
d968346
fix some log for debug help.
3AceShowHand Mar 31, 2022
902f429
Merge branch 'fix-kafka-consumer-commitTs-regression' of https://gith…
3AceShowHand Mar 31, 2022
fcad533
fix data race in pipeline sinkNode.
3AceShowHand Mar 31, 2022
dd56224
add a log to track.
3AceShowHand Mar 31, 2022
a4c266b
use last dml commit ts to prevent fall back, which causes mysql sink pa…
3AceShowHand Mar 31, 2022
f6a4676
add a log to mq help debug commit ts order.
3AceShowHand Mar 31, 2022
05c692d
change log level to debug for changefeed barrierTs check.
3AceShowHand Apr 1, 2022
4857d2d
emit row changed event when receive resolved ts.
3AceShowHand Apr 1, 2022
93b3f64
fix consumer panic.
3AceShowHand Apr 1, 2022
ad586c9
add error to sarama metrics collector.
3AceShowHand Apr 1, 2022
cc3f9f7
Merge branch 'master' into fix-kafka-consumer-commitTs-regression
3AceShowHand Apr 1, 2022
454c1ac
ready for review.
3AceShowHand Apr 1, 2022
8fa99db
Merge branch 'fix-kafka-consumer-commitTs-regression' of https://gith…
3AceShowHand Apr 1, 2022
1042f06
Merge branch 'master' into fix-kafka-consumer-commitTs-regression
3AceShowHand Apr 1, 2022
37f386d
fix kafka consumer, remove useless log.
3AceShowHand Apr 1, 2022
ad01187
Merge branch 'fix-kafka-consumer-commitTs-regression' of https://gith…
3AceShowHand Apr 1, 2022
fa57b8d
Update cdc/sink/common/common.go
3AceShowHand Apr 2, 2022
158a2cd
sarama version to 1.29.0
3AceShowHand Apr 2, 2022
6d5891c
Merge branch 'fix-kafka-consumer-commitTs-regression' of https://gith…
3AceShowHand Apr 2, 2022
913b3e2
Merge branch 'master' into fix-kafka-consumer-commitTs-regression
ti-chi-bot Apr 2, 2022
575787e
Merge branch 'master' into fix-kafka-consumer-commitTs-regression
ti-chi-bot Apr 2, 2022
81a0eb7
Merge branch 'master' into fix-kafka-consumer-commitTs-regression
ti-chi-bot Apr 3, 2022
fb19214
bump tidb to 0406 to fix unstable ddl test.
3AceShowHand Apr 6, 2022
f217cef
Merge branch 'fix-kafka-consumer-commitTs-regression' of https://gith…
3AceShowHand Apr 6, 2022
257ba8b
fix go mod tidy.
3AceShowHand Apr 6, 2022
48c8d39
fix line width in kafka consumer.
3AceShowHand Apr 6, 2022
ad2339c
replace tidb version.
3AceShowHand Apr 7, 2022
7f09db9
Merge branch 'master' into fix-kafka-consumer-commitTs-regression
3AceShowHand Apr 7, 2022
0d1015b
fix go mod tidy.
3AceShowHand Apr 7, 2022
88e3293
Merge branch 'fix-kafka-consumer-commitTs-regression' of https://gith…
3AceShowHand Apr 7, 2022
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
@@ -211,7 +211,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
 		// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
 		// which implies that it would be premature to schedule tables or to update status.
 		// So we return here.
-		log.Info("barrierTs < checkpointTs, premature to schedule tables or update status",
+		log.Debug("barrierTs < checkpointTs, premature to schedule tables or update status",
 			zap.String("changefeed", c.id),
 			zap.Uint64("barrierTs", barrierTs), zap.Uint64("checkpointTs", checkpointTs))
 		return nil
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/sink.go
@@ -320,14 +320,14 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
 }
 
 func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) {
-	if n.status == TableStatusStopped {
+	if n.status.Load() == TableStatusStopped {
 		return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
 	}
 	switch msg.Tp {
 	case pmessage.MessageTypePolymorphicEvent:
 		event := msg.PolymorphicEvent
 		if event.RawKV.OpType == model.OpTypeResolved {
-			if n.status == TableStatusInitializing {
+			if n.status.Load() == TableStatusInitializing {
 				n.status.Store(TableStatusRunning)
 			}
 			failpoint.Inject("ProcessorSyncResolvedError", func() {
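The sinkNode fix routes every read of `n.status` through `n.status.Load()`, so reads and writes go through the same atomic that `Store` already uses; mixing a plain field read with an atomic write is a data race under the Go race detector. A minimal sketch of the pattern, assuming `TableStatus` is an int32-backed enum — the type and constant values below are illustrative, not tiflow's actual definitions:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// TableStatus stands in for the pipeline's status enum. The concrete
// type and constants here are illustrative, not tiflow's definitions.
type TableStatus int32

const (
	TableStatusInitializing TableStatus = iota
	TableStatusRunning
	TableStatusStopped
)

// Load and Store route every access through sync/atomic, so a reader
// in HandleMessage and a writer on the stop path never race.
func (s *TableStatus) Load() TableStatus {
	return TableStatus(atomic.LoadInt32((*int32)(s)))
}

func (s *TableStatus) Store(v TableStatus) {
	atomic.StoreInt32((*int32)(s), int32(v))
}

func main() {
	var status TableStatus // zero value: TableStatusInitializing
	status.Store(TableStatusRunning)
	fmt.Println(status.Load() == TableStatusRunning) // true
}
```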
6 changes: 2 additions & 4 deletions cdc/sink/common/common.go
@@ -83,10 +83,8 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha
 		// fail-fast check
 		if len(txns) != 0 && txns[len(txns)-1].commitTs > row.CommitTs {
 			log.Panic("the commitTs of the emit row is less than the received row",
-				zap.Stringer("table", row.Table),
-				zap.Uint64("emit row startTs", row.StartTs),
-				zap.Uint64("emit row commitTs", row.CommitTs),
-				zap.Uint64("last received row commitTs", txns[len(txns)-1].commitTs))
+				zap.Uint64("lastReceivedCommitTs", txns[len(txns)-1].commitTs),
+				zap.Any("row", row))
 		}
 		txns = append(txns, &txnsWithTheSameCommitTs{
 			commitTs: row.CommitTs,
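This fail-fast check is the guard the kafka consumer was tripping: `UnresolvedTxnCache.Append` panics when a row arrives with a commitTs lower than the last one buffered. A hypothetical stand-alone version of the same invariant, reduced to bare timestamps instead of `model.RowChangedEvent` (all names here are illustrative):

```go
package main

import (
	"fmt"
	"log"
)

// appendOrdered is a hypothetical helper mirroring the sink's invariant:
// rows for a table must arrive in non-decreasing commitTs order.
func appendOrdered(buffered []uint64, commitTs uint64) []uint64 {
	if n := len(buffered); n != 0 && buffered[n-1] > commitTs {
		// The real cache calls log.Panic with the offending row attached.
		log.Panicf("commitTs regression: last received %d, new row %d", buffered[n-1], commitTs)
	}
	return append(buffered, commitTs)
}

func main() {
	buf := appendOrdered(nil, 100)
	buf = appendOrdered(buf, 105)
	fmt.Println(buf) // [100 105]
	// appendOrdered(buf, 103) would panic: 105 > 103.
}
```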
3 changes: 2 additions & 1 deletion cdc/sink/producer/kafka/metrics.go
@@ -225,7 +225,8 @@ func (sm *saramaMetricsMonitor) collectBrokers() {
 			"use historical brokers to collect kafka broker level metrics",
 			zap.String("changefeed", sm.changefeedID),
 			zap.Any("role", sm.role),
-			zap.Duration("duration", time.Since(start)))
+			zap.Duration("duration", time.Since(start)),
+			zap.Error(err))
 		return
 	}
 
66 changes: 56 additions & 10 deletions cmd/kafka-consumer/main.go
@@ -21,6 +21,7 @@ import (
 	"net/url"
 	"os"
 	"os/signal"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -453,9 +454,36 @@ func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
 	return nil
 }
 
+type eventsGroup struct {
+	events []*model.RowChangedEvent
+}
+
+func newEventsGroup() *eventsGroup {
+	return &eventsGroup{
+		events: make([]*model.RowChangedEvent, 0),
+	}
+}
+
+func (g *eventsGroup) Append(e *model.RowChangedEvent) {
+	g.events = append(g.events, e)
+}
+
+func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {
+	sort.Slice(g.events, func(i, j int) bool {
+		return g.events[i].CommitTs < g.events[j].CommitTs
+	})
+
+	i := sort.Search(len(g.events), func(i int) bool {
+		return g.events[i].CommitTs > resolveTs
+	})
+	result := g.events[:i]
+	g.events = g.events[i:]
+
+	return result
+}
+
 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
 func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	ctx := context.TODO()
 	partition := claim.Partition()
 	c.sinksMu.Lock()
 	sink := c.sinks[partition]
@@ -464,6 +492,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 		panic("sink should initialized")
 	}
 
+	eventGroups := make(map[int64]*eventsGroup)
 	for message := range claim.Messages() {
 		var (
 			decoder codec.EventBatchDecoder
@@ -547,15 +576,15 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 			if row.Table.IsPartition {
 				partitionID = row.Table.TableID
 			}
-			row.Table.TableID = c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
-			err = sink.EmitRowChangedEvents(ctx, row)
-			if err != nil {
-				log.Panic("emit row changed event failed", zap.Error(err))
-			}
-			lastCommitTs, ok := sink.tablesMap.Load(row.Table.TableID)
-			if !ok || lastCommitTs.(uint64) < row.CommitTs {
-				sink.tablesMap.Store(row.Table.TableID, row.CommitTs)
+			tableID := c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
+			row.Table.TableID = tableID
+
+			group, ok := eventGroups[tableID]
+			if !ok {
+				group = newEventsGroup()
+				eventGroups[tableID] = group
 			}
+			group.Append(row)
 		case model.MqMessageTypeResolved:
 			ts, err := decoder.NextResolvedEvent()
 			if err != nil {
@@ -570,7 +599,24 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 					zap.Int32("partition", partition))
 			}
 			if ts > resolvedTs {
-				log.Info("update sink resolved ts",
+				for tableID, group := range eventGroups {
+					events := group.Resolve(ts)
+					if len(events) == 0 {
+						continue
+					}
+					if err := sink.EmitRowChangedEvents(context.Background(), events...); err != nil {
+						log.Panic("emit row changed event failed",
+							zap.Any("events", events),
+							zap.Error(err),
+							zap.Int32("partition", partition))
+					}
+					commitTs := events[len(events)-1].CommitTs
+					lastCommitTs, ok := sink.tablesMap.Load(tableID)
+					if !ok || lastCommitTs.(uint64) < commitTs {
+						sink.tablesMap.Store(tableID, commitTs)
+					}
+				}
+				log.Debug("update sink resolved ts",
 					zap.Uint64("ts", ts),
 					zap.Int32("partition", partition))
 				atomic.StoreUint64(&sink.resolvedTs, ts)
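Taken together, the consumer changes stop emitting each row the moment it is decoded. Rows are buffered per (fake) table ID in an `eventsGroup`, and only when a resolved ts arrives does `Resolve` sort the buffer by commitTs and hand back the prefix at or below that ts — so the downstream sink always sees commit timestamps in non-decreasing order, and the fail-fast panic in `cdc/sink/common/common.go` can no longer fire. A runnable sketch of the same buffer-then-flush semantics, reduced to bare commit timestamps where the real code carries `*model.RowChangedEvent`:

```go
package main

import (
	"fmt"
	"sort"
)

// eventsGroup mirrors the consumer's per-table buffer, simplified
// to uint64 commit timestamps for this illustration.
type eventsGroup struct{ commitTsList []uint64 }

func (g *eventsGroup) Append(ts uint64) {
	g.commitTsList = append(g.commitTsList, ts)
}

// Resolve returns every buffered event with commitTs <= resolveTs,
// in commitTs order, and keeps the remainder for a later resolved ts.
func (g *eventsGroup) Resolve(resolveTs uint64) []uint64 {
	sort.Slice(g.commitTsList, func(i, j int) bool {
		return g.commitTsList[i] < g.commitTsList[j]
	})
	// Smallest index whose commitTs exceeds resolveTs; everything
	// before it is safe to flush.
	i := sort.Search(len(g.commitTsList), func(i int) bool {
		return g.commitTsList[i] > resolveTs
	})
	out := g.commitTsList[:i]
	g.commitTsList = g.commitTsList[i:]
	return out
}

func main() {
	g := &eventsGroup{}
	for _, ts := range []uint64{105, 101, 103, 110} {
		g.Append(ts) // arrival order, not commit order
	}
	fmt.Println(g.Resolve(104)) // [101 103]
	fmt.Println(g.Resolve(111)) // [105 110]
}
```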
4 changes: 0 additions & 4 deletions tests/integration_tests/move_table/run.sh
@@ -9,10 +9,6 @@ CDC_BINARY=cdc.test
 SINK_TYPE=$1
 
 function run() {
-	# TODO: remove after kafka-consumer/main.go is fixed
-	if [ "$SINK_TYPE" == "kafka" ]; then
-		return
-	fi
 
 	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
 