Skip to content

Commit

Permalink
Merge branch 'master' into pre-check-return-warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
okJiang authored Jan 12, 2022
2 parents b64d32a + ac3d76d commit ce09d4c
Show file tree
Hide file tree
Showing 49 changed files with 1,531 additions and 1,061 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ check-static: tools/bin/golangci-lint
tools/bin/golangci-lint run --timeout 10m0s --skip-files kv_gen --skip-dirs dm,tests
cd dm && ../tools/bin/golangci-lint run --timeout 10m0s

check: check-copyright fmt check-static tidy terror_check errdoc check-leaktest-added check-merge-conflicts
check: check-copyright fmt check-static tidy terror_check errdoc check-leaktest-added check-merge-conflicts swagger-spec

integration_test_coverage: tools/bin/gocovmerge tools/bin/goveralls
tools/bin/gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/entry/schema_test_helper.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
Expand All @@ -231,6 +231,9 @@ unit_test_coverage:
data-flow-diagram: docs/data-flow.dot
dot -Tsvg docs/data-flow.dot > docs/data-flow.svg

swagger-spec: tools/bin/swag
tools/bin/swag init --parseVendor -generalInfo cdc/api/open.go --output docs/swagger

clean:
go clean -i ./...
rm -rf *.out
Expand Down Expand Up @@ -413,6 +416,9 @@ tools/bin/gotestsum: tools/check/go.mod
tools/bin/errdoc-gen: tools/check/go.mod
cd tools/check && $(GO) build -mod=mod -o ../bin/errdoc-gen github.com/pingcap/errors/errdoc-gen

tools/bin/swag: tools/check/go.mod
cd tools/check && $(GO) build -mod=mod -o ../bin/swag github.com/swaggo/swag/cmd/swag

check_failpoint_ctl: tools/bin/failpoint-ctl

failpoint-enable: check_failpoint_ctl
Expand Down
2 changes: 1 addition & 1 deletion cdc/capture/main_test.go → cdc/api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package capture
package api

import (
"testing"
Expand Down
105 changes: 67 additions & 38 deletions cdc/capture/http_handler.go → cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package capture
package api

import (
"bufio"
Expand All @@ -21,6 +21,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/pkg/config"
Expand All @@ -45,16 +46,44 @@ const (
getOwnerRetryMaxTime = 3
)

// HTTPHandler is a HTTPHandler of capture
type HTTPHandler struct {
capture *Capture
// openAPI provides capture APIs.
type openAPI struct {
capture *capture.Capture
}

// NewHTTPHandler return a HTTPHandler for OpenAPI
func NewHTTPHandler(capture *Capture) HTTPHandler {
return HTTPHandler{
capture: capture,
}
// RegisterOpoenAPIRoutes registers routes for OpenAPI
func RegisterOpoenAPIRoutes(router *gin.Engine, capture *capture.Capture) {
openAPI := openAPI{capture: capture}

// common API
router.GET("/api/v1/status", openAPI.ServerStatus)
router.GET("/api/v1/health", openAPI.Health)
router.POST("/api/v1/log", SetLogLevel)

// changefeed API
changefeedGroup := router.Group("/api/v1/changefeeds")
changefeedGroup.GET("", openAPI.ListChangefeed)
changefeedGroup.GET("/:changefeed_id", openAPI.GetChangefeed)
changefeedGroup.POST("", openAPI.CreateChangefeed)
changefeedGroup.PUT("/:changefeed_id", openAPI.UpdateChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", openAPI.PauseChangefeed)
changefeedGroup.POST("/:changefeed_id/resume", openAPI.ResumeChangefeed)
changefeedGroup.DELETE("/:changefeed_id", openAPI.RemoveChangefeed)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", openAPI.RebalanceTable)
changefeedGroup.POST("/:changefeed_id/tables/move_table", openAPI.MoveTable)

// owner API
ownerGroup := router.Group("/api/v1/owner")
ownerGroup.POST("/resign", openAPI.ResignOwner)

// processor API
processorGroup := router.Group("/api/v1/processors")
processorGroup.GET("", openAPI.ListProcessor)
processorGroup.GET("/:changefeed_id/:capture_id", openAPI.GetProcessor)

// capture API
captureGroup := router.Group("/api/v1/captures")
captureGroup.GET("", openAPI.ListCapture)
}

// ListChangefeed lists all changgefeeds in cdc cluster
Expand All @@ -67,12 +96,12 @@ func NewHTTPHandler(capture *Capture) HTTPHandler {
// @Success 200 {array} model.ChangefeedCommonInfo
// @Failure 500 {object} model.HTTPError
// @Router /api/v1/changefeeds [get]
func (h *HTTPHandler) ListChangefeed(c *gin.Context) {
func (h *openAPI) ListChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()
ctx := c.Request.Context()
state := c.Query(apiOpVarChangefeedState)
// get all changefeed status
Expand Down Expand Up @@ -132,12 +161,12 @@ func (h *HTTPHandler) ListChangefeed(c *gin.Context) {
// @Success 200 {object} model.ChangefeedDetail
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id} [get]
func (h *HTTPHandler) GetChangefeed(c *gin.Context) {
func (h *openAPI) GetChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
Expand Down Expand Up @@ -200,7 +229,7 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds [post]
func (h *HTTPHandler) CreateChangefeed(c *gin.Context) {
func (h *openAPI) CreateChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand All @@ -225,7 +254,7 @@ func (h *HTTPHandler) CreateChangefeed(c *gin.Context) {
return
}

err = h.capture.etcdClient.CreateChangefeedInfo(ctx, info, changefeedConfig.ID)
err = h.capture.EtcdClient.CreateChangefeedInfo(ctx, info, changefeedConfig.ID)
if err != nil {
_ = c.Error(err)
return
Expand All @@ -245,12 +274,12 @@ func (h *HTTPHandler) CreateChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/pause [post]
func (h *HTTPHandler) PauseChangefeed(c *gin.Context) {
func (h *openAPI) PauseChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()
ctx := c.Request.Context()

changefeedID := c.Param(apiOpVarChangefeedID)
Expand Down Expand Up @@ -288,12 +317,12 @@ func (h *HTTPHandler) PauseChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/resume [post]
func (h *HTTPHandler) ResumeChangefeed(c *gin.Context) {
func (h *openAPI) ResumeChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()
ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
Expand Down Expand Up @@ -336,12 +365,12 @@ func (h *HTTPHandler) ResumeChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id} [put]
func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) {
func (h *openAPI) UpdateChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()
ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)

Expand Down Expand Up @@ -373,7 +402,7 @@ func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) {
return
}

err = h.capture.etcdClient.SaveChangeFeedInfo(ctx, newInfo, changefeedID)
err = h.capture.EtcdClient.SaveChangeFeedInfo(ctx, newInfo, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand All @@ -392,12 +421,12 @@ func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id} [delete]
func (h *HTTPHandler) RemoveChangefeed(c *gin.Context) {
func (h *openAPI) RemoveChangefeed(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()
ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
Expand Down Expand Up @@ -434,12 +463,12 @@ func (h *HTTPHandler) RemoveChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post]
func (h *HTTPHandler) RebalanceTable(c *gin.Context) {
func (h *openAPI) RebalanceTable(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()
ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)

Expand Down Expand Up @@ -474,12 +503,12 @@ func (h *HTTPHandler) RebalanceTable(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/tables/move_table [post]
func (h *HTTPHandler) MoveTable(c *gin.Context) {
func (h *openAPI) MoveTable(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()
ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
Expand Down Expand Up @@ -525,7 +554,7 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/owner/resign [post]
func (h *HTTPHandler) ResignOwner(c *gin.Context) {
func (h *openAPI) ResignOwner(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand All @@ -548,12 +577,12 @@ func (h *HTTPHandler) ResignOwner(c *gin.Context) {
// @Success 200 {object} model.ProcessorDetail
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/processors/{changefeed_id}/{capture_id} [get]
func (h *HTTPHandler) GetProcessor(c *gin.Context) {
func (h *openAPI) GetProcessor(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()

Expand Down Expand Up @@ -614,12 +643,12 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
// @Success 200 {array} model.ProcessorCommonInfo
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/processors [get]
func (h *HTTPHandler) ListProcessor(c *gin.Context) {
func (h *openAPI) ListProcessor(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
infos, err := statusProvider.GetProcessors(ctx)
Expand All @@ -644,12 +673,12 @@ func (h *HTTPHandler) ListProcessor(c *gin.Context) {
// @Success 200 {array} model.Capture
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/captures [get]
func (h *HTTPHandler) ListCapture(c *gin.Context) {
func (h *openAPI) ListCapture(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.owner.StatusProvider()
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
captureInfos, err := statusProvider.GetCaptures(ctx)
Expand Down Expand Up @@ -679,7 +708,7 @@ func (h *HTTPHandler) ListCapture(c *gin.Context) {
// @Success 200 {object} model.ServerStatus
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/status [get]
func (h *HTTPHandler) ServerStatus(c *gin.Context) {
func (h *openAPI) ServerStatus(c *gin.Context) {
status := model.ServerStatus{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Expand All @@ -699,7 +728,7 @@ func (h *HTTPHandler) ServerStatus(c *gin.Context) {
// @Success 200
// @Failure 500 {object} model.HTTPError
// @Router /api/v1/health [get]
func (h *HTTPHandler) Health(c *gin.Context) {
func (h *openAPI) Health(c *gin.Context) {
ctx := c.Request.Context()

if _, err := h.capture.GetOwner(ctx); err != nil {
Expand Down Expand Up @@ -740,7 +769,7 @@ func SetLogLevel(c *gin.Context) {
}

// forwardToOwner forward an request to owner
func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
func (h *openAPI) forwardToOwner(c *gin.Context) {
ctx := c.Request.Context()
// every request can only forward to owner one time
if len(c.GetHeader(forWardFromCapture)) != 0 {
Expand Down
Loading

0 comments on commit ce09d4c

Please sign in to comment.