Skip to content

Commit

Permalink
openapi(dm): support config relay when create source (#3600)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 authored Nov 26, 2021
1 parent e46ded9 commit 56ea821
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 140 deletions.
98 changes: 98 additions & 0 deletions dm/dm/config/source_converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"github.com/pingcap/ticdc/dm/openapi"
)

// SourceCfgToOpenAPISource converter SourceConfig to openapi.Source.
func SourceCfgToOpenAPISource(cfg *SourceConfig) openapi.Source {
source := openapi.Source{
EnableGtid: cfg.EnableGTID,
Host: cfg.From.Host,
Password: "******", // PM's requirement, we always return obfuscated password to user
Port: cfg.From.Port,
SourceName: cfg.SourceID,
User: cfg.From.User,
Purge: &openapi.Purge{
Expires: &cfg.Purge.Expires,
Interval: &cfg.Purge.Interval,
RemainSpace: &cfg.Purge.RemainSpace,
},
RelayConfig: &openapi.RelayConfig{
EnableRelay: &cfg.EnableRelay,
RelayBinlogGtid: &cfg.RelayBinlogGTID,
RelayBinlogName: &cfg.RelayBinLogName,
RelayDir: &cfg.RelayDir,
},
}
if cfg.From.Security != nil {
// NOTE we don't return security content here, because we don't want to expose it to the user.
var certAllowedCn []string
certAllowedCn = append(certAllowedCn, cfg.From.Security.CertAllowedCN...)
source.Security = &openapi.Security{CertAllowedCn: &certAllowedCn}
}
return source
}

// OpenAPISourceToSourceCfg converter openapi.Source to SourceConfig.
func OpenAPISourceToSourceCfg(source openapi.Source) *SourceConfig {
cfg := NewSourceConfig()
from := DBConfig{
Host: source.Host,
Port: source.Port,
User: source.User,
Password: source.Password,
}
if source.Security != nil {
from.Security = &Security{
SSLCABytes: []byte(source.Security.SslCaContent),
SSLKEYBytes: []byte(source.Security.SslKeyContent),
SSLCertBytes: []byte(source.Security.SslCertContent),
}
if source.Security.CertAllowedCn != nil {
from.Security.CertAllowedCN = *source.Security.CertAllowedCn
}
}
cfg.From = from
cfg.EnableGTID = source.EnableGtid
cfg.SourceID = source.SourceName
if purge := source.Purge; purge != nil {
if purge.Expires != nil {
cfg.Purge.Expires = *purge.Expires
}
if purge.Interval != nil {
cfg.Purge.Interval = *purge.Interval
}
if purge.RemainSpace != nil {
cfg.Purge.RemainSpace = *purge.RemainSpace
}
}
if relayConfig := source.RelayConfig; relayConfig != nil {
if relayConfig.EnableRelay != nil {
cfg.EnableRelay = *relayConfig.EnableRelay
}
if relayConfig.RelayBinlogGtid != nil {
cfg.RelayBinlogGTID = *relayConfig.RelayBinlogGtid
}
if relayConfig.RelayBinlogName != nil {
cfg.RelayBinLogName = *relayConfig.RelayBinlogName
}
if relayConfig.RelayDir != nil {
cfg.RelayDir = *relayConfig.RelayDir
}
}
return cfg
}
41 changes: 41 additions & 0 deletions dm/dm/config/source_converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"github.com/pingcap/check"
"github.com/pingcap/ticdc/dm/openapi/fixtures"
)

func (t *testConfig) TestConverterWithSourceAndOpenAPISource(c *check.C) {
sourceCfg1, err := LoadFromFile(sourceSampleFile)
c.Assert(err, check.IsNil)

// 1. test user create source from dmctl, after convert to openapi.Source then convert back to source config
sourceCfg2 := OpenAPISourceToSourceCfg(SourceCfgToOpenAPISource(sourceCfg1))

// we need set ServerID and MaxAllowedPacket manually, because user don't need to config those field in openapi
sourceCfg2.ServerID = sourceCfg1.ServerID
sourceCfg2.From.MaxAllowedPacket = sourceCfg1.From.MaxAllowedPacket

// we only need to make sure the source config that user can see is the same as the source config that user create
c.Assert(sourceCfg1.String(), check.Equals, sourceCfg2.String())

// 2. test user create source from openapi, after convert to source config then convert back to openapi.Source
openapiSource1, err := fixtures.GenOpenAPISourceForTest()
c.Assert(err, check.IsNil)
openapiSource2 := SourceCfgToOpenAPISource(OpenAPISourceToSourceCfg(openapiSource1))
openapiSource2.Password = openapiSource1.Password // we set passwd to "******" for privacy
c.Assert(openapiSource1, check.DeepEquals, openapiSource2)
}
63 changes: 2 additions & 61 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *Server) DMAPICreateSource(c *gin.Context) {
_ = c.Error(err)
return
}
cfg := modelToSourceCfg(createSourceReq)
cfg := config.OpenAPISourceToSourceCfg(createSourceReq)
if err := checkAndAdjustSourceConfigFunc(c.Request.Context(), cfg); err != nil {
_ = c.Error(err)
return
Expand All @@ -193,7 +193,7 @@ func (s *Server) DMAPIGetSourceList(c *gin.Context, params openapi.DMAPIGetSourc
sourceMap := s.scheduler.GetSourceCfgs()
sourceList := []openapi.Source{}
for key := range sourceMap {
sourceList = append(sourceList, sourceCfgToModel(sourceMap[key]))
sourceList = append(sourceList, config.SourceCfgToOpenAPISource((sourceMap[key])))
}
// fill status
if params.WithStatus != nil && *params.WithStatus {
Expand Down Expand Up @@ -840,62 +840,3 @@ func terrorHTTPErrorHandler() gin.HandlerFunc {
c.IndentedJSON(http.StatusBadRequest, openapi.ErrorWithMessage{ErrorMsg: msg, ErrorCode: code})
}
}

func sourceCfgToModel(cfg *config.SourceConfig) openapi.Source {
// PM's requirement, we always return obfuscated password to user
source := openapi.Source{
EnableGtid: cfg.EnableGTID,
Host: cfg.From.Host,
Password: "******",
Port: cfg.From.Port,
SourceName: cfg.SourceID,
User: cfg.From.User,
Purge: &openapi.Purge{
Expires: &cfg.Purge.Expires,
Interval: &cfg.Purge.Interval,
RemainSpace: &cfg.Purge.RemainSpace,
},
}
if cfg.From.Security != nil {
// NOTE we don't return security content here, because we don't want to expose it to the user.
var certAllowedCn []string
certAllowedCn = append(certAllowedCn, cfg.From.Security.CertAllowedCN...)
source.Security = &openapi.Security{CertAllowedCn: &certAllowedCn}
}
return source
}

func modelToSourceCfg(source openapi.Source) *config.SourceConfig {
cfg := config.NewSourceConfig()
from := config.DBConfig{
Host: source.Host,
Port: source.Port,
User: source.User,
Password: source.Password,
}
if source.Security != nil {
from.Security = &config.Security{
SSLCABytes: []byte(source.Security.SslCaContent),
SSLKEYBytes: []byte(source.Security.SslKeyContent),
SSLCertBytes: []byte(source.Security.SslCertContent),
}
if source.Security.CertAllowedCn != nil {
from.Security.CertAllowedCN = *source.Security.CertAllowedCn
}
}
cfg.From = from
cfg.EnableGTID = source.EnableGtid
cfg.SourceID = source.SourceName
if purge := source.Purge; purge != nil {
if purge.Expires != nil {
cfg.Purge.Expires = *purge.Expires
}
if purge.Interval != nil {
cfg.Purge.Interval = *purge.Interval
}
if purge.RemainSpace != nil {
cfg.Purge.RemainSpace = *purge.RemainSpace
}
}
return cfg
}
57 changes: 57 additions & 0 deletions dm/openapi/fixtures/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package fixtures

import (
"encoding/json"

"github.com/pingcap/ticdc/dm/openapi"
)

var sourceStr = `
{
"source_name": "mysql-01",
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "123456",
"enable_gtid": false,
"security": {
"ssl_ca_content": "",
"ssl_cert_content": "",
"ssl_key_content": "",
"cert_allowed_cn": [
"string"
]
},
"purge": {
"interval": 3600,
"expires": 0,
"remain_space": 15
},
"relay_config": {
"enable_relay": true,
"relay_binlog_name": "mysql-bin.000002",
"relay_binlog_gtid": "e9a1fc22-ec08-11e9-b2ac-0242ac110003:1-7849",
"relay_dir": "./relay_log"
}
}
`

// GenOpenAPISourceForTest generates openapi.Source for test.
func GenOpenAPISourceForTest() (openapi.Source, error) {
s := openapi.Source{}
err := json.Unmarshal([]byte(sourceStr), &s)
return s, err
}
Loading

0 comments on commit 56ea821

Please sign in to comment.