diff --git a/cdc/api/middleware.go b/cdc/api/middleware.go new file mode 100644 index 00000000000..0abe7c38cf0 --- /dev/null +++ b/cdc/api/middleware.go @@ -0,0 +1,72 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/zap" +) + +func logMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + path := c.Request.URL.Path + query := c.Request.URL.RawQuery + c.Next() + + cost := time.Since(start) + + err := c.Errors.Last() + var stdErr error + if err != nil { + stdErr = err.Err + } + + log.Info(path, + zap.Int("status", c.Writer.Status()), + zap.String("method", c.Request.Method), + zap.String("path", path), + zap.String("query", query), + zap.String("ip", c.ClientIP()), + zap.String("user-agent", c.Request.UserAgent()), + zap.Error(stdErr), + zap.Duration("duration", cost), + ) + } +} + +func errorHandleMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + c.Next() + // because we will return immediately after an error occurs in http_handler + // there wil be only one error in c.Errors + lastError := c.Errors.Last() + if lastError != nil { + err := lastError.Err + // put the error into response + if IsHTTPBadRequestError(err) { + c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + } else { + c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + } + c.Abort() + return + } + } +} diff --git a/cdc/api/open.go b/cdc/api/open.go index 80041ffeca6..14f0b29c403 100644 --- a/cdc/api/open.go +++ b/cdc/api/open.go @@ -49,41 +49,62 @@ const ( // openAPI provides capture APIs. type openAPI struct { capture *capture.Capture + // use for unit test only + testStatusProvider owner.StatusProvider } -// RegisterOpoenAPIRoutes registers routes for OpenAPI -func RegisterOpoenAPIRoutes(router *gin.Engine, capture *capture.Capture) { - openAPI := openAPI{capture: capture} +func NewOpenAPI(c *capture.Capture) openAPI { + return openAPI{capture: c} +} + +// NewOpenAPI4Test return a openAPI for test +func NewOpenAPI4Test(c *capture.Capture, p owner.StatusProvider) openAPI { + return openAPI{capture: c, testStatusProvider: p} +} + +func (h *openAPI) statusProvider() owner.StatusProvider { + if h.testStatusProvider != nil { + return h.testStatusProvider + } + return h.capture.StatusProvider() +} + +// RegisterOpenAPIRoutes registers routes for OpenAPI +func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) { + v1 := router.Group("/api/v1") + + v1.Use(logMiddleware()) + v1.Use(errorHandleMiddleware()) // common API - router.GET("/api/v1/status", openAPI.ServerStatus) - router.GET("/api/v1/health", openAPI.Health) - router.POST("/api/v1/log", SetLogLevel) + v1.GET("/status", api.ServerStatus) + v1.GET("/health", api.Health) + v1.POST("/log", SetLogLevel) // changefeed API - changefeedGroup := router.Group("/api/v1/changefeeds") - changefeedGroup.GET("", openAPI.ListChangefeed) - changefeedGroup.GET("/:changefeed_id", openAPI.GetChangefeed) - changefeedGroup.POST("", openAPI.CreateChangefeed) - changefeedGroup.PUT("/:changefeed_id", openAPI.UpdateChangefeed) - changefeedGroup.POST("/:changefeed_id/pause", openAPI.PauseChangefeed) - changefeedGroup.POST("/:changefeed_id/resume", openAPI.ResumeChangefeed) - changefeedGroup.DELETE("/:changefeed_id", openAPI.RemoveChangefeed) - changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", openAPI.RebalanceTable) - changefeedGroup.POST("/:changefeed_id/tables/move_table", openAPI.MoveTable) + changefeedGroup := v1.Group("/changefeeds") + changefeedGroup.GET("", api.ListChangefeed) + changefeedGroup.GET("/:changefeed_id", api.GetChangefeed) + changefeedGroup.POST("", api.CreateChangefeed) + changefeedGroup.PUT("/:changefeed_id", api.UpdateChangefeed) + changefeedGroup.POST("/:changefeed_id/pause", api.PauseChangefeed) + changefeedGroup.POST("/:changefeed_id/resume", api.ResumeChangefeed) + changefeedGroup.DELETE("/:changefeed_id", api.RemoveChangefeed) + changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTable) + changefeedGroup.POST("/:changefeed_id/tables/move_table", api.MoveTable) // owner API - ownerGroup := router.Group("/api/v1/owner") - ownerGroup.POST("/resign", openAPI.ResignOwner) + ownerGroup := v1.Group("/owner") + ownerGroup.POST("/resign", api.ResignOwner) // processor API - processorGroup := router.Group("/api/v1/processors") - processorGroup.GET("", openAPI.ListProcessor) - processorGroup.GET("/:changefeed_id/:capture_id", openAPI.GetProcessor) + processorGroup := v1.Group("/processors") + processorGroup.GET("", api.ListProcessor) + processorGroup.GET("/:changefeed_id/:capture_id", api.GetProcessor) // capture API - captureGroup := router.Group("/api/v1/captures") - captureGroup.GET("", openAPI.ListCapture) + captureGroup := v1.Group("/captures") + captureGroup.GET("", api.ListCapture) } // ListChangefeed lists all changgefeeds in cdc cluster @@ -101,17 +122,17 @@ func (h *openAPI) ListChangefeed(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() + ctx := c.Request.Context() state := c.Query(apiOpVarChangefeedState) // get all changefeed status - statuses, err := statusProvider.GetAllChangeFeedStatuses(ctx) + statuses, err := h.statusProvider().GetAllChangeFeedStatuses(ctx) if err != nil { _ = c.Error(err) return } // get all changefeed infos - infos, err := statusProvider.GetAllChangeFeedInfo(ctx) + infos, err := h.statusProvider().GetAllChangeFeedInfo(ctx) if err != nil { // this call will return a parsedError generated by the error we passed in // so it is no need to check the parsedError @@ -166,7 +187,6 @@ func (h *openAPI) GetChangefeed(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) @@ -175,19 +195,19 @@ func (h *openAPI) GetChangefeed(c *gin.Context) { return } - info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID) + info, err := h.statusProvider().GetChangeFeedInfo(ctx, changefeedID) if err != nil { _ = c.Error(err) return } - status, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) + status, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID) if err != nil { _ = c.Error(err) return } - processorInfos, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID) + processorInfos, err := h.statusProvider().GetAllTaskStatuses(ctx, changefeedID) if err != nil { _ = c.Error(err) return @@ -279,7 +299,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() + ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) @@ -288,7 +308,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) { return } // check if the changefeed exists - _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) + _, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID) if err != nil { _ = c.Error(err) return @@ -322,7 +342,7 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() + ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { @@ -330,7 +350,7 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) { return } // check if the changefeed exists - _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) + _, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID) if err != nil { _ = c.Error(err) return @@ -370,7 +390,7 @@ func (h *openAPI) UpdateChangefeed(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() + ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) @@ -378,7 +398,7 @@ func (h *openAPI) UpdateChangefeed(c *gin.Context) { _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } - info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID) + info, err := h.statusProvider().GetChangeFeedInfo(ctx, changefeedID) if err != nil { _ = c.Error(err) return @@ -426,7 +446,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() + ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { @@ -434,7 +454,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { return } // check if the changefeed exists - _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) + _, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID) if err != nil { _ = c.Error(err) return @@ -468,7 +488,7 @@ func (h *openAPI) RebalanceTable(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() + ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) @@ -477,7 +497,7 @@ func (h *openAPI) RebalanceTable(c *gin.Context) { return } // check if the changefeed exists - _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) + _, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID) if err != nil { _ = c.Error(err) return @@ -508,7 +528,7 @@ func (h *openAPI) MoveTable(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() + ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { @@ -516,7 +536,7 @@ func (h *openAPI) MoveTable(c *gin.Context) { return } // check if the changefeed exists - _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) + _, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID) if err != nil { _ = c.Error(err) return @@ -582,7 +602,6 @@ func (h *openAPI) GetProcessor(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() ctx := c.Request.Context() @@ -598,7 +617,7 @@ func (h *openAPI) GetProcessor(c *gin.Context) { return } - statuses, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID) + statuses, err := h.statusProvider().GetAllTaskStatuses(ctx, changefeedID) if err != nil { _ = c.Error(err) return @@ -609,7 +628,7 @@ func (h *openAPI) GetProcessor(c *gin.Context) { return } - positions, err := statusProvider.GetTaskPositions(ctx, changefeedID) + positions, err := h.statusProvider().GetTaskPositions(ctx, changefeedID) if err != nil { _ = c.Error(err) return @@ -648,10 +667,9 @@ func (h *openAPI) ListProcessor(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() ctx := c.Request.Context() - infos, err := statusProvider.GetProcessors(ctx) + infos, err := h.statusProvider().GetProcessors(ctx) if err != nil { _ = c.Error(err) return @@ -678,10 +696,9 @@ func (h *openAPI) ListCapture(c *gin.Context) { h.forwardToOwner(c) return } - statusProvider := h.capture.StatusProvider() ctx := c.Request.Context() - captureInfos, err := statusProvider.GetCaptures(ctx) + captureInfos, err := h.statusProvider().GetCaptures(ctx) if err != nil { _ = c.Error(err) return diff --git a/cdc/api/open_test.go b/cdc/api/open_test.go new file mode 100644 index 00000000000..48a1fb53d4d --- /dev/null +++ b/cdc/api/open_test.go @@ -0,0 +1,450 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/capture" + "github.com/pingcap/tiflow/cdc/model" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +const ( + changeFeedID = "test-changeFeed" + captureID = "test-capture" + nonExistChangefeedID = "non-exist-changefeed" +) + +type mockStatusProvider struct { + mock.Mock +} + +type testCase struct { + url string + method string +} + +func (p *mockStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedStatus, error) { + args := p.Called(ctx) + return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedStatus), args.Error(1) +} + +func (p *mockStatusProvider) GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatus, error) { + args := p.Called(ctx, changefeedID) + log.Info("err", zap.Error(args.Error(1))) + return args.Get(0).(*model.ChangeFeedStatus), args.Error(1) +} + +func (p *mockStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) { + args := p.Called(ctx) + return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedInfo), args.Error(1) +} + +func (p *mockStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error) { + args := p.Called(ctx) + return args.Get(0).(*model.ChangeFeedInfo), args.Error(1) +} + +func (p *mockStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error) { + args := p.Called(ctx) + return args.Get(0).(map[model.CaptureID]*model.TaskStatus), args.Error(1) +} + +func (p *mockStatusProvider) GetTaskPositions(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskPosition, error) { + args := p.Called(ctx) + return args.Get(0).(map[model.CaptureID]*model.TaskPosition), args.Error(1) +} + +func (p *mockStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) { + args := p.Called(ctx) + return args.Get(0).([]*model.ProcInfoSnap), args.Error(1) +} + +func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) { + args := p.Called(ctx) + return args.Get(0).([]*model.CaptureInfo), args.Error(1) +} + +func newRouter(p *mockStatusProvider) *gin.Engine { + c := capture.NewCapture4Test(true) + router := gin.New() + RegisterOpenAPIRoutes(router, NewOpenAPI4Test(c, p)) + return router +} + +func newStatusProvider() *mockStatusProvider { + statusProvider := &mockStatusProvider{} + statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID). + Return(&model.ChangeFeedStatus{CheckpointTs: 1}, nil) + + statusProvider.On("GetChangeFeedStatus", mock.Anything, nonExistChangefeedID). + Return(new(model.ChangeFeedStatus), + cerror.ErrChangeFeedNotExists.GenWithStackByArgs(nonExistChangefeedID)) + + statusProvider.On("GetAllTaskStatuses", mock.Anything). + Return(map[model.CaptureID]*model.TaskStatus{captureID: {}}, nil) + + statusProvider.On("GetTaskPositions", mock.Anything). + Return(map[model.CaptureID]*model.TaskPosition{captureID: {Error: &model.RunningError{Message: "test"}}}, nil) + + statusProvider.On("GetAllChangeFeedStatuses", mock.Anything). + Return(map[model.ChangeFeedID]*model.ChangeFeedStatus{ + changeFeedID + "1": {CheckpointTs: 1}, + changeFeedID + "2": {CheckpointTs: 2}, + }, nil) + + statusProvider.On("GetAllChangeFeedInfo", mock.Anything). + Return(map[model.ChangeFeedID]*model.ChangeFeedInfo{ + changeFeedID + "1": {State: model.StateNormal}, + changeFeedID + "2": {State: model.StateStopped}, + }, nil) + + statusProvider.On("GetAllTaskStatuses", mock.Anything). + Return(map[model.CaptureID]*model.TaskStatus{captureID: {}}, nil) + + statusProvider.On("GetChangeFeedInfo", mock.Anything). + Return(&model.ChangeFeedInfo{State: model.StateNormal}, nil) + + statusProvider.On("GetProcessors", mock.Anything). + Return([]*model.ProcInfoSnap{{CfID: changeFeedID, CaptureID: captureID}}, nil) + + statusProvider.On("GetCaptures", mock.Anything). + Return([]*model.CaptureInfo{{ID: captureID}}, nil) + + return statusProvider +} + +func TestListChangefeed(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + + // test list changefeed succeeded + api := testCase{url: "/api/v1/changefeeds", method: "GET"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) + var resp []model.ChangefeedCommonInfo + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, 2, len(resp)) + + // test list changefeed with specific state + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds?state=%s", "stopped"), method: "GET"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) + resp = []model.ChangefeedCommonInfo{} + err = json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, 1, len(resp)) + require.Equal(t, model.StateStopped, resp[0].FeedState) + require.Equal(t, uint64(0x2), resp[0].CheckpointTSO) +} + +func TestGetChangefeed(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + + // test get changefeed succeeded + api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "GET"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) + var resp model.ChangefeedDetail + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, model.StateNormal, resp.FeedState) + + // test get changefeed failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), method: "GET"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr := model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") +} + +func TestPauseChangefeed(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + // test pause changefeed succeeded + api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID), method: "POST"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 202, w.Code) + + // test pause changefeed failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID), method: "POST"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr := model.HTTPError{} + err := json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") +} + +func TestResumeChangefeed(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + // test resume changefeed succeeded + api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID), method: "POST"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 202, w.Code) + + // test resume changefeed failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID), method: "POST"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr := model.HTTPError{} + err := json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") +} + +func TestRemoveChangefeed(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + // test remove changefeed succeeded + api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "DELETE"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 202, w.Code) + + // test remove changefeed failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), method: "DELETE"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr := model.HTTPError{} + err := json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") +} + +func TestRebalanceTable(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + // test rebalance table succeeded + api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID), method: "POST"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 202, w.Code) + + // test rebalance table failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID), method: "POST"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr := model.HTTPError{} + err := json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") +} + +func TestMoveTable(t *testing.T) { + t.Parallel() + + data := struct { + CaptureID string `json:"capture_id"` + TableID int64 `json:"table_id"` + }{captureID, 1} + b, err := json.Marshal(&data) + require.Nil(t, err) + body := bytes.NewReader(b) + + router := newRouter(newStatusProvider()) + // test move table succeeded + api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), method: "POST"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, body) + router.ServeHTTP(w, req) + require.Equal(t, 202, w.Code) + + // test move table failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID), method: "POST"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, body) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr := model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") +} + +func TestResignOwner(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + // test resign owner succeeded + api := testCase{url: "/api/v1/owner/resign", method: "POST"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 202, w.Code) +} + +func TestGetProcessor(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + // test get processor succeeded + api := testCase{url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID), method: "GET"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) + processorDetail := &model.ProcessorDetail{} + err := json.NewDecoder(w.Body).Decode(processorDetail) + require.Nil(t, err) + require.Equal(t, "test", processorDetail.Error.Message) + + // test get processor fail due to capture ID error + api = testCase{url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, "non-exist-capture"), method: "GET"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + httpError := &model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(httpError) + require.Nil(t, err) + require.Contains(t, httpError.Error, "capture not exists, key: non-exist-capture") +} + +func TestListProcessor(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + // test list processor succeeded + api := testCase{url: "/api/v1/processors", method: "GET"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) + var resp []model.ProcessorCommonInfo + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, changeFeedID, resp[0].CfID) +} + +func TestListCapture(t *testing.T) { + t.Parallel() + router := newRouter(newStatusProvider()) + // test list processor succeeded + api := testCase{url: "/api/v1/captures", method: "GET"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) + var resp []model.Capture + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, captureID, resp[0].ID) +} + +func TestServerStatus(t *testing.T) { + t.Parallel() + // capture is owner + ownerRouter := newRouter(newStatusProvider()) + api := testCase{url: "/api/v1/status", method: "GET"} + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + ownerRouter.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) + var resp model.ServerStatus + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, "capture-for-test", resp.ID) + require.True(t, resp.IsOwner) + + // capture is not owner + c := capture.NewCapture4Test(false) + r := gin.New() + RegisterOpenAPIRoutes(r, NewOpenAPI4Test(c, nil)) + api = testCase{url: "/api/v1/status", method: "GET"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + r.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) + resp = model.ServerStatus{} + err = json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.False(t, resp.IsOwner) +} + +func TestSetLogLevel(t *testing.T) { + t.Parallel() + + // test set log level succeeded + data := struct { + Level string `json:"log_level"` + }{"warn"} + router := newRouter(newStatusProvider()) + api := testCase{url: "/api/v1/log", method: "POST"} + w := httptest.NewRecorder() + b, err := json.Marshal(&data) + require.Nil(t, err) + body := bytes.NewReader(b) + req, _ := http.NewRequest(api.method, api.url, body) + router.ServeHTTP(w, req) + require.Equal(t, 200, w.Code) + + // test set log level failed + data = struct { + Level string `json:"log_level"` + }{"foo"} + api = testCase{url: "/api/v1/log", method: "POST"} + w = httptest.NewRecorder() + b, err = json.Marshal(&data) + require.Nil(t, err) + body = bytes.NewReader(b) + req, _ = http.NewRequest(api.method, api.url, body) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + httpError := &model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(httpError) + require.Nil(t, err) + require.Contains(t, httpError.Error, "fail to change log level: foo") +} + +// TODO: finished these test cases after we decouple those APIs from etcdClient. +func TestCreateChangefeed(t *testing.T) {} +func TestUpdateChangefeed(t *testing.T) {} +func TestHealth(t *testing.T) {} diff --git a/cdc/api/owner.go b/cdc/api/owner.go index d6f0726cdad..a8b565c4a1a 100644 --- a/cdc/api/owner.go +++ b/cdc/api/owner.go @@ -82,11 +82,16 @@ type ownerAPI struct { // RegisterOwnerAPIRoutes registers routes for owner APIs. func RegisterOwnerAPIRoutes(router *gin.Engine, capture *capture.Capture) { ownerAPI := ownerAPI{capture: capture} - router.POST("/capture/owner/resign", gin.WrapF(ownerAPI.handleResignOwner)) - router.POST("/capture/owner/admin", gin.WrapF(ownerAPI.handleChangefeedAdmin)) - router.POST("/capture/owner/rebalance_trigger", gin.WrapF(ownerAPI.handleRebalanceTrigger)) - router.POST("/capture/owner/move_table", gin.WrapF(ownerAPI.handleMoveTable)) - router.POST("/capture/owner/changefeed/query", gin.WrapF(ownerAPI.handleChangefeedQuery)) + owner := router.Group("/capture/owner") + + owner.Use(errorHandleMiddleware()) + owner.Use(logMiddleware()) + + owner.POST("/resign", gin.WrapF(ownerAPI.handleResignOwner)) + owner.POST("/admin", gin.WrapF(ownerAPI.handleChangefeedAdmin)) + owner.POST("/rebalance_trigger", gin.WrapF(ownerAPI.handleRebalanceTrigger)) + owner.POST("/move_table", gin.WrapF(ownerAPI.handleMoveTable)) + owner.POST("/changefeed/query", gin.WrapF(ownerAPI.handleChangefeedQuery)) } func handleOwnerResp(w http.ResponseWriter, err error) { @@ -269,7 +274,7 @@ func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Reques writeData(w, resp) } -func handleAdminLogLevel(w http.ResponseWriter, r *http.Request) { +func HandleAdminLogLevel(w http.ResponseWriter, r *http.Request) { var level string data, err := io.ReadAll(r.Body) r.Body.Close() diff --git a/cdc/api/owner_test.go b/cdc/api/owner_test.go index 7c19890d191..bd09d0ec7cc 100644 --- a/cdc/api/owner_test.go +++ b/cdc/api/owner_test.go @@ -14,124 +14,64 @@ package api import ( - "bytes" "fmt" "io" "net/http" "net/http/httptest" "net/url" + "testing" "github.com/gin-gonic/gin" - "github.com/pingcap/check" - "github.com/pingcap/failpoint" - "github.com/pingcap/tiflow/cdc/capture" - "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/clientv3/concurrency" ) -type httpStatusSuite struct{} - -var _ = check.Suite(&httpStatusSuite{}) - -func (s *httpStatusSuite) TestHTTPStatus(c *check.C) { - defer testleak.AfterTest(c)() - +func TestHTTPStatus(t *testing.T) { + t.Parallel() router := gin.New() - RegisterRoutes(router, capture.NewCapture4Test(), nil) - + RegisterOwnerAPIRoutes(router, nil) ts := httptest.NewServer(router) defer ts.Close() addr := ts.URL - testPprof(c, addr) - testReisgnOwner(c, addr) - testHandleChangefeedAdmin(c, addr) - testHandleRebalance(c, addr) - testHandleMoveTable(c, addr) - testHandleChangefeedQuery(c, addr) - testHandleFailpoint(c, addr) + testReisgnOwner(t, addr) + testHandleChangefeedAdmin(t, addr) + testHandleRebalance(t, addr) + testHandleMoveTable(t, addr) + testHandleChangefeedQuery(t, addr) } -func testPprof(c *check.C, addr string) { - testValidPprof := func(uri string) { - resp, err := http.Get(uri) - c.Assert(err, check.IsNil) - defer resp.Body.Close() - c.Assert(resp.StatusCode, check.Equals, 200) - _, err = io.ReadAll(resp.Body) - c.Assert(err, check.IsNil) - } - testValidPprof(fmt.Sprintf("%s/debug/pprof", addr)) - testValidPprof(fmt.Sprintf("%s/debug/pprof/cmdline", addr)) - testValidPprof(fmt.Sprintf("%s/debug/pprof/mutex", addr)) - testValidPprof(fmt.Sprintf("%s/debug/pprof/heap?debug=1", addr)) -} - -func testReisgnOwner(c *check.C, addr string) { +func testReisgnOwner(t *testing.T, addr string) { uri := fmt.Sprintf("%s/capture/owner/resign", addr) - testRequestNonOwnerFailed(c, uri) + testRequestNonOwnerFailed(t, uri) } -func testHandleChangefeedAdmin(c *check.C, addr string) { +func testHandleChangefeedAdmin(t *testing.T, addr string) { uri := fmt.Sprintf("%s/capture/owner/admin", addr) - testRequestNonOwnerFailed(c, uri) + testRequestNonOwnerFailed(t, uri) } -func testHandleRebalance(c *check.C, addr string) { +func testHandleRebalance(t *testing.T, addr string) { uri := fmt.Sprintf("%s/capture/owner/rebalance_trigger", addr) - testRequestNonOwnerFailed(c, uri) + testRequestNonOwnerFailed(t, uri) } -func testHandleMoveTable(c *check.C, addr string) { +func testHandleMoveTable(t *testing.T, addr string) { uri := fmt.Sprintf("%s/capture/owner/move_table", addr) - testRequestNonOwnerFailed(c, uri) + testRequestNonOwnerFailed(t, uri) } -func testHandleChangefeedQuery(c *check.C, addr string) { +func testHandleChangefeedQuery(t *testing.T, addr string) { uri := fmt.Sprintf("%s/capture/owner/changefeed/query", addr) - testRequestNonOwnerFailed(c, uri) + testRequestNonOwnerFailed(t, uri) } -func testRequestNonOwnerFailed(c *check.C, uri string) { +func testRequestNonOwnerFailed(t *testing.T, uri string) { resp, err := http.PostForm(uri, url.Values{}) - c.Assert(err, check.IsNil) + require.Nil(t, err) data, err := io.ReadAll(resp.Body) - c.Assert(err, check.IsNil) - defer resp.Body.Close() - c.Assert(resp.StatusCode, check.Equals, http.StatusBadRequest) - c.Assert(string(data), check.Equals, concurrency.ErrElectionNotLeader.Error()) -} - -func testHandleFailpoint(c *check.C, addr string) { - fp := "github.com/pingcap/tiflow/cdc/TestHandleFailpoint" - uri := fmt.Sprintf("%s/debug/fail/%s", addr, fp) - body := bytes.NewReader([]byte("return(true)")) - req, err := http.NewRequest("PUT", uri, body) - c.Assert(err, check.IsNil) - - resp, err := http.DefaultClient.Do(req) - c.Assert(err, check.IsNil) + require.Nil(t, err) defer resp.Body.Close() - c.Assert(resp.StatusCode, check.GreaterEqual, 200) - c.Assert(resp.StatusCode, check.Less, 300) - - failpointHit := false - failpoint.Inject("TestHandleFailpoint", func() { - failpointHit = true - }) - c.Assert(failpointHit, check.IsTrue) - - req, err = http.NewRequest("DELETE", uri, body) - c.Assert(err, check.IsNil) - resp, err = http.DefaultClient.Do(req) - c.Assert(err, check.IsNil) - defer resp.Body.Close() - c.Assert(resp.StatusCode, check.GreaterEqual, 200) - c.Assert(resp.StatusCode, check.Less, 300) - - failpointHit = false - failpoint.Inject("TestHandleFailpoint", func() { - failpointHit = true - }) - c.Assert(failpointHit, check.IsFalse) + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + require.Equal(t, concurrency.ErrElectionNotLeader.Error(), string(data)) } diff --git a/cdc/api/router.go b/cdc/api/router.go deleted file mode 100644 index c12e5d356bf..00000000000 --- a/cdc/api/router.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import ( - "net/http" - "net/http/pprof" - - "github.com/gin-gonic/gin" - "github.com/pingcap/failpoint" - "github.com/pingcap/tiflow/cdc/capture" - "github.com/pingcap/tiflow/pkg/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - swaggerFiles "github.com/swaggo/files" - ginSwagger "github.com/swaggo/gin-swagger" - - // use for OpenAPI online docs - _ "github.com/pingcap/tiflow/docs/swagger" -) - -// RegisterRoutes create a router for OpenAPI -func RegisterRoutes( - router *gin.Engine, - capture *capture.Capture, - registry prometheus.Gatherer, -) { - // online docs - router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) - - // Open API - RegisterOpoenAPIRoutes(router, capture) - - // Owner API - RegisterOwnerAPIRoutes(router, capture) - - // Status API - RegisterStatusAPIRoutes(router, capture) - - // Log API - router.POST("/admin/log", gin.WrapF(handleAdminLogLevel)) - - // pprof debug API - pprofGroup := router.Group("/debug/pprof/") - pprofGroup.GET("", gin.WrapF(pprof.Index)) - pprofGroup.GET("/:any", gin.WrapF(pprof.Index)) - pprofGroup.GET("/cmdline", gin.WrapF(pprof.Cmdline)) - pprofGroup.GET("/profile", gin.WrapF(pprof.Profile)) - pprofGroup.GET("/symbol", gin.WrapF(pprof.Symbol)) - pprofGroup.GET("/trace", gin.WrapF(pprof.Trace)) - pprofGroup.GET("/threadcreate", gin.WrapF(pprof.Handler("threadcreate").ServeHTTP)) - - // Failpoint API - if util.FailpointBuild { - // `http.StripPrefix` is needed because `failpoint.HttpHandler` assumes that it handles the prefix `/`. - router.Any("/debug/fail/*any", gin.WrapH(http.StripPrefix("/debug/fail", &failpoint.HttpHandler{}))) - } - - // Promtheus metrics API - prometheus.DefaultGatherer = registry - router.Any("/metrics", gin.WrapH(promhttp.Handler())) -} diff --git a/cdc/api/router_test.go b/cdc/api/router_test.go deleted file mode 100644 index 76abe25f46b..00000000000 --- a/cdc/api/router_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import ( - "fmt" - "net/http" - "net/http/httptest" - "testing" - - "github.com/gin-gonic/gin" - "github.com/stretchr/testify/require" -) - -func TestPProfPath(t *testing.T) { - t.Parallel() - - router := gin.New() - RegisterRoutes(router, nil, nil) - - apis := []*testCase{ - {"/debug/pprof/", http.MethodGet}, - {"/debug/pprof/cmdline", http.MethodGet}, - {"/debug/pprof/symbol", http.MethodGet}, - // these two apis make will make ut slow - //{"/debug/pprof/profile", http.MethodGet}, - //{"/debug/pprof/trace", http.MethodGet}, - {"/debug/pprof/threadcreate", http.MethodGet}, - {"/debug/pprof/allocs", http.MethodGet}, - {"/debug/pprof/block", http.MethodGet}, - {"/debug/pprof/goroutine?debug=1", http.MethodGet}, - {"/debug/pprof/mutex?debug=1", http.MethodGet}, - } - for _, api := range apis { - w := httptest.NewRecorder() - req, _ := http.NewRequest(api.method, api.url, nil) - router.ServeHTTP(w, req) - require.Equal(t, 200, w.Code, api.String()) - } -} - -type testCase struct { - url string - method string -} - -func (a *testCase) String() string { - return fmt.Sprintf("%s:%s", a.method, a.url) -} diff --git a/cdc/api/util_test.go b/cdc/api/util_test.go index 75a2adbffe6..f9b8f36d5d5 100644 --- a/cdc/api/util_test.go +++ b/cdc/api/util_test.go @@ -22,6 +22,7 @@ import ( ) func TestIsHTTPBadRequestError(t *testing.T) { + t.Parallel() err := cerror.ErrAPIInvalidParam.GenWithStack("aa") require.Equal(t, true, IsHTTPBadRequestError(err)) err = cerror.ErrAPIInvalidParam.Wrap(errors.New("aa")) diff --git a/cdc/api/validator_test.go b/cdc/api/validator_test.go index f7d0dea59b1..967e7b11cdb 100644 --- a/cdc/api/validator_test.go +++ b/cdc/api/validator_test.go @@ -23,6 +23,7 @@ import ( ) func TestVerifyUpdateChangefeedConfig(t *testing.T) { + t.Parallel() ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 97b17835f2d..b2f8fae43aa 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -107,10 +107,14 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.C } } -func NewCapture4Test() *Capture { - return &Capture{ +func NewCapture4Test(isOwner bool) *Capture { + res := &Capture{ info: &model.CaptureInfo{ID: "capture-for-test", AdvertiseAddr: "127.0.0.1", Version: "test"}, } + if isOwner { + res.owner = &owner.Owner{} + } + return res } func (c *Capture) reset(ctx context.Context) error { diff --git a/cdc/cdc_test.go b/cdc/cdc_test.go deleted file mode 100644 index 9dd368511d5..00000000000 --- a/cdc/cdc_test.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cdc - -import ( - "testing" - - "github.com/pingcap/check" -) - -func TestSuite(t *testing.T) { check.TestingT(t) } diff --git a/cdc/http.go b/cdc/http.go index df1faa2411d..acdda995acf 100644 --- a/cdc/http.go +++ b/cdc/http.go @@ -14,90 +14,61 @@ package cdc import ( - "context" "net/http" - "strings" - "time" + "net/http/pprof" "github.com/gin-gonic/gin" - "github.com/pingcap/log" + "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/cdc/api" - "github.com/pingcap/tiflow/cdc/model" - "go.uber.org/zap" -) + "github.com/pingcap/tiflow/cdc/capture" + "github.com/pingcap/tiflow/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + swaggerFiles "github.com/swaggo/files" + ginSwagger "github.com/swaggo/gin-swagger" -// timeoutMiddleware wraps the request context with a timeout -func timeoutMiddleware(timeout time.Duration) gin.HandlerFunc { - return func(c *gin.Context) { - // wrap the request context with a timeout - ctx, cancel := context.WithTimeout(c.Request.Context(), timeout) + // use for OpenAPI online docs + _ "github.com/pingcap/tiflow/docs/swagger" +) - defer func() { - // check if context timeout was reached - if ctx.Err() == context.DeadlineExceeded { +// RegisterRoutes create a router for OpenAPI +func RegisterRoutes( + router *gin.Engine, + capture *capture.Capture, + registry prometheus.Gatherer, +) { + // online docs + router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) - // write response and abort the request - c.Writer.WriteHeader(http.StatusGatewayTimeout) - c.Abort() - } + // Open API + api.RegisterOpenAPIRoutes(router, api.NewOpenAPI(capture)) - // cancel to clear resources after finished - cancel() - }() + // Owner API + api.RegisterOwnerAPIRoutes(router, capture) - // replace request with context wrapped request - c.Request = c.Request.WithContext(ctx) - c.Next() - } -} + // Status API + api.RegisterStatusAPIRoutes(router, capture) -func logMiddleware() gin.HandlerFunc { - return func(c *gin.Context) { - start := time.Now() - path := c.Request.URL.Path - query := c.Request.URL.RawQuery - c.Next() + // Log API + router.POST("/admin/log", gin.WrapF(api.HandleAdminLogLevel)) - cost := time.Since(start) + // pprof debug API + pprofGroup := router.Group("/debug/pprof/") + pprofGroup.GET("", gin.WrapF(pprof.Index)) + pprofGroup.GET("/:any", gin.WrapF(pprof.Index)) + pprofGroup.GET("/cmdline", gin.WrapF(pprof.Cmdline)) + pprofGroup.GET("/profile", gin.WrapF(pprof.Profile)) + pprofGroup.GET("/symbol", gin.WrapF(pprof.Symbol)) + pprofGroup.GET("/trace", gin.WrapF(pprof.Trace)) + pprofGroup.GET("/threadcreate", gin.WrapF(pprof.Handler("threadcreate").ServeHTTP)) - err := c.Errors.Last() - var stdErr error - if err != nil { - stdErr = err.Err - } - // Do not log metrics related requests when there is no error - if strings.Contains(path, "/metrics") && err == nil { - return - } - log.Info(path, - zap.Int("status", c.Writer.Status()), - zap.String("method", c.Request.Method), - zap.String("path", path), - zap.String("query", query), - zap.String("ip", c.ClientIP()), - zap.String("user-agent", c.Request.UserAgent()), - zap.Error(stdErr), - zap.Duration("duration", cost), - ) + // Failpoint API + if util.FailpointBuild { + // `http.StripPrefix` is needed because `failpoint.HttpHandler` assumes that it handles the prefix `/`. + router.Any("/debug/fail/*any", gin.WrapH(http.StripPrefix("/debug/fail", &failpoint.HttpHandler{}))) } -} -func errorHandleMiddleware() gin.HandlerFunc { - return func(c *gin.Context) { - c.Next() - // because we will return immediately after an error occurs in http_handler - // there wil be only one error in c.Errors - lastError := c.Errors.Last() - if lastError != nil { - err := lastError.Err - // put the error into response - if api.IsHTTPBadRequestError(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - } else { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) - } - c.Abort() - return - } - } + // Promtheus metrics API + prometheus.DefaultGatherer = registry + router.Any("/metrics", gin.WrapH(promhttp.Handler())) } diff --git a/cdc/http_test.go b/cdc/http_test.go index a59719e468f..39398a82639 100644 --- a/cdc/http_test.go +++ b/cdc/http_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,182 +14,80 @@ package cdc import ( - "context" - "crypto/tls" - "encoding/json" + "bytes" "fmt" "net/http" - "strings" - "sync" - "time" + "net/http/httptest" + "testing" - "github.com/pingcap/check" - "github.com/pingcap/tidb/br/pkg/httputil" + "github.com/pingcap/failpoint" + + "github.com/gin-gonic/gin" "github.com/pingcap/tiflow/cdc/capture" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" - cerrors "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/retry" - security2 "github.com/pingcap/tiflow/pkg/security" - "github.com/pingcap/tiflow/pkg/util/testleak" - "github.com/tikv/pd/pkg/tempurl" + "github.com/stretchr/testify/require" ) -type httpStatusSuite struct{} - -var _ = check.Suite(&httpStatusSuite{}) - -const retryTime = 20 - -func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) { - defer testleak.AfterTest(c) - addr := tempurl.Alloc()[len("http://"):] - // Do not specify common name - security, err := security2.NewCredential4Test("") - c.Assert(err, check.IsNil) - conf := config.GetDefaultServerConfig() - conf.Addr = addr - conf.AdvertiseAddr = addr - conf.Security = &security - config.StoreGlobalServerConfig(conf) - - server, err := NewServer([]string{"https://127.0.0.1:2379"}) - server.capture = capture.NewCapture4Test() - c.Assert(err, check.IsNil) - err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) - c.Assert(err, check.IsNil) - defer func() { - c.Assert(server.statusServer.Close(), check.IsNil) - }() - - statusURL := fmt.Sprintf("https://%s/api/v1/status", addr) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := server.tcpServer.Run(ctx) - c.Check(err, check.ErrorMatches, ".*ErrTCPServerClosed.*") - }() - - // test cli sends request without a cert will success - err = retry.Do(ctx, func() error { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - cli := &http.Client{Transport: tr} - resp, err := cli.Get(statusURL) - if err != nil { - return err - } - decoder := json.NewDecoder(resp.Body) - captureInfo := &model.CaptureInfo{} - err = decoder.Decode(captureInfo) - c.Assert(err, check.IsNil) - c.Assert(captureInfo.ID, check.Equals, server.capture.Info().ID) - resp.Body.Close() - return nil - }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) - c.Assert(err, check.IsNil) - - // test cli sends request with a cert will success - err = retry.Do(ctx, func() error { - tlsConfig, err := security.ToTLSConfigWithVerify() - if err != nil { - c.Assert(err, check.IsNil) - } - cli := httputil.NewClient(tlsConfig) - resp, err := cli.Get(statusURL) - if err != nil { - return err - } - decoder := json.NewDecoder(resp.Body) - captureInfo := &model.CaptureInfo{} - err = decoder.Decode(captureInfo) - c.Assert(err, check.IsNil) - c.Assert(captureInfo.ID, check.Equals, server.capture.Info().ID) - resp.Body.Close() - return nil - }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) - c.Assert(err, check.IsNil) - - cancel() - wg.Wait() +type testCase struct { + url string + method string } -func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) { - defer testleak.AfterTest(c) - addr := tempurl.Alloc()[len("http://"):] - // specify a common name - security, err := security2.NewCredential4Test("test") - c.Assert(err, check.IsNil) - conf := config.GetDefaultServerConfig() - conf.Addr = addr - conf.AdvertiseAddr = addr - conf.Security = &security - config.StoreGlobalServerConfig(conf) - - server, err := NewServer([]string{"https://127.0.0.1:2379"}) - server.capture = capture.NewCapture4Test() - c.Assert(err, check.IsNil) - err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) - c.Assert(err, check.IsNil) - defer func() { - c.Assert(server.statusServer.Close(), check.IsNil) - }() - - statusURL := fmt.Sprintf("https://%s/api/v1/status", addr) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := server.tcpServer.Run(ctx) - c.Check(err, check.ErrorMatches, ".*ErrTCPServerClosed.*") - }() +func (a *testCase) String() string { + return fmt.Sprintf("%s:%s", a.method, a.url) +} - // test cli sends request without a cert will fail - err = retry.Do(ctx, func() error { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - cli := &http.Client{Transport: tr} - resp, err := cli.Get(statusURL) - if err != nil { - return err - } - decoder := json.NewDecoder(resp.Body) - captureInfo := &model.CaptureInfo{} - err = decoder.Decode(captureInfo) - c.Assert(err, check.IsNil) - c.Assert(captureInfo.ID, check.Equals, server.capture.Info().ID) - resp.Body.Close() - return nil - }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) - c.Assert(strings.Contains(err.Error(), "remote error: tls: bad certificate"), check.IsTrue) +func TestPProfPath(t *testing.T) { + router := gin.New() + RegisterRoutes(router, capture.NewCapture4Test(false), nil) + + apis := []*testCase{ + {"/debug/pprof/", http.MethodGet}, + {"/debug/pprof/cmdline", http.MethodGet}, + {"/debug/pprof/symbol", http.MethodGet}, + // these two apis make will make ut slow + //{"/debug/pprof/profile", http.MethodGet}, + //{"/debug/pprof/trace", http.MethodGet}, + {"/debug/pprof/threadcreate", http.MethodGet}, + {"/debug/pprof/allocs", http.MethodGet}, + {"/debug/pprof/block", http.MethodGet}, + {"/debug/pprof/goroutine?debug=1", http.MethodGet}, + {"/debug/pprof/mutex?debug=1", http.MethodGet}, + } + for _, api := range apis { + w := httptest.NewRecorder() + req, _ := http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 200, w.Code, api.String()) + } +} - // test cli sends request with a cert will success - err = retry.Do(ctx, func() error { - tlsConfig, err := security.ToTLSConfigWithVerify() - if err != nil { - c.Assert(err, check.IsNil) - } - cli := httputil.NewClient(tlsConfig) - resp, err := cli.Get(statusURL) - if err != nil { - return err - } - decoder := json.NewDecoder(resp.Body) - captureInfo := &model.CaptureInfo{} - err = decoder.Decode(captureInfo) - c.Assert(err, check.IsNil) - c.Assert(captureInfo.ID, check.Equals, server.capture.Info().ID) - resp.Body.Close() - return nil - }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) - c.Assert(err, check.IsNil) +func TestHandleFailpoint(t *testing.T) { + router := gin.New() + RegisterRoutes(router, capture.NewCapture4Test(false), nil) + fp := "github.com/pingcap/tiflow/cdc/TestHandleFailpoint" + uri := fmt.Sprintf("/debug/fail/%s", fp) + body := bytes.NewReader([]byte("return(true)")) + req, err := http.NewRequest("PUT", uri, body) + require.Nil(t, err) + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + require.True(t, w.Code >= 200 && w.Code <= 300) + + failpointHit := false + failpoint.Inject("TestHandleFailpoint", func() { + failpointHit = true + }) + require.True(t, failpointHit) + + req, err = http.NewRequest("DELETE", uri, body) + require.Nil(t, err) + w = httptest.NewRecorder() + router.ServeHTTP(w, req) + require.True(t, w.Code >= 200 && w.Code <= 300) + + failpointHit = false + failpoint.Inject("TestHandleFailpoint", func() { + failpointHit = true + }) + require.False(t, failpointHit) } diff --git a/cdc/server.go b/cdc/server.go index 86d8529fbfb..cf3ab6f651c 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" tidbkv "github.com/pingcap/tidb/kv" - "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/sorter/unified" @@ -218,16 +217,9 @@ func (s *Server) startStatusHTTP(lis net.Listener) error { // discard gin log output gin.DefaultWriter = io.Discard - router := gin.New() - - router.Use(logMiddleware()) - // request will timeout after 10 second - router.Use(timeoutMiddleware(time.Second * 10)) - router.Use(errorHandleMiddleware()) - // Register APIs. - api.RegisterRoutes(router, s.capture, registry) + RegisterRoutes(router, s.capture, registry) // No need to configure TLS because it is already handled by `s.tcpServer`. s.statusServer = &http.Server{Handler: router} diff --git a/cdc/server_test.go b/cdc/server_test.go index 567eb4c71f7..b6eccf3c74b 100644 --- a/cdc/server_test.go +++ b/cdc/server_test.go @@ -15,27 +15,36 @@ package cdc import ( "context" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" "net/url" "os" "os/user" "path/filepath" "runtime" + "sync" "testing" "time" - "github.com/pingcap/check" + "github.com/pingcap/tidb/br/pkg/httputil" + "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" + cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" + "github.com/pingcap/tiflow/pkg/retry" + security2 "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/util" - "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/tempurl" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "golang.org/x/sync/errgroup" ) -type serverSuite struct { +type testServer struct { server *Server e *embed.Etcd clientURL *url.URL @@ -44,19 +53,20 @@ type serverSuite struct { errg *errgroup.Group } -func (s *serverSuite) SetUpTest(c *check.C) { +func newServer(t *testing.T) *testServer { var err error - dir := c.MkDir() + dir := t.TempDir() + s := &testServer{} s.clientURL, s.e, err = etcd.SetupEmbedEtcd(dir) - c.Assert(err, check.IsNil) + require.Nil(t, err) pdEndpoints := []string{ "http://" + s.clientURL.Host, "http://invalid-pd-host:2379", } server, err := NewServer(pdEndpoints) - c.Assert(err, check.IsNil) - c.Assert(server, check.NotNil) + require.Nil(t, err) + require.NotNil(t, server) s.server = server s.ctx, s.cancel = context.WithCancel(context.Background()) @@ -65,78 +75,75 @@ func (s *serverSuite) SetUpTest(c *check.C) { Context: s.ctx, DialTimeout: 5 * time.Second, }) - c.Assert(err, check.IsNil) + require.Nil(t, err) etcdClient := etcd.NewCDCEtcdClient(s.ctx, client) s.server.etcdClient = &etcdClient - s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { c.Log(e) }) + s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { t.Log(e) }) + return s } -func (s *serverSuite) TearDownTest(c *check.C) { +func (s *testServer) close(t *testing.T) { s.server.Close() s.e.Close() s.cancel() err := s.errg.Wait() if err != nil { - c.Errorf("Error group error: %s", err) + t.Errorf("Error group error: %s", err) } } -var _ = check.Suite(&serverSuite{}) - -func (s *serverSuite) TestEtcdHealthChecker(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) +func TestServerBasic(t *testing.T) { + t.Parallel() + s := newServer(t) + defer s.close(t) + testEtcdHealthChecker(t, s) + testSetUpDataDir(t, s) +} +func testEtcdHealthChecker(t *testing.T, s *testServer) { s.errg.Go(func() error { err := s.server.etcdHealthChecker(s.ctx) - c.Assert(err, check.Equals, context.Canceled) + require.Equal(t, context.Canceled, err) return nil }) // longer than one check tick 3s time.Sleep(time.Second * 4) - s.cancel() } -func (s *serverSuite) TestSetUpDataDir(c *check.C) { - defer testleak.AfterTest(c)() - defer s.TearDownTest(c) - +func testSetUpDataDir(t *testing.T, s *testServer) { conf := config.GetGlobalServerConfig() // DataDir is not set, and no changefeed exist, use the default conf.DataDir = "" err := s.server.setUpDir(s.ctx) - c.Assert(err, check.IsNil) - c.Assert(conf.DataDir, check.Equals, defaultDataDir) - c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(defaultDataDir, config.DefaultSortDir)) + require.Nil(t, err) + require.Equal(t, defaultDataDir, conf.DataDir) + require.Equal(t, filepath.Join(defaultDataDir, config.DefaultSortDir), conf.Sorter.SortDir) // DataDir is not set, but has existed changefeed, use the one with the largest available space conf.DataDir = "" - dir := c.MkDir() + dir := t.TempDir() err = s.server.etcdClient.SaveChangeFeedInfo(s.ctx, &model.ChangeFeedInfo{SortDir: dir}, "a") - c.Assert(err, check.IsNil) + require.Nil(t, err) err = s.server.etcdClient.SaveChangeFeedInfo(s.ctx, &model.ChangeFeedInfo{}, "b") - c.Assert(err, check.IsNil) + require.Nil(t, err) err = s.server.setUpDir(s.ctx) - c.Assert(err, check.IsNil) + require.Nil(t, err) - c.Assert(conf.DataDir, check.Equals, dir) - c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(dir, config.DefaultSortDir)) + require.Equal(t, dir, conf.DataDir) + require.Equal(t, filepath.Join(dir, config.DefaultSortDir), conf.Sorter.SortDir) - conf.DataDir = c.MkDir() + conf.DataDir = t.TempDir() // DataDir has been set, just use it err = s.server.setUpDir(s.ctx) - c.Assert(err, check.IsNil) - c.Assert(conf.DataDir, check.Not(check.Equals), "") - c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(conf.DataDir, config.DefaultSortDir)) - - s.cancel() + require.Nil(t, err) + require.NotEqual(t, "", conf.DataDir) + require.Equal(t, filepath.Join(conf.DataDir, config.DefaultSortDir), conf.Sorter.SortDir) } func TestCheckDir(t *testing.T) { - t.Parallel() me, err := user.Current() require.Nil(t, err) if me.Name == "root" || runtime.GOOS == "windows" { @@ -166,3 +173,154 @@ func TestCheckDir(t *testing.T) { _, err = checkDir(file) require.Error(t, err) } + +const retryTime = 20 + +func TestServerTLSWithoutCommonName(t *testing.T) { + addr := tempurl.Alloc()[len("http://"):] + // Do not specify common name + security, err := security2.NewCredential4Test("") + require.Nil(t, err) + conf := config.GetDefaultServerConfig() + conf.Addr = addr + conf.AdvertiseAddr = addr + conf.Security = &security + config.StoreGlobalServerConfig(conf) + + server, err := NewServer([]string{"https://127.0.0.1:2379"}) + server.capture = capture.NewCapture4Test(false) + require.Nil(t, err) + err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) + require.Nil(t, err) + defer func() { + require.Nil(t, server.statusServer.Close()) + }() + + statusURL := fmt.Sprintf("https://%s/api/v1/status", addr) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := server.tcpServer.Run(ctx) + require.Contains(t, err.Error(), "ErrTCPServerClosed") + }() + + // test cli sends request without a cert will success + err = retry.Do(ctx, func() error { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + cli := &http.Client{Transport: tr} + resp, err := cli.Get(statusURL) + if err != nil { + return err + } + decoder := json.NewDecoder(resp.Body) + captureInfo := &model.CaptureInfo{} + err = decoder.Decode(captureInfo) + require.Nil(t, err) + require.Equal(t, server.capture.Info().ID, captureInfo.ID) + resp.Body.Close() + return nil + }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) + require.Nil(t, err) + + // test cli sends request with a cert will success + err = retry.Do(ctx, func() error { + tlsConfig, err := security.ToTLSConfigWithVerify() + require.Nil(t, err) + + cli := httputil.NewClient(tlsConfig) + resp, err := cli.Get(statusURL) + if err != nil { + return err + } + decoder := json.NewDecoder(resp.Body) + captureInfo := &model.CaptureInfo{} + err = decoder.Decode(captureInfo) + require.Nil(t, err) + require.Equal(t, server.capture.Info().ID, captureInfo.ID) + resp.Body.Close() + return nil + }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) + require.Nil(t, err) + + cancel() + wg.Wait() +} + +func TestServerTLSWithCommonName(t *testing.T) { + addr := tempurl.Alloc()[len("http://"):] + // specify a common name + security, err := security2.NewCredential4Test("test") + require.Nil(t, err) + conf := config.GetDefaultServerConfig() + conf.Addr = addr + conf.AdvertiseAddr = addr + conf.Security = &security + config.StoreGlobalServerConfig(conf) + + server, err := NewServer([]string{"https://127.0.0.1:2379"}) + server.capture = capture.NewCapture4Test(false) + require.Nil(t, err) + err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) + require.Nil(t, err) + defer func() { + require.Nil(t, server.statusServer.Close()) + }() + + statusURL := fmt.Sprintf("https://%s/api/v1/status", addr) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := server.tcpServer.Run(ctx) + require.Contains(t, err.Error(), "ErrTCPServerClosed") + }() + + // test cli sends request without a cert will fail + err = retry.Do(ctx, func() error { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + cli := &http.Client{Transport: tr} + resp, err := cli.Get(statusURL) + if err != nil { + return err + } + decoder := json.NewDecoder(resp.Body) + captureInfo := &model.CaptureInfo{} + err = decoder.Decode(captureInfo) + require.Nil(t, err) + require.Equal(t, server.capture.Info().ID, captureInfo.ID) + resp.Body.Close() + return nil + }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) + require.Contains(t, err.Error(), "remote error: tls: bad certificate") + + // test cli sends request with a cert will success + err = retry.Do(ctx, func() error { + tlsConfig, err := security.ToTLSConfigWithVerify() + require.Nil(t, err) + + cli := httputil.NewClient(tlsConfig) + resp, err := cli.Get(statusURL) + if err != nil { + return err + } + decoder := json.NewDecoder(resp.Body) + captureInfo := &model.CaptureInfo{} + err = decoder.Decode(captureInfo) + require.Nil(t, err) + require.Equal(t, server.capture.Info().ID, captureInfo.ID) + resp.Body.Close() + return nil + }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) + require.Nil(t, err) +}