Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka(ticdc): reset the admin client to fix broken pipe #8228

Merged
merged 6 commits into from
Feb 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 123 additions & 27 deletions pkg/sink/kafka/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,33 @@ package kafka

import (
"context"
"errors"
"io"
"net"
"strconv"
"strings"
"sync"
"syscall"

"github.com/Shopify/sarama"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"go.uber.org/zap"
)

// saramaAdminClient is the ClusterAdminClient returned by NewSaramaAdminClient.
// Besides the live sarama admin client it keeps the broker endpoints and the
// sarama config, which reset uses to build a replacement client.
type saramaAdminClient struct {
// brokerEndpoints are the broker addresses passed to sarama.NewClusterAdmin.
brokerEndpoints []string
// config is the sarama configuration passed to sarama.NewClusterAdmin.
config *sarama.Config

// mu is held while a cluster query runs, while the client is reset, and
// while the client is closed.
mu sync.Mutex
// client is the current sarama admin client; reset swaps it for a new one.
client sarama.ClusterAdmin
}

const (
// defaultRetryBackoff is the base delay handed to retry.WithBackoffBaseDelay
// when a cluster query is retried.
// NOTE(review): the unit is presumably milliseconds — confirm against the
// retry package.
defaultRetryBackoff = 20
// defaultRetryMaxTries is the attempt limit handed to retry.WithMaxTries
// when a cluster query is retried.
defaultRetryMaxTries = 3
)

// NewSaramaAdminClient constructs a ClusterAdminClient with sarama.
func NewSaramaAdminClient(ctx context.Context, config *Options) (ClusterAdminClient, error) {
saramaConfig, err := NewSaramaConfig(ctx, config)
Expand All @@ -39,11 +53,60 @@ func NewSaramaAdminClient(ctx context.Context, config *Options) (ClusterAdminCli
if err != nil {
return nil, err
}
return &saramaAdminClient{client: client}, nil
return &saramaAdminClient{
client: client,
brokerEndpoints: config.BrokerEndpoints,
config: saramaConfig,
}, nil
}

func (a *saramaAdminClient) reset() error {
newClient, err := sarama.NewClusterAdmin(a.brokerEndpoints, a.config)
if err != nil {
return cerror.Trace(err)
}

_ = a.client.Close()
a.client = newClient

return errors.New("retry after reset")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we always return an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we return nil here, the retry logic will exit; returning an error is on purpose.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it is more appropriate to return this error outside of reset?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's very strange that a func always returns an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we move it outside, we still have to return an error to trigger the retry, no matter whether reset returns an error or not.

}

func (a *saramaAdminClient) queryClusterWithRetry(ctx context.Context, query func() error) error {
err := retry.Do(ctx, func() error {
a.mu.Lock()
defer a.mu.Unlock()
err := query()
if err == nil {
return nil
}

if !errors.Is(err, syscall.EPIPE) {
return err
}
if !errors.Is(err, net.ErrClosed) {
return err
}
if !errors.Is(err, io.EOF) {
return err
}

return a.reset()
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
}, retry.WithBackoffBaseDelay(defaultRetryBackoff), retry.WithMaxTries(defaultRetryMaxTries))
return err
}

func (a *saramaAdminClient) GetAllBrokers(context.Context) ([]Broker, error) {
brokers, _, err := a.client.DescribeCluster()
func (a *saramaAdminClient) GetAllBrokers(ctx context.Context) ([]Broker, error) {
var (
brokers []*sarama.Broker
err error
)
query := func() error {
brokers, _, err = a.client.DescribeCluster()
return err
}

err = a.queryClusterWithRetry(ctx, query)
if err != nil {
return nil, err
}
Expand All @@ -58,25 +121,39 @@ func (a *saramaAdminClient) GetAllBrokers(context.Context) ([]Broker, error) {
return result, nil
}

func (a *saramaAdminClient) GetCoordinator(context.Context) (int, error) {
_, controllerID, err := a.client.DescribeCluster()
if err != nil {
return 0, err
func (a *saramaAdminClient) GetCoordinator(ctx context.Context) (int, error) {
var (
controllerID int32
err error
)

query := func() error {
_, controllerID, err = a.client.DescribeCluster()
return err
}
return int(controllerID), nil
err = a.queryClusterWithRetry(ctx, query)
return int(controllerID), err
}

func (a *saramaAdminClient) GetBrokerConfig(_ context.Context, configName string) (string, error) {
_, controller, err := a.client.DescribeCluster()
func (a *saramaAdminClient) GetBrokerConfig(
ctx context.Context,
configName string,
) (string, error) {
controller, err := a.GetCoordinator(ctx)
if err != nil {
return "", err
}

configEntries, err := a.client.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(int(controller)),
ConfigNames: []string{configName},
})
var configEntries []sarama.ConfigEntry
query := func() error {
configEntries, err = a.client.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(controller),
ConfigNames: []string{configName},
})
return err
}
err = a.queryClusterWithRetry(ctx, query)
if err != nil {
return "", err
}
Expand All @@ -90,8 +167,17 @@ func (a *saramaAdminClient) GetBrokerConfig(_ context.Context, configName string
return configEntries[0].Value, nil
}

func (a *saramaAdminClient) GetAllTopicsMeta(context.Context) (map[string]TopicDetail, error) {
topics, err := a.client.ListTopics()
func (a *saramaAdminClient) GetAllTopicsMeta(ctx context.Context) (map[string]TopicDetail, error) {
var (
topics map[string]sarama.TopicDetail
err error
)

query := func() error {
topics, err = a.client.ListTopics()
return err
}
err = a.queryClusterWithRetry(ctx, query)
if err != nil {
return nil, err
}
Expand All @@ -116,11 +202,20 @@ func (a *saramaAdminClient) GetAllTopicsMeta(context.Context) (map[string]TopicD
}

func (a *saramaAdminClient) GetTopicsMeta(
_ context.Context,
ctx context.Context,
topics []string,
ignoreTopicError bool,
) (map[string]TopicDetail, error) {
metaList, err := a.client.DescribeTopics(topics)
var (
metaList []*sarama.TopicMetadata
err error
)
query := func() error {
metaList, err = a.client.DescribeTopics(topics)
return err
}

err = a.queryClusterWithRetry(ctx, query)
if err != nil {
return nil, err
}
Expand All @@ -146,21 +241,22 @@ func (a *saramaAdminClient) GetTopicsMeta(
}

func (a *saramaAdminClient) CreateTopic(
_ context.Context,
ctx context.Context,
detail *TopicDetail,
validateOnly bool,
) error {
err := a.client.CreateTopic(detail.Name, &sarama.TopicDetail{
request := &sarama.TopicDetail{
NumPartitions: detail.NumPartitions,
ReplicationFactor: detail.ReplicationFactor,
}, validateOnly)
// Ignore the already exists error because it's not harmful.
if err != nil && !strings.Contains(err.Error(), sarama.ErrTopicAlreadyExists.Error()) {
return err
}
return nil
query := func() error {
return a.client.CreateTopic(detail.Name, request, validateOnly)
}
return a.queryClusterWithRetry(ctx, query)
}

// Close shuts down the current sarama admin client and returns the error from
// that shutdown. a.mu is held for the duration of the call.
func (a *saramaAdminClient) Close() error {
	a.mu.Lock()
	defer a.mu.Unlock()

	admin := a.client
	return admin.Close()
}