Skip to content

Commit

Permalink
fix line width in kafka consumer.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Apr 6, 2022
1 parent 257ba8b commit 48c8d39
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := context.Background()
partition := claim.Partition()
c.sinksMu.Lock()
sink := c.sinks[partition]
Expand Down Expand Up @@ -576,7 +577,8 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
if row.Table.IsPartition {
partitionID = row.Table.TableID
}
tableID := c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
tableID := c.fakeTableIDGenerator.
generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
row.Table.TableID = tableID

group, ok := eventGroups[tableID]
Expand Down Expand Up @@ -604,7 +606,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
if len(events) == 0 {
continue
}
if err := sink.EmitRowChangedEvents(context.Background(), events...); err != nil {
if err := sink.EmitRowChangedEvents(ctx, events...); err != nil {
log.Panic("emit row changed event failed",
zap.Any("events", events),
zap.Error(err),
Expand Down

0 comments on commit 48c8d39

Please sign in to comment.