Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple with upstream #6

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
726 changes: 363 additions & 363 deletions cdc/api/open.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions cdc/api/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (h *ownerAPI) handleChangefeedAdmin(w http.ResponseWriter, req *http.Reques
opts.ForceRemove = forceRemoveOpt
}
job := model.AdminJob{
CfID: req.Form.Get(OpVarChangefeedID),
CfID: model.ChangeFeedID{req.Form.Get("namespace"), req.Form.Get(OpVarChangefeedID)},
Type: model.AdminJobType(typ),
Opts: opts,
}
Expand All @@ -169,8 +169,8 @@ func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Reque
writeError(w, http.StatusInternalServerError, err)
return
}
changefeedID := req.Form.Get(OpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
changefeedID := model.ChangeFeedID{"default", req.Form.Get(OpVarChangefeedID)}
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
writeError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID))
return
Expand All @@ -193,8 +193,8 @@ func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) {
cerror.WrapError(cerror.ErrInternalServerError, err))
return
}
changefeedID := req.Form.Get(OpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
changefeedID := model.ChangeFeedID{"default", req.Form.Get(OpVarChangefeedID)}
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
writeError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID))
return
Expand Down Expand Up @@ -230,8 +230,8 @@ func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Reques
writeError(w, http.StatusInternalServerError, err)
return
}
changefeedID := req.Form.Get(OpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
changefeedID := model.ChangeFeedID{req.Form.Get("namespace"), req.Form.Get(OpVarChangefeedID)}
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
writeError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID))
return
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func RegisterStatusAPIRoutes(router *gin.Engine, capture *capture.Capture) {
}

func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli *etcd.CDCEtcdClient, w io.Writer) {
resp, err := cli.Client.Get(ctx, etcd.EtcdKeyBase, clientv3.WithPrefix())
resp, err := cli.Client.Get(ctx, etcd.EtcdKeyBase(), clientv3.WithPrefix())
if err != nil {
fmt.Fprintf(w, "failed to get info: %s\n\n", err.Error())
return
Expand Down
4 changes: 2 additions & 2 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func handleOwnerJob(
}

func handleOwnerRebalance(
ctx context.Context, capture *capture.Capture, changefeedID string,
ctx context.Context, capture *capture.Capture, changefeedID model.ChangeFeedID,
) error {
// Use buffered channel to prevent blocking owner.
done := make(chan error, 1)
Expand All @@ -118,7 +118,7 @@ func handleOwnerRebalance(

func handleOwnerScheduleTable(
ctx context.Context, capture *capture.Capture,
changefeedID string, captureID string, tableID int64,
changefeedID model.ChangeFeedID, captureID string, tableID int64,
) error {
// Use buffered channel to prevent blocking owner.
done := make(chan error, 1)
Expand Down
44 changes: 26 additions & 18 deletions cdc/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/r3labs/diff"
Expand All @@ -41,6 +42,13 @@ func verifyCreateChangefeedConfig(
changefeedConfig model.ChangefeedConfig,
capture *capture.Capture,
) (*model.ChangeFeedInfo, error) {
upStream, err := upstream.UpStreamManager.Get("")
defer upstream.UpStreamManager.Release(0)

if err != nil {
return nil, errors.Trace(err)
}

// verify sinkURI
if changefeedConfig.SinkURI == "" {
return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("sink-uri is empty, can't not create a changefeed without sink-uri")
Expand All @@ -51,17 +59,17 @@ func verifyCreateChangefeedConfig(
return nil, cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedConfig.ID)
}
// check if the changefeed exists
cfStatus, err := capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedConfig.ID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
return nil, err
}
if cfStatus != nil {
return nil, cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changefeedConfig.ID)
}
//cfStatus, err := capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedConfig.ID)
//if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
// return nil, err
//}
//if cfStatus != nil {
// return nil, cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changefeedConfig.ID)
//}

// verify start-ts
if changefeedConfig.StartTS == 0 {
ts, logical, err := capture.PDClient.GetTS(ctx)
ts, logical, err := upStream.PDClient.GetTS(ctx)
if err != nil {
return nil, cerror.ErrPDEtcdAPIError.GenWithStackByArgs("fail to get ts from pd client")
}
Expand All @@ -70,13 +78,13 @@ func verifyCreateChangefeedConfig(

// Ensure the start ts is valid in the next 1 hour.
const ensureTTL = 60 * 60
if err := gc.EnsureChangefeedStartTsSafety(
ctx, capture.PDClient, changefeedConfig.ID, ensureTTL, changefeedConfig.StartTS); err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
return nil, cerror.ErrPDEtcdAPIError.Wrap(err)
}
return nil, err
}
//if err := gc.EnsureChangefeedStartTsSafety(
// ctx, upStream.PDClient, changefeedConfig.ID, ensureTTL, changefeedConfig.StartTS); err != nil {
// if !cerror.ErrStartTsBeforeGC.Equal(err) {
// return nil, cerror.ErrPDEtcdAPIError.Wrap(err)
// }
// return nil, err
//}

// verify target-ts
if changefeedConfig.TargetTS > 0 && changefeedConfig.TargetTS <= changefeedConfig.StartTS {
Expand Down Expand Up @@ -133,7 +141,7 @@ func verifyCreateChangefeedConfig(
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := VerifyTables(replicaConfig, capture.Storage, changefeedConfig.StartTS)
ineligibleTables, _, err := VerifyTables(replicaConfig, upStream.KVStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
Expand All @@ -146,7 +154,7 @@ func verifyCreateChangefeedConfig(
if err != nil {
return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
}
ctx = util.PutTimezoneInCtx(ctx, tz)
ctx = contextutil.PutTimezoneInCtx(ctx, tz)
if err := sink.Validate(ctx, info.SinkURI, info.Config, info.Opts); err != nil {
return nil, err
}
Expand Down
107 changes: 52 additions & 55 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
Expand All @@ -44,7 +42,6 @@ import (
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/version"
)

Expand All @@ -62,12 +59,12 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

PDClient pd.Client
Storage tidbkv.Storage
EtcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
regionCache *tikv.RegionCache
pdClock *pdtime.PDClock
// PDClient pd.Client
// Storage tidbkv.Storage
EtcdClient *etcd.CDCEtcdClient
// grpcPool kv.GrpcPool
// regionCache *tikv.RegionCache
// pdClock *pdtime.PDClock
sorterSystem *ssystem.System

enableNewScheduler bool
Expand All @@ -90,15 +87,15 @@ type Capture struct {
cancel context.CancelFunc

newProcessorManager func() *processor.Manager
newOwner func(pd.Client) owner.Owner
newOwner func() owner.Owner
}

// NewCapture returns a new Capture instance
func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper) *Capture {
conf := config.GetGlobalServerConfig()
return &Capture{
PDClient: pdClient,
Storage: kvStorage,
// PDClient: pdClient,
// Storage: kvStorage,
EtcdClient: etcdClient,
grpcService: grpcService,
cancel: func() {},
Expand Down Expand Up @@ -141,16 +138,16 @@ func (c *Capture) reset(ctx context.Context) error {
_ = c.session.Close()
}
c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey)
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey())

if c.pdClock != nil {
c.pdClock.Stop()
}
// if c.pdClock != nil {
// c.pdClock.Stop()
// }

c.pdClock, err = pdtime.NewClock(ctx, c.PDClient)
if err != nil {
return errors.Trace(err)
}
// c.pdClock, err = pdtime.NewClock(ctx, c.PDClient)
// if err != nil {
// return errors.Trace(err)
// }

if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
Expand Down Expand Up @@ -183,9 +180,9 @@ func (c *Capture) reset(ctx context.Context) error {
"create sorter system")
}
}
if c.grpcPool != nil {
c.grpcPool.Close()
}
// if c.grpcPool != nil {
// c.grpcPool.Close()
// }

if c.enableNewScheduler {
c.grpcService.Reset(nil)
Expand All @@ -197,11 +194,11 @@ func (c *Capture) reset(ctx context.Context) error {
}
}

c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
if c.regionCache != nil {
c.regionCache.Close()
}
c.regionCache = tikv.NewRegionCache(c.PDClient)
// c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
// if c.regionCache != nil {
// c.regionCache.Close()
// }
// c.regionCache = tikv.NewRegionCache(c.PDClient)

if c.enableNewScheduler {
messageServerConfig := conf.Debug.Messages.ToMessageServerConfig()
Expand Down Expand Up @@ -265,13 +262,13 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.PDClient,
KVStorage: c.Storage,
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
GrpcPool: c.grpcPool,
RegionCache: c.regionCache,
PDClock: c.pdClock,
// PDClient: c.PDClient,
// KVStorage: c.Storage,
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
// GrpcPool: c.grpcPool,
// RegionCache: c.regionCache,
// PDClock: c.pdClock,
TableActorSystem: c.tableActorSystem,
SorterSystem: c.sorterSystem,
MessageServer: c.MessageServer,
Expand Down Expand Up @@ -324,16 +321,16 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor")
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
wg.Add(1)
go func() {
defer wg.Done()
c.pdClock.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
}()
// wg.Add(1)
// go func() {
// defer wg.Done()
// c.pdClock.Run(ctx)
// }()
// wg.Add(1)
// go func() {
// defer wg.Done()
// c.grpcPool.RecycleConn(ctx)
// }()
if c.enableNewScheduler {
wg.Add(1)
go func() {
Expand Down Expand Up @@ -419,7 +416,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
zap.String("captureID", c.info.ID),
zap.Int64("ownerRev", ownerRev))

owner := c.newOwner(c.PDClient)
owner := c.newOwner()
c.setOwner(owner)

globalState := orchestrator.NewGlobalState()
Expand Down Expand Up @@ -456,7 +453,7 @@ func (c *Capture) runEtcdWorker(
timerInterval time.Duration,
role string,
) error {
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, etcd.EtcdKeyBase, reactor, reactorState)
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, etcd.EtcdKeyBase(), reactor, reactorState)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -541,13 +538,13 @@ func (c *Capture) AsyncClose() {
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
if c.grpcPool != nil {
c.grpcPool.Close()
}
if c.regionCache != nil {
c.regionCache.Close()
c.regionCache = nil
}
// if c.grpcPool != nil {
// c.grpcPool.Close()
// }
// if c.regionCache != nil {
// c.regionCache.Close()
// c.regionCache = nil
// }
if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
c.tableActorSystem = nil
Expand Down Expand Up @@ -622,7 +619,7 @@ func (c *Capture) GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo,
return nil, err
}

ownerID, err := c.EtcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey)
ownerID, err := c.EtcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey())
if err != nil {
return nil, err
}
Expand Down
Loading