Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openapi(dm): add task config template releated openapi #3656

Merged
merged 7 commits into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/openapi"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/ha"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
Expand Down Expand Up @@ -822,6 +823,123 @@ func (s *Server) DMAPIOperateTableStructure(c *gin.Context, taskName string, sou
}
}

// DMAPIImportTaskConfig create task_config_template url is: (POST /api/v1/task/configs/import).
func (s *Server) DMAPIImportTaskConfig(c *gin.Context) {
var req openapi.TaskConfigRequest
if err := c.Bind(&req); err != nil {
_ = c.Error(err)
return
}
resp := openapi.TaskConfigResponse{
FailedTaskList: []struct {
ErrorMsg string `json:"error_msg"`
TaskName string `json:"task_name"`
}{},
SuccessTaskList: []string{},
}
for _, task := range config.SubTaskConfigsToOpenAPITask(s.scheduler.GetSubTaskCfgs()) {
if err := ha.PutOpenAPITaskConfig(s.etcdClient, task, req.Overwrite); err != nil {
resp.FailedTaskList = append(resp.FailedTaskList, struct {
ErrorMsg string `json:"error_msg"`
TaskName string `json:"task_name"`
}{
ErrorMsg: err.Error(),
TaskName: task.Name,
})
} else {
resp.SuccessTaskList = append(resp.SuccessTaskList, task.Name)
}
}
c.IndentedJSON(http.StatusAccepted, resp)
}

// DMAPICreateTaskConfig create task_config_template url is: (POST /api/task/configs).
func (s *Server) DMAPICreateTaskConfig(c *gin.Context) {
task := &openapi.Task{}
if err := c.Bind(task); err != nil {
_ = c.Error(err)
return
}
if err := task.Adjust(); err != nil {
_ = c.Error(err)
return
}
// prepare target db config
newCtx := c.Request.Context()
toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task)
if adjustDBErr := adjustTargetDB(newCtx, toDBCfg); adjustDBErr != nil {
_ = c.Error(terror.WithClass(adjustDBErr, terror.ClassDMMaster))
return
}
if err := ha.PutOpenAPITaskConfig(s.etcdClient, *task, false); err != nil {
_ = c.Error(err)
return
}
c.IndentedJSON(http.StatusCreated, task)
}

// DMAPIGetTaskConfigList get task_config_template list url is: (GET /api/v1/task/configs).
func (s *Server) DMAPIGetTaskConfigList(c *gin.Context) {
TaskConfigList, err := ha.GetAllOpenAPITaskConfig(s.etcdClient)
if err != nil {
_ = c.Error(err)
return
}
taskList := make([]openapi.Task, len(TaskConfigList))
for i, TaskConfig := range TaskConfigList {
taskList[i] = *TaskConfig
}
resp := openapi.GetTaskListResponse{Total: len(TaskConfigList), Data: taskList}
c.IndentedJSON(http.StatusOK, resp)
}

// DMAPIDeleteTaskConfig delete task_config_template url is: (DELETE /api/v1/task/configs/{task-name}).
func (s *Server) DMAPIDeleteTaskConfig(c *gin.Context, taskName string) {
if err := ha.DeleteOpenAPITaskConfig(s.etcdClient, taskName); err != nil {
_ = c.Error(err)
return
}
c.Status(http.StatusNoContent)
}

// DMAPIGetTaskConfig get task_config_template url is: (GET /api/v1/task/configs/{task-name}).
func (s *Server) DMAPIGetTaskConfig(c *gin.Context, taskName string) {
task, err := ha.GetOpenAPITaskConfig(s.etcdClient, taskName)
if err != nil {
_ = c.Error(err)
return
}
if task == nil {
_ = c.Error(terror.ErrOpenAPITaskConfigNotExist.Generate(taskName))
return
}
c.IndentedJSON(http.StatusOK, task)
}

// DMAPUpdateTaskConfig update task_config_template url is: (PUT /api/v1/task/configs/{task-name}).
func (s *Server) DMAPUpdateTaskConfig(c *gin.Context, taskName string) {
task := &openapi.Task{}
if err := c.Bind(task); err != nil {
_ = c.Error(err)
return
}
if err := task.Adjust(); err != nil {
_ = c.Error(err)
return
}
newCtx := c.Request.Context()
toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task)
if adjustDBErr := adjustTargetDB(newCtx, toDBCfg); adjustDBErr != nil {
_ = c.Error(terror.WithClass(adjustDBErr, terror.ClassDMMaster))
return
}
if err := ha.UpdateOpenAPITaskConfig(s.etcdClient, *task); err != nil {
_ = c.Error(err)
return
}
c.IndentedJSON(http.StatusOK, task)
}

func terrorHTTPErrorHandler() gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
Expand Down
133 changes: 133 additions & 0 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,33 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) {
c.Assert(resultTaskList.Total, check.Equals, 1)
c.Assert(resultTaskList.Data[0].Name, check.Equals, task.Name)

// test batch import task config
taskBatchImportURL := "/api/v1/task/configs/import"
req := openapi.TaskConfigRequest{Overwrite: false}
result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusAccepted)
var resp openapi.TaskConfigResponse
c.Assert(result.UnmarshalBodyToObject(&resp), check.IsNil)
c.Assert(resp.SuccessTaskList, check.HasLen, 1)
c.Assert(resp.SuccessTaskList[0], check.Equals, task.Name)
c.Assert(resp.FailedTaskList, check.HasLen, 0)

// import again without overwrite will fail
result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusAccepted)
c.Assert(result.UnmarshalBodyToObject(&resp), check.IsNil)
c.Assert(resp.SuccessTaskList, check.HasLen, 0)
c.Assert(resp.FailedTaskList, check.HasLen, 1)
c.Assert(resp.FailedTaskList[0].TaskName, check.Equals, task.Name)

// import again with overwrite will success
req.Overwrite = true
result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.UnmarshalBodyToObject(&resp), check.IsNil)
c.Assert(resp.SuccessTaskList, check.HasLen, 1)
c.Assert(resp.SuccessTaskList[0], check.Equals, task.Name)
c.Assert(resp.FailedTaskList, check.HasLen, 0)

// pause and resume task
pauseTaskURL := fmt.Sprintf("%s/%s/pause", taskURL, task.Name)
result = testutil.NewRequest().Post(pauseTaskURL).GoWithHTTPHandler(t.testT, s.openapiHandles)
Expand Down Expand Up @@ -634,6 +661,112 @@ func (t *openAPISuite) TestClusterAPI(c *check.C) {
cancel1()
}

func (t *openAPISuite) TestTaskConfigsAPI(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
s := setupServer(ctx, c)
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`), check.IsNil)
checker.CheckSyncConfigFunc = mockCheckSyncConfig
defer func() {
checker.CheckSyncConfigFunc = checker.CheckSyncConfig
cancel()
s.Close()
c.Assert(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB"), check.IsNil)
}()

dbCfg := config.GetDBConfigForTest()
source1 := openapi.Source{
SourceName: source1Name,
EnableGtid: false,
Host: dbCfg.Host,
Password: dbCfg.Password,
Port: dbCfg.Port,
User: dbCfg.User,
}
// create source
sourceURL := "/api/v1/sources"
result := testutil.NewRequest().Post(sourceURL).WithJsonBody(source1).GoWithHTTPHandler(t.testT, s.openapiHandles)
// check http status code
c.Assert(result.Code(), check.Equals, http.StatusCreated)

// create task config template
url := "/api/v1/task/configs"

task, err := fixtures.GenNoShardOpenAPITaskForTest()
c.Assert(err, check.IsNil)
// use a valid target db
task.TargetConfig.Host = dbCfg.Host
task.TargetConfig.Port = dbCfg.Port
task.TargetConfig.User = dbCfg.User
task.TargetConfig.Password = dbCfg.Password

// create one
result = testutil.NewRequest().Post(url).WithJsonBody(task).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusCreated)
var createTaskResp openapi.Task
err = result.UnmarshalBodyToObject(&createTaskResp)
c.Assert(err, check.IsNil)
c.Assert(task.Name, check.Equals, createTaskResp.Name)

// create again will fail
result = testutil.NewRequest().Post(url).WithJsonBody(task).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusBadRequest)
var errResp openapi.ErrorWithMessage
err = result.UnmarshalBodyToObject(&errResp)
c.Assert(err, check.IsNil)
c.Assert(errResp.ErrorCode, check.Equals, int(terror.ErrOpenAPITaskConfigExist.Code()))

// list templates
result = testutil.NewRequest().Get(url).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusOK)
var resultTaskList openapi.GetTaskListResponse
err = result.UnmarshalBodyToObject(&resultTaskList)
c.Assert(err, check.IsNil)
c.Assert(resultTaskList.Total, check.Equals, 1)
c.Assert(resultTaskList.Data[0].Name, check.Equals, task.Name)

// get detail
oneURL := fmt.Sprintf("%s/%s", url, task.Name)
result = testutil.NewRequest().Get(oneURL).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusOK)
var respTask openapi.Task
err = result.UnmarshalBodyToObject(&respTask)
c.Assert(err, check.IsNil)
c.Assert(respTask.Name, check.Equals, task.Name)

// get not exist
notExistURL := fmt.Sprintf("%s/%s", url, "notexist")
result = testutil.NewRequest().Get(notExistURL).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusBadRequest)
err = result.UnmarshalBodyToObject(&errResp)
c.Assert(err, check.IsNil)
c.Assert(errResp.ErrorCode, check.Equals, int(terror.ErrOpenAPITaskConfigNotExist.Code()))

// update
task.TaskMode = openapi.TaskTaskModeAll
result = testutil.NewRequest().Put(oneURL).WithJsonBody(task).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusOK)
err = result.UnmarshalBodyToObject(&respTask)
c.Assert(err, check.IsNil)
c.Assert(respTask.Name, check.Equals, task.Name)

// update not exist will fail
task.Name = "notexist"
result = testutil.NewRequest().Put(notExistURL).WithJsonBody(task).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusBadRequest)
err = result.UnmarshalBodyToObject(&errResp)
c.Assert(err, check.IsNil)
c.Assert(errResp.ErrorCode, check.Equals, int(terror.ErrOpenAPITaskConfigNotExist.Code()))

// delete task config template
result = testutil.NewRequest().Delete(oneURL).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusNoContent)
result = testutil.NewRequest().Get(url).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusOK)
err = result.UnmarshalBodyToObject(&resultTaskList)
c.Assert(err, check.IsNil)
c.Assert(resultTaskList.Total, check.Equals, 0)
}

func setupServer(ctx context.Context, c *check.C) *Server {
// create a new cluster
cfg1 := NewConfig()
Expand Down
Loading