Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi tenant changefeed scheduler poc #16

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
59 changes: 59 additions & 0 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,65 @@ func ForwardToOwner(c *gin.Context, p capture.Capture) {
}
}

// ForwardToChangefeedOwner forwards a request to the changefeed owner
func ForwardToChangefeedOwner(c *gin.Context, addr string) {
ctx := c.Request.Context()

security := config.GetGlobalServerConfig().Security

// init a request
req, err := http.NewRequestWithContext(
ctx, c.Request.Method, c.Request.RequestURI, c.Request.Body)
if err != nil {
_ = c.Error(err)
return
}

req.URL.Host = addr
// we should check tls config instead of security here because
// security will never be nil
if tls, _ := security.ToTLSConfigWithVerify(); tls != nil {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}
for k, v := range c.Request.Header {
for _, vv := range v {
req.Header.Add(k, vv)
}
}

// forward to owner
cli, err := httputil.NewClient(security)
if err != nil {
_ = c.Error(err)
return
}
resp, err := cli.Do(req)
if err != nil {
_ = c.Error(err)
return
}

// write header
for k, values := range resp.Header {
for _, v := range values {
c.Header(k, v)
}
}

// write status code
c.Status(resp.StatusCode)

// write response body
defer resp.Body.Close()
_, err = bufio.NewReader(resp.Body).WriteTo(c.Writer)
if err != nil {
_ = c.Error(err)
return
}
}

// HandleOwnerDrainCapture schedule drain the target capture
func HandleOwnerDrainCapture(
ctx context.Context, capture capture.Capture, captureID string,
Expand Down
3 changes: 1 addition & 2 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {

// changefeed apis
changefeedGroup := v2.Group("/changefeeds")
changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
//changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
changefeedGroup.GET("/:changefeed_id", api.getChangeFeed)
changefeedGroup.POST("", api.createChangefeed)
changefeedGroup.GET("", api.listChangeFeeds)
Expand All @@ -68,7 +68,6 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {

// processor apis
processorGroup := v2.Group("/processors")
processorGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
processorGroup.GET("/:changefeed_id/:capture_id", api.getProcessor)
processorGroup.GET("", api.listProcessors)

Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (h *OpenAPIV2) listCaptures(c *gin.Context) {
IsOwner: isOwner,
AdvertiseAddr: c.AdvertiseAddr,
ClusterID: etcdClient.GetClusterID(),
Labels: c.Labels,
})
}
resp := &ListResponse[Capture]{
Expand Down
114 changes: 101 additions & 13 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ const (
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds [post]
func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
api.ForwardToOwner(c, h.capture)

// Without calling Abort(), Gin will continued to process the next handler,
// execute code which should only be run by the owner, and cause a panic.
// See https://github.com/pingcap/tiflow/issues/5888
c.Abort()
return
}
ctx := c.Request.Context()
cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}

Expand Down Expand Up @@ -223,6 +232,15 @@ func hasRunningImport(ctx context.Context, cli *clientv3.Client) error {
// @Failure 500 {object} model.HTTPError
// @Router /api/v2/changefeeds [get]
func (h *OpenAPIV2) listChangeFeeds(c *gin.Context) {
if !h.capture.IsOwner() {
api.ForwardToOwner(c, h.capture)

// Without calling Abort(), Gin will continued to process the next handler,
// execute code which should only be run by the owner, and cause a panic.
// See https://github.com/pingcap/tiflow/issues/5888
c.Abort()
return
}
ctx := c.Request.Context()
state := c.Query(apiOpVarChangefeedState)
statuses, err := h.capture.StatusProvider().GetAllChangeFeedStatuses(ctx)
Expand Down Expand Up @@ -372,6 +390,15 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id} [put]
func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
api.ForwardToOwner(c, h.capture)

// Without calling Abort(), Gin will continued to process the next handler,
// execute code which should only be run by the owner, and cause a panic.
// See https://github.com/pingcap/tiflow/issues/5888
c.Abort()
return
}
ctx := c.Request.Context()

namespace := getNamespaceValueWithDefault(c)
Expand Down Expand Up @@ -482,6 +509,11 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id} [get]
func (h *OpenAPIV2) getChangeFeed(c *gin.Context) {
if !h.capture.IsOwner() {
api.ForwardToOwner(c, h.capture)
c.Abort()
return
}
ctx := c.Request.Context()
namespace := getNamespaceValueWithDefault(c)
changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)}
Expand Down Expand Up @@ -510,9 +542,15 @@ func (h *OpenAPIV2) getChangeFeed(c *gin.Context) {
_ = c.Error(err)
return
}
ow, err := h.capture.StatusProvider().GetChangefeedOwner(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

taskStatus := make([]model.CaptureTaskStatus, 0)
if cfInfo.State == model.StateNormal {
if ow != nil && cfInfo.State == model.StateNormal {
// todo : forward to changefeed owner if has changefeed owner
processorInfos, err := h.capture.StatusProvider().GetAllTaskStatuses(
ctx,
changefeedID,
Expand Down Expand Up @@ -559,14 +597,34 @@ func (h *OpenAPIV2) deleteChangefeed(c *gin.Context) {
changefeedID.ID))
return
}
_, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID)
o, err := h.capture.GetOwner()
if err != nil {
if cerror.ErrChangeFeedNotExists.Equal(err) {
c.JSON(http.StatusOK, &EmptyResponse{})
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("can not get owner: %s",
changefeedID.ID))
return
}
if !o.HasChangefeed(changefeedID) {
if !h.capture.IsOwner() {
// forward to global owner
api.ForwardToOwner(c, h.capture)
c.Abort()
return
} else {
// forward to changefeed owner
ow, err := h.capture.StatusProvider().GetChangefeedOwner(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}
if ow == nil {
// changefeed not assigned to any capture
// todo : global owner delete that changefeed directly
}

api.ForwardToChangefeedOwner(c, ow.AdvertiseAddr)
c.Abort()
return
}
_ = c.Error(err)
return
}

job := model.AdminJob{
Expand Down Expand Up @@ -677,10 +735,24 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
return
}

_, err = h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
o, err := h.capture.GetOwner()
if !o.HasChangefeed(changefeedID) {
if !h.capture.IsOwner() {
// forward to global owner
api.ForwardToOwner(c, h.capture)
c.Abort()
return
} else {
// forward to changefeed owner
ow, err := h.capture.StatusProvider().GetChangefeedOwner(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}
api.ForwardToChangefeedOwner(c, ow.AdvertiseAddr)
c.Abort()
return
}
}

cfg := new(ResumeChangefeedConfig)
Expand Down Expand Up @@ -772,13 +844,29 @@ func (h *OpenAPIV2) pauseChangefeed(c *gin.Context) {
changefeedID.ID))
return
}
// check if the changefeed exists
_, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID)
o, err := h.capture.GetOwner()
if err != nil {
_ = c.Error(err)
return
}

if !o.HasChangefeed(changefeedID) {
if !h.capture.IsOwner() {
// forward to global owner
api.ForwardToOwner(c, h.capture)
c.Abort()
return
} else {
// forward to changefeed owner
ow, err := h.capture.StatusProvider().GetChangefeedOwner(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}
api.ForwardToChangefeedOwner(c, ow.AdvertiseAddr)
c.Abort()
return
}
}
job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminStop,
Expand Down
13 changes: 9 additions & 4 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ type ReplicaConfig struct {
Consistent *ConsistentConfig `json:"consistent,omitempty"`
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
Integrity *IntegrityConfig `json:"integrity"`

NodeSelector map[string]string `json:"node-selector"`
}

// ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig
Expand Down Expand Up @@ -395,6 +397,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
CorruptionHandleLevel: c.Integrity.CorruptionHandleLevel,
}
}
res.NodeSelector = c.NodeSelector
return res
}

Expand Down Expand Up @@ -620,6 +623,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
}
}

res.NodeSelector = cloned.NodeSelector
return res
}

Expand Down Expand Up @@ -911,10 +915,11 @@ type ServerStatus struct {

// Capture holds common information of a capture in cdc
type Capture struct {
ID string `json:"id"`
IsOwner bool `json:"is_owner"`
AdvertiseAddr string `json:"address"`
ClusterID string `json:"cluster_id"`
ID string `json:"id"`
IsOwner bool `json:"is_owner"`
AdvertiseAddr string `json:"address"`
ClusterID string `json:"cluster_id"`
Labels map[string]string `json:"labels"`
}

// CodecConfig represents a MQ codec configuration
Expand Down
39 changes: 39 additions & 0 deletions cdc/api/v2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package v2

import (
"context"
"fmt"
"net/http"

"github.com/gin-gonic/gin"
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
)
Expand Down Expand Up @@ -47,6 +49,11 @@ func (h *OpenAPIV2) getProcessor(c *gin.Context) {
return
}

err, done := h.try2ForwardToChangefeedOwner(c, changefeedID, ctx)
if done {
return
}

captureID := c.Param(apiOpVarCaptureID)
if err := model.ValidateChangefeedID(captureID); err != nil {
_ = c.Error(
Expand Down Expand Up @@ -120,6 +127,34 @@ func (h *OpenAPIV2) getProcessor(c *gin.Context) {
c.JSON(http.StatusOK, &processorDetail)
}

func (h *OpenAPIV2) try2ForwardToChangefeedOwner(c *gin.Context, changefeedID model.ChangeFeedID, ctx context.Context) (error, bool) {
o, err := h.capture.GetOwner()
if err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("can not get owner: %s",
changefeedID.ID))
return nil, true
}
if !o.HasChangefeed(changefeedID) {
if !h.capture.IsOwner() {
// forward to global owner
api.ForwardToOwner(c, h.capture)
c.Abort()
return nil, true
} else {
// forward to changefeed owner
ow, err := h.capture.StatusProvider().GetChangefeedOwner(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return nil, true
}
api.ForwardToChangefeedOwner(c, ow.AdvertiseAddr)
c.Abort()
return nil, true
}
}
return err, false
}

// listProcessors lists all processors in the TiCDC cluster
// @Summary List processors
// @Description list all processors in the TiCDC cluster
Expand All @@ -129,6 +164,10 @@ func (h *OpenAPIV2) getProcessor(c *gin.Context) {
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/processors [get]
func (h *OpenAPIV2) listProcessors(c *gin.Context) {
if !h.capture.IsOwner() {
api.ForwardToOwner(c, h.capture)
return
}
ctx := c.Request.Context()
infos, err := h.capture.StatusProvider().GetProcessors(ctx)
if err != nil {
Expand Down
Loading