Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new_owner: merge the new owner to 5.0 #2081

Merged
merged 28 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5ff8e2b
new_owner: a tso barriers mechanism (#1744)
May 10, 2021
51f8f96
*: refine the vars in context.Context (#1459)
May 13, 2021
af231d3
context: uniform the import naming of context, part 1 (#1773)
May 13, 2021
59f50ee
etcd_worker: support update multi-keys in a DataPatch (#1747)
May 20, 2021
fe25d93
new_owner: let processor and owner use the same reactor state (#1810)
May 21, 2021
3cca96f
etcd_worker: fix a bug that etcd worker maybe lost data at starting (…
May 24, 2021
e091da6
new_owner: a schema wrapper for owner (#1766)
May 24, 2021
cd79200
new_owner: implement gc manager (#1819)
May 26, 2021
350f293
new_owner: a ddl puller wrapper for owner (#1776)
May 26, 2021
117a385
new_owner: an async sink for owner (#1782)
May 29, 2021
a0ed0f0
new_owner: a feed state manager for owner (#1826)
May 30, 2021
2f00654
new_owner: a table task scheduler for the owner (#1820)
May 31, 2021
31d72df
new owner: port metrics from the old owner (#1886)
May 31, 2021
63c944c
new_owner: refine processor (#1821)
May 31, 2021
874c9d8
new_owner: a changefeed implement (#1894)
Jun 1, 2021
83917fd
new_owner: an owner implement, the owner is a manager of changefeeds (…
Jun 2, 2021
b5fbc9c
new_owner: a capture implement, the capture manages and runs the owne…
Jun 3, 2021
64eee8c
new_owner: switch on the new owner (#1927)
Jun 4, 2021
4e802be
tests: fix an unstable test in CI (#1940)
Jun 5, 2021
f45ef5a
owner: fix a changefeed can not removed when the changefeed status is…
Jun 5, 2021
2ef00b6
owner: fix the owner panic (#1964)
Jun 7, 2021
92f0224
owner: fix owner ship metrics (#1980)
Jun 9, 2021
3b96791
owner: fix some nil panic when switch off the new owner (#2004)
Jun 9, 2021
e6f4261
owner: fix etcd error too many operations in txn request (#1988)
Jun 10, 2021
f4330dd
owner: check the version of the ticdc cluster (#2003)
Jun 10, 2021
226ac10
gc_manager: refine update gc service safe point (#2029)
Jun 11, 2021
06cff6a
owner: deprecate the removed changefeed state (#1990)
Jun 15, 2021
43d763b
revert the CI test for the new owner and fix tests
Jun 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ check: check-copyright fmt lint check-static tidy errdoc check-leaktest-added

coverage:
GO111MODULE=off go get github.com/wadey/gocovmerge
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/entry/schema_test_helper.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
ifeq ("$(JenkinsCI)", "1")
GO111MODULE=off go get github.com/mattn/goveralls
Expand Down
47 changes: 24 additions & 23 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"sync"
"time"

"github.com/pingcap/ticdc/pkg/version"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -28,10 +26,13 @@ import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/processor"
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
Expand All @@ -43,16 +44,11 @@ import (
"google.golang.org/grpc/backoff"
)

// captureOpts records options for capture
type captureOpts struct {
flushCheckpointInterval time.Duration
captureSessionTTL int
}

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
type Capture struct {
etcdClient kv.CDCEtcdClient
pdCli pd.Client
kvStorage tidbkv.Storage
credential *security.Credential

processorManager *processor.Manager
Expand All @@ -66,19 +62,18 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

opts *captureOpts
closed chan struct{}
}

// NewCapture returns a new Capture instance
func NewCapture(
ctx context.Context,
stdCtx context.Context,
pdEndpoints []string,
pdCli pd.Client,
credential *security.Credential,
advertiseAddr string,
opts *captureOpts,
kvStorage tidbkv.Storage,
) (c *Capture, err error) {
conf := config.GetGlobalServerConfig()
credential := conf.Security
tlsConfig, err := credential.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -92,7 +87,7 @@ func NewCapture(
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: pdEndpoints,
TLS: tlsConfig,
Context: ctx,
Context: stdCtx,
LogConfig: &logConfig,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
Expand All @@ -113,20 +108,20 @@ func NewCapture(
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
}
sess, err := concurrency.NewSession(etcdCli,
concurrency.WithTTL(opts.captureSessionTTL))
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
elec := concurrency.NewElection(sess, kv.CaptureOwnerKey)
cli := kv.NewCDCEtcdClient(ctx, etcdCli)
cli := kv.NewCDCEtcdClient(stdCtx, etcdCli)
id := uuid.New().String()
info := &model.CaptureInfo{
ID: id,
AdvertiseAddr: advertiseAddr,
AdvertiseAddr: conf.AdvertiseAddr,
Version: version.ReleaseVersion,
}
processorManager := processor.NewManager(pdCli, credential, info)
log.Info("creating capture", zap.String("capture-id", id), util.ZapFieldCapture(ctx))
processorManager := processor.NewManager()
log.Info("creating capture", zap.String("capture-id", id), util.ZapFieldCapture(stdCtx))

c = &Capture{
processors: make(map[string]*oldProcessor),
Expand All @@ -135,8 +130,8 @@ func NewCapture(
session: sess,
election: elec,
info: info,
opts: opts,
pdCli: pdCli,
kvStorage: kvStorage,
processorManager: processorManager,
closed: make(chan struct{}),
}
Expand All @@ -150,13 +145,19 @@ func (c *Capture) Run(ctx context.Context) (err error) {
// TODO: we'd better to add some wait mechanism to ensure no routine is blocked
defer cancel()
defer close(c.closed)

ctx = cdcContext.NewContext(ctx, &cdcContext.GlobalVars{
PDClient: c.pdCli,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
})
err = c.register(ctx)
if err != nil {
return errors.Trace(err)
}
if config.NewReplicaImpl {
sessionCli := c.session.Client()
etcdWorker, err := orchestrator.NewEtcdWorker(kv.NewCDCEtcdClient(ctx, sessionCli).Client, kv.EtcdKeyBase, c.processorManager, processor.NewGlobalState(c.info.ID))
etcdWorker, err := orchestrator.NewEtcdWorker(kv.NewCDCEtcdClient(ctx, sessionCli).Client, kv.EtcdKeyBase, c.processorManager, model.NewGlobalState())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -306,9 +307,9 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*oldProcessor, er
log.Info("run processor",
zap.String("capture-id", c.info.ID), util.ZapFieldCapture(ctx),
zap.String("changefeed", task.ChangeFeedID))

conf := config.GetGlobalServerConfig()
p, err := runProcessorImpl(
ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, time.Duration(conf.ProcessorFlushInterval))
if err != nil {
log.Error("run processor failed",
zap.String("changefeed", task.ChangeFeedID),
Expand Down
Loading