Skip to content

Commit

Permalink
Merge branch 'fix-kafka-consumer-commitTs-regression' of https://gith…
Browse files Browse the repository at this point in the history
…ub.com/3AceShowHand/tiflow into fix-kafka-consumer-commitTs-regression
  • Loading branch information
3AceShowHand committed Apr 7, 2022
2 parents 0d1015b + 7f09db9 commit 88e3293
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 79 deletions.
16 changes: 9 additions & 7 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,19 @@ func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err erro
n.status.Store(TableStatusStopped)
return
}
if n.checkpointTs >= n.targetTs {
if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs {
err = n.stop(ctx)
}
}()
if resolvedTs > n.barrierTs {
resolvedTs = n.barrierTs
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs)
if resolvedTs > currentBarrierTs {
resolvedTs = currentBarrierTs
}
if resolvedTs > n.targetTs {
resolvedTs = n.targetTs
}
if resolvedTs <= n.checkpointTs {
if resolvedTs <= currentCheckpointTs {
return nil
}
if err := n.emitRowToSink(ctx); err != nil {
Expand All @@ -167,7 +169,7 @@ func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err erro
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= n.checkpointTs {
if checkpointTs <= currentCheckpointTs {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)
Expand Down Expand Up @@ -343,7 +345,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
return false, errors.Trace(err)
}
case pmessage.MessageTypeTick:
if err := n.flushSink(ctx, n.resolvedTs); err != nil {
if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil {
return false, errors.Trace(err)
}
case pmessage.MessageTypeCommand:
Expand All @@ -362,7 +364,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo

func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
atomic.StoreUint64(&n.barrierTs, ts)
if err := n.flushSink(ctx, n.resolvedTs); err != nil {
if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
31 changes: 18 additions & 13 deletions cdc/sink/manager/kafka/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (m *TopicManager) Partitions(topic string) (int32, error) {
return partitions.(int32), nil
}

return m.cfg.PartitionNum, m.CreateTopic(topic)
return m.CreateTopic(topic)
}

// tryRefreshMeta try to refresh the topics' information maintained by manager.
Expand Down Expand Up @@ -97,7 +97,7 @@ func (m *TopicManager) tryUpdatePartitionsAndLogging(topic string, partitions in
m.topics.Store(topic, partitions)
log.Info(
"update topic partition number",
zap.String("duration", topic),
zap.String("topic", topic),
zap.Int32("oldPartitionNumber", oldPartitions.(int32)),
zap.Int32("newPartitionNumber", partitions),
)
Expand All @@ -106,27 +106,28 @@ func (m *TopicManager) tryUpdatePartitionsAndLogging(topic string, partitions in
m.topics.Store(topic, partitions)
log.Info(
"store topic partition number",
zap.String("duration", topic),
zap.String("topic", topic),
zap.Int32("partitionNumber", partitions),
)
}
}

// CreateTopic creates a topic with the given name.
func (m *TopicManager) CreateTopic(topicName string) error {
// CreateTopic creates a topic with the given name
// and returns the number of partitions.
func (m *TopicManager) CreateTopic(topicName string) (int32, error) {
start := time.Now()
topics, err := m.admin.ListTopics()
if err != nil {
log.Error(
"Kafka admin client list topics failed",
zap.Error(err),
zap.Duration("cost", time.Since(start)),
zap.Duration("duration", time.Since(start)),
)
return errors.Trace(err)
return 0, errors.Trace(err)
}
log.Info(
"Kafka admin client list topics success",
zap.Duration("cost", time.Since(start)),
zap.Duration("duration", time.Since(start)),
)

// Now that we have access to the latest topics' information,
Expand All @@ -137,12 +138,16 @@ func (m *TopicManager) CreateTopic(topicName string) error {
m.lastMetadataRefresh.Store(time.Now().Unix())

// Maybe our cache has expired information, so we just return it.
if _, ok := topics[topicName]; ok {
return nil
if t, ok := topics[topicName]; ok {
log.Info(
"topic already exists and the cached information has expired",
zap.String("topic", topicName),
)
return t.NumPartitions, nil
}

if !m.cfg.AutoCreate {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
return 0, cerror.ErrKafkaInvalidConfig.GenWithStack(
fmt.Sprintf("`auto-create-topic` is false, "+
"and %s not found", topicName))
}
Expand All @@ -162,7 +167,7 @@ func (m *TopicManager) CreateTopic(topicName string) error {
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
log.Info(
"Kafka admin client create the topic success",
Expand All @@ -173,5 +178,5 @@ func (m *TopicManager) CreateTopic(topicName string) error {
)
m.tryUpdatePartitionsAndLogging(topicName, m.cfg.PartitionNum)

return nil
return m.cfg.PartitionNum, nil
}
8 changes: 5 additions & 3 deletions cdc/sink/manager/kafka/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,21 @@ func TestCreateTopic(t *testing.T) {
}

manager := NewTopicManager(client, adminClient, cfg)
err := manager.CreateTopic(kafkamock.DefaultMockTopicName)
partitionNum, err := manager.CreateTopic(kafkamock.DefaultMockTopicName)
require.Nil(t, err)
require.Equal(t, int32(3), partitionNum)

err = manager.CreateTopic("new-topic")
partitionNum, err = manager.CreateTopic("new-topic")
require.Nil(t, err)
require.Equal(t, int32(2), partitionNum)
partitionsNum, err := manager.Partitions("new-topic")
require.Nil(t, err)
require.Equal(t, int32(2), partitionsNum)

// Try to create a topic without auto create.
cfg.AutoCreate = false
manager = NewTopicManager(client, adminClient, cfg)
err = manager.CreateTopic("new-topic2")
_, err = manager.CreateTopic("new-topic2")
require.Regexp(
t,
"`auto-create-topic` is false, and new-topic2 not found",
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ type TopicManager interface {
// Partitions returns the partitions of the topic.
Partitions(topic string) (int32, error)
// CreateTopic creates the topic.
CreateTopic(topicName string) error
CreateTopic(topicName string) (int32, error)
}
4 changes: 2 additions & 2 deletions cdc/sink/manager/pulsar/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ func (m *TopicManager) Partitions(_ string) (int32, error) {
}

// CreateTopic do nothing.
func (m *TopicManager) CreateTopic(_ string) error {
return nil
func (m *TopicManager) CreateTopic(_ string) (int32, error) {
return m.partitionNum, nil
}
10 changes: 9 additions & 1 deletion cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,14 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
err = k.mqProducer.SyncBroadcastMessage(ctx, topic, partitionNum, msg)
return errors.Trace(err)
}
// Notice: We must call Partitions here,
// which will be responsible for automatically creating topics when they don't exist.
// If it is not called here and kafka has `auto.create.topics.enable` turned on,
// then the auto-created topic will not be created as configured by ticdc.
_, err = k.topicManager.Partitions(topic)
if err != nil {
return errors.Trace(err)
}
err = k.asyncFlushToPartitionZero(ctx, topic, msg)
return errors.Trace(err)
}
Expand Down Expand Up @@ -399,7 +407,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
adminClient,
baseConfig.DeriveTopicConfig(),
)
if err := topicManager.CreateTopic(topic); err != nil {
if _, err := topicManager.CreateTopic(topic); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
}

Expand Down
2 changes: 1 addition & 1 deletion deployments/ticdc/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ RUN apk add --no-cache git make bash
WORKDIR /go/src/github.com/pingcap/tiflow
COPY . .
ENV CDC_ENABLE_VENDOR=0
RUN make
RUN make cdc

FROM alpine:3.15
RUN apk add --no-cache tzdata bash curl socat
Expand Down
5 changes: 2 additions & 3 deletions pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ type DebugConfig struct {

// EnableDBSorter enables db sorter.
//
// The default value is false.
// The default value is true.
EnableDBSorter bool `toml:"enable-db-sorter" json:"enable-db-sorter"`
DB *DBConfig `toml:"db" json:"db"`

// EnableNewScheduler enables the peer-messaging based new scheduler.
// The default value is false.
// TODO: turn on after GA.
// The default value is true.
EnableNewScheduler bool `toml:"enable-new-scheduler" json:"enable-new-scheduler"`
Messages *MessagesConfig `toml:"messages" json:"messages"`
}
Expand Down

This file was deleted.

43 changes: 0 additions & 43 deletions tests/integration_tests/bank_table_actor/run.sh

This file was deleted.

5 changes: 2 additions & 3 deletions tests/integration_tests/multi_topics/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
# FIXME(hi-rustin): Now the kafka test is not very stable, so skip it for now.
if [[ "$SINK_TYPE" == "mysql" || "$SINK_TYPE" == "kafka" ]]; then
if [ "$SINK_TYPE" == "mysql" ]; then
return
fi

Expand All @@ -31,7 +30,7 @@ function run() {

run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# NOTICE: we need to wait for the kafka topic to be created.
sleep 1m
sleep 2m

for i in $(seq 1 3); do
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/test_test${i}?protocol=canal-json&version=${KAFKA_VERSION}&enable-tidb-extension=true" "" ${i}
Expand Down

0 comments on commit 88e3293

Please sign in to comment.