Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ha(dm): support store task config in Etcd #3620

Merged
merged 21 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ ErrConfigGenColumnMapping,[code=20046:class=config:scope=internal:level=high], "
ErrConfigInvalidChunkFileSize,[code=20047:class=config:scope=internal:level=high], "Message: invalid `chunk-filesize` %v, Workaround: Please check the `chunk-filesize` config in task configuration file."
ErrConfigOnlineDDLInvalidRegex,[code=20048:class=config:scope=internal:level=high], "Message: config '%s' regex pattern '%s' invalid, reason: %s, Workaround: Please check if params is correctly in the configuration file."
ErrConfigOnlineDDLMistakeRegex,[code=20049:class=config:scope=internal:level=high], "Message: online ddl sql '%s' invalid, table %s fail to match '%s' online ddl regex, Workaround: Please update your `shadow-table-rules` or `trash-table-rules` in the configuration file."
ErrOpenAPITaskConfigExist,[code=20050:class=config:scope=internal:level=low], "Message: the openapi task config for '%s' already exist, Workaround: If you want to override it, please use the overwrite flag."
ErrOpenAPITaskConfigNotExist,[code=20051:class=config:scope=internal:level=low], "Message: the openapi task config for '%s' does not exist"
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
8 changes: 7 additions & 1 deletion dm/dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,19 @@ var (
// tb2: +a +b +c
// tb3: +a +b +c
ShardDDLOptimismDroppedColumnsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/dropped-columns/")

// OpenAPITaskConfigKeyAdapter is used to store the openapi task config (openapi.Task), now it's only used for WebUI.
// openapi.Task is a struct that can be converted to config.StubTaskConfig so if any field of openapi.Task updated
// user should use ha.PutOpenAPITaskConfig(key, openapi.Task,overwrite) to force update the content in etcd.
// k/v: Encode(task-name) -> openapi.Task.
OpenAPITaskConfigKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/openapi-task-config/")
)

func keyAdapterKeysLen(s KeyAdapter) int {
switch s {
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter,
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter,
UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter:
UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter, OpenAPITaskConfigKeyAdapter:
return 1
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter,
ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
Expand Down
5 changes: 5 additions & 0 deletions dm/dm/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func (t *testCommon) TestKeyAdapter(c *C) {
adapter: UpstreamSubTaskKeyAdapter,
want: "/dm-master/upstream/subtask/6d7973716c31/e4b8ade6968731f09f8084efb88f",
},
{
keys: []string{"task-1"},
adapter: OpenAPITaskConfigKeyAdapter,
want: "/dm-master/openapi-task-config/7461736b2d31",
},
}

for _, ca := range testCases {
Expand Down
12 changes: 12 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,18 @@ description = ""
workaround = "Please update your `shadow-table-rules` or `trash-table-rules` in the configuration file."
tags = ["internal", "high"]

[error.DM-config-20050]
message = "the openapi task config for '%s' already exist"
description = ""
workaround = "If you want to override it, please use the overwrite flag."
tags = ["internal", "low"]

[error.DM-config-20051]
message = "the openapi task config for '%s' does not exist"
description = ""
workaround = ""
tags = ["internal", "low"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
12 changes: 12 additions & 0 deletions dm/openapi/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package openapi

import (
"encoding/json"

"github.com/pingcap/ticdc/dm/pkg/terror"
)

Expand All @@ -30,3 +32,13 @@ func (t *Task) Adjust() error {
}
return nil
}

// FromJSON unmarshal json to task.
func (t *Task) FromJSON(data []byte) error {
return json.Unmarshal(data, t)
}

// ToJSON marshal json to task.
func (t *Task) ToJSON() ([]byte, error) {
return json.Marshal(t)
}
135 changes: 135 additions & 0 deletions dm/pkg/ha/openapi_task_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ha

import (
"context"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/clientv3util"

"github.com/pingcap/ticdc/dm/dm/common"
"github.com/pingcap/ticdc/dm/openapi"
"github.com/pingcap/ticdc/dm/pkg/etcdutil"
"github.com/pingcap/ticdc/dm/pkg/terror"
)

func openAPITaskFromResp(resp *clientv3.GetResponse) (*openapi.Task, error) {
task := &openapi.Task{}
if resp.Count == 0 {
return nil, nil
} else if resp.Count > 1 {
// this should not happen.
return task, terror.ErrConfigMoreThanOne.Generate(resp.Count, "openapi.Task", "")
}
// we make sure only have one task config.
if err := task.FromJSON(resp.Kvs[0].Value); err != nil {
return task, err
}
return task, nil
}

// PutOpenAPITaskConfig puts the openapi task config of task-name.
func PutOpenAPITaskConfig(cli *clientv3.Client, task openapi.Task, overWrite bool) error {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

key := common.OpenAPITaskConfigKeyAdapter.Encode(task.Name)
taskJSON, err := task.ToJSON()
if err != nil {
return err // it should not happen.
}
txn := cli.Txn(ctx)
if !overWrite {
txn = txn.If(clientv3util.KeyMissing(key))
}
resp, err := txn.Then(clientv3.OpPut(key, string(taskJSON))).Commit()
if err != nil {
return err
}
// user don't want to overwrite and key already exists.
if !overWrite && !resp.Succeeded {
return terror.ErrOpenAPITaskConfigExist.Generate(task.Name)
}
return nil
}

// PutOpenAPITaskConfig puts the openapi task config of task-name.
func PutOpenAPITaskConfigIfExist(cli *clientv3.Client, task openapi.Task) error {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

key := common.OpenAPITaskConfigKeyAdapter.Encode(task.Name)
taskJSON, err := task.ToJSON()
if err != nil {
return err // it should not happen.
}
txn := cli.Txn(ctx).If(clientv3util.KeyExists(key)).Then(clientv3.OpPut(key, string(taskJSON)))
resp, err := txn.Commit()
if err != nil {
return err
}
// user want to update a key not exists.
if !resp.Succeeded {
return terror.ErrOpenAPITaskConfigNotExist.Generate(task.Name)
}
return nil
}

// DeleteOpenAPITaskConfig deletes the openapi task config of task-name.
func DeleteOpenAPITaskConfig(cli *clientv3.Client, taskName string) error {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()
if _, err := cli.Delete(ctx, common.OpenAPITaskConfigKeyAdapter.Encode(taskName)); err != nil {
return err
}
return nil
}

// GetOpenAPITaskConfig gets the openapi task config of task-name.
func GetOpenAPITaskConfig(cli *clientv3.Client, taskName string) (*openapi.Task, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

var (
task *openapi.Task
resp *clientv3.GetResponse
err error
)
resp, err = cli.Get(ctx, common.OpenAPITaskConfigKeyAdapter.Encode(taskName))
if err != nil {
return task, err
}
return openAPITaskFromResp(resp)
}

// GetAllOpenAPITaskConfig gets all openapi task config s.
func GetAllOpenAPITaskConfig(cli *clientv3.Client) ([]*openapi.Task, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

resp, err := cli.Get(ctx, common.OpenAPITaskConfigKeyAdapter.Path(), clientv3.WithPrefix())
if err != nil {
return nil, err
}
tasks := make([]*openapi.Task, resp.Count)
for i, kv := range resp.Kvs {
t := &openapi.Task{}
if err := t.FromJSON(kv.Value); err != nil {
return nil, err
}
tasks[i] = t
}
return tasks, nil
}
91 changes: 91 additions & 0 deletions dm/pkg/ha/openapi_task_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ha

import (
. "github.com/pingcap/check"

"github.com/pingcap/ticdc/dm/openapi"
"github.com/pingcap/ticdc/dm/openapi/fixtures"
"github.com/pingcap/ticdc/dm/pkg/terror"
)

func (t *testForEtcd) TestOpenAPITaskConfigEtcd(c *C) {
defer clearTestInfoOperation(c)

task1, err := fixtures.GenNoShardOpenAPITaskForTest()
task1.Name = "test-1"
c.Assert(err, IsNil)
task2, err := fixtures.GenShardAndFilterOpenAPITaskForTest()
task2.Name = "test-2"
c.Assert(err, IsNil)

// no openapi task config exist.
task1InEtcd, err := GetOpenAPITaskConfig(etcdTestCli, task1.Name)
c.Assert(err, IsNil)
c.Assert(task1InEtcd, IsNil)

task2InEtcd, err := GetOpenAPITaskConfig(etcdTestCli, task2.Name)
c.Assert(err, IsNil)
c.Assert(task2InEtcd, IsNil)

tasks, err := GetAllOpenAPITaskConfig(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 0)

// put openapi task config .
c.Assert(PutOpenAPITaskConfig(etcdTestCli, task1, false), IsNil)
c.Assert(PutOpenAPITaskConfig(etcdTestCli, task2, false), IsNil)

task1InEtcd, err = GetOpenAPITaskConfig(etcdTestCli, task1.Name)
c.Assert(err, IsNil)
c.Assert(*task1InEtcd, DeepEquals, task1)

task2InEtcd, err = GetOpenAPITaskConfig(etcdTestCli, task2.Name)
c.Assert(err, IsNil)
c.Assert(*task2InEtcd, DeepEquals, task2)

tasks, err = GetAllOpenAPITaskConfig(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)

// put openapi task config again without overwrite will fail
c.Assert(terror.ErrOpenAPITaskConfigExist.Equal(PutOpenAPITaskConfig(etcdTestCli, task1, false)), IsTrue)

// in overwrite mode, it will overwrite the old one.
task1.TaskMode = openapi.TaskTaskModeFull
c.Assert(PutOpenAPITaskConfig(etcdTestCli, task1, true), IsNil)
task1InEtcd, err = GetOpenAPITaskConfig(etcdTestCli, task1.Name)
c.Assert(err, IsNil)
c.Assert(*task1InEtcd, DeepEquals, task1)

// put task config that not exist will fail
task3, err := fixtures.GenNoShardOpenAPITaskForTest()
c.Assert(err, IsNil)
task3.Name = "test-3"
c.Assert(terror.ErrOpenAPITaskConfigNotExist.Equal(PutOpenAPITaskConfigIfExist(etcdTestCli, task3)), IsTrue)

// update exist openapi task config will success
task1.TaskMode = openapi.TaskTaskModeAll
c.Assert(PutOpenAPITaskConfigIfExist(etcdTestCli, task1), IsNil)
task1InEtcd, err = GetOpenAPITaskConfig(etcdTestCli, task1.Name)
c.Assert(err, IsNil)
c.Assert(*task1InEtcd, DeepEquals, task1)

// delete task config
c.Assert(DeleteOpenAPITaskConfig(etcdTestCli, task1.Name), IsNil)
tasks, err = GetAllOpenAPITaskConfig(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
}
4 changes: 4 additions & 0 deletions dm/pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ const (
codeConfigInvalidChunkFileSize
codeConfigOnlineDDLInvalidRegex
codeConfigOnlineDDLMistakeRegex
codeConfigOpenAPITaskConfigExist
codeConfigOpenAPITaskConfigNotExist
)

// Binlog operation error code list.
Expand Down Expand Up @@ -897,6 +899,8 @@ var (
"config '%s' regex pattern '%s' invalid, reason: %s", "Please check if params is correctly in the configuration file.")
ErrConfigOnlineDDLMistakeRegex = New(codeConfigOnlineDDLMistakeRegex, ClassConfig, ScopeInternal, LevelHigh,
"online ddl sql '%s' invalid, table %s fail to match '%s' online ddl regex", "Please update your `shadow-table-rules` or `trash-table-rules` in the configuration file.")
ErrOpenAPITaskConfigExist = New(codeConfigOpenAPITaskConfigExist, ClassConfig, ScopeInternal, LevelLow, "the openapi task config for '%s' already exist", "If you want to override it, please use the overwrite flag.")
ErrOpenAPITaskConfigNotExist = New(codeConfigOpenAPITaskConfigNotExist, ClassConfig, ScopeInternal, LevelLow, "the openapi task config for '%s' does not exist", "")

// Binlog operation error.
ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "")
Expand Down