Skip to content

Commit

Permalink
kafka(ticdc): reset the admin client to fix broken pipe (#8228) (#8230)
Browse files Browse the repository at this point in the history
close #8223, close #8225
  • Loading branch information
ti-chi-bot authored Feb 11, 2023
1 parent f5ed680 commit 30d4ca5
Showing 1 changed file with 123 additions and 27 deletions.
150 changes: 123 additions & 27 deletions pkg/sink/kafka/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,33 @@ package kafka

import (
"context"
"errors"
"io"
"net"
"strconv"
"strings"
"sync"
"syscall"

"github.com/Shopify/sarama"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"go.uber.org/zap"
)

type saramaAdminClient struct {
brokerEndpoints []string
config *sarama.Config

mu sync.Mutex
client sarama.ClusterAdmin
}

const (
defaultRetryBackoff = 20
defaultRetryMaxTries = 3
)

// NewSaramaAdminClient constructs a ClusterAdminClient with sarama.
func NewSaramaAdminClient(ctx context.Context, config *Options) (ClusterAdminClient, error) {
saramaConfig, err := NewSaramaConfig(ctx, config)
Expand All @@ -39,11 +53,60 @@ func NewSaramaAdminClient(ctx context.Context, config *Options) (ClusterAdminCli
if err != nil {
return nil, err
}
return &saramaAdminClient{client: client}, nil
return &saramaAdminClient{
client: client,
brokerEndpoints: config.BrokerEndpoints,
config: saramaConfig,
}, nil
}

func (a *saramaAdminClient) reset() error {
newClient, err := sarama.NewClusterAdmin(a.brokerEndpoints, a.config)
if err != nil {
return cerror.Trace(err)
}

_ = a.client.Close()
a.client = newClient

return errors.New("retry after reset")
}

func (a *saramaAdminClient) queryClusterWithRetry(ctx context.Context, query func() error) error {
err := retry.Do(ctx, func() error {
a.mu.Lock()
defer a.mu.Unlock()
err := query()
if err == nil {
return nil
}

if !errors.Is(err, syscall.EPIPE) {
return err
}
if !errors.Is(err, net.ErrClosed) {
return err
}
if !errors.Is(err, io.EOF) {
return err
}

return a.reset()
}, retry.WithBackoffBaseDelay(defaultRetryBackoff), retry.WithMaxTries(defaultRetryMaxTries))
return err
}

func (a *saramaAdminClient) GetAllBrokers(context.Context) ([]Broker, error) {
brokers, _, err := a.client.DescribeCluster()
func (a *saramaAdminClient) GetAllBrokers(ctx context.Context) ([]Broker, error) {
var (
brokers []*sarama.Broker
err error
)
query := func() error {
brokers, _, err = a.client.DescribeCluster()
return err
}

err = a.queryClusterWithRetry(ctx, query)
if err != nil {
return nil, err
}
Expand All @@ -58,25 +121,39 @@ func (a *saramaAdminClient) GetAllBrokers(context.Context) ([]Broker, error) {
return result, nil
}

func (a *saramaAdminClient) GetCoordinator(context.Context) (int, error) {
_, controllerID, err := a.client.DescribeCluster()
if err != nil {
return 0, err
func (a *saramaAdminClient) GetCoordinator(ctx context.Context) (int, error) {
var (
controllerID int32
err error
)

query := func() error {
_, controllerID, err = a.client.DescribeCluster()
return err
}
return int(controllerID), nil
err = a.queryClusterWithRetry(ctx, query)
return int(controllerID), err
}

func (a *saramaAdminClient) GetBrokerConfig(_ context.Context, configName string) (string, error) {
_, controller, err := a.client.DescribeCluster()
func (a *saramaAdminClient) GetBrokerConfig(
ctx context.Context,
configName string,
) (string, error) {
controller, err := a.GetCoordinator(ctx)
if err != nil {
return "", err
}

configEntries, err := a.client.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(int(controller)),
ConfigNames: []string{configName},
})
var configEntries []sarama.ConfigEntry
query := func() error {
configEntries, err = a.client.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(controller),
ConfigNames: []string{configName},
})
return err
}
err = a.queryClusterWithRetry(ctx, query)
if err != nil {
return "", err
}
Expand All @@ -90,8 +167,17 @@ func (a *saramaAdminClient) GetBrokerConfig(_ context.Context, configName string
return configEntries[0].Value, nil
}

func (a *saramaAdminClient) GetAllTopicsMeta(context.Context) (map[string]TopicDetail, error) {
topics, err := a.client.ListTopics()
func (a *saramaAdminClient) GetAllTopicsMeta(ctx context.Context) (map[string]TopicDetail, error) {
var (
topics map[string]sarama.TopicDetail
err error
)

query := func() error {
topics, err = a.client.ListTopics()
return err
}
err = a.queryClusterWithRetry(ctx, query)
if err != nil {
return nil, err
}
Expand All @@ -116,11 +202,20 @@ func (a *saramaAdminClient) GetAllTopicsMeta(context.Context) (map[string]TopicD
}

func (a *saramaAdminClient) GetTopicsMeta(
_ context.Context,
ctx context.Context,
topics []string,
ignoreTopicError bool,
) (map[string]TopicDetail, error) {
metaList, err := a.client.DescribeTopics(topics)
var (
metaList []*sarama.TopicMetadata
err error
)
query := func() error {
metaList, err = a.client.DescribeTopics(topics)
return err
}

err = a.queryClusterWithRetry(ctx, query)
if err != nil {
return nil, err
}
Expand All @@ -146,21 +241,22 @@ func (a *saramaAdminClient) GetTopicsMeta(
}

func (a *saramaAdminClient) CreateTopic(
_ context.Context,
ctx context.Context,
detail *TopicDetail,
validateOnly bool,
) error {
err := a.client.CreateTopic(detail.Name, &sarama.TopicDetail{
request := &sarama.TopicDetail{
NumPartitions: detail.NumPartitions,
ReplicationFactor: detail.ReplicationFactor,
}, validateOnly)
// Ignore the already exists error because it's not harmful.
if err != nil && !strings.Contains(err.Error(), sarama.ErrTopicAlreadyExists.Error()) {
return err
}
return nil
query := func() error {
return a.client.CreateTopic(detail.Name, request, validateOnly)
}
return a.queryClusterWithRetry(ctx, query)
}

func (a *saramaAdminClient) Close() error {
a.mu.Lock()
defer a.mu.Unlock()
return a.client.Close()
}

0 comments on commit 30d4ca5

Please sign in to comment.