Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reactor:multiple delete executor #482

Merged
merged 12 commits into from
Mar 4, 2023
214 changes: 214 additions & 0 deletions pkg/datasource/sql/exec/at/multi_delete_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package at
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add License header

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接执行 sh goimport.sh脚本就行


import (
"bytes"
"context"
"database/sql/driver"
"fmt"
"strings"
)

import (
"github.com/arana-db/parser/ast"
"github.com/arana-db/parser/format"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format import header

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这格式还是不对

"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/parser"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/util/log"
)

type multiDeleteExecutor struct {
baseExecutor
parserCtx *types.ParseContext
execContext *types.ExecContext
}

func (m *multiDeleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
m.beforeHooks(ctx, m.execContext)
defer func() {
m.afterHooks(ctx, m.execContext)
}()

beforeImage, err := m.BeforeImage(ctx)
if err != nil {
return nil, err
}

res, err := f(ctx, m.execContext.Query, m.execContext.NamedValues)
if err != nil {
return nil, err
}

afterImage, err := m.AfterImage(ctx)
if err != nil {
return nil, err
}

m.execContext.TxCtx.RoundImages.AppendBeofreImages(beforeImage)
m.execContext.TxCtx.RoundImages.AppendAfterImages(afterImage)
return res, nil
}

type multiDelete struct {
sql string
clear bool
}

//NewMultiDeleteExecutor get multiDelete executor
func NewMultiDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move NewMultiDeleteExecutor to type multiDeleteExecutor struct next line

return &multiDeleteExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}}
}

func (m *multiDeleteExecutor) BeforeImage(ctx context.Context) ([]*types.RecordImage, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

首字母小写:beforeImage

deletes := strings.Split(m.execContext.Query, ";")
multiQuery, args, err := m.buildBeforeImageSQL(deletes, m.execContext.NamedValues)
if err != nil {
return nil, err
}
var (
rowsi driver.Rows

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete 空行

image *types.RecordImage
records []*types.RecordImage
)

queryerCtx, ok := m.execContext.Conn.(driver.QueryerContext)
var queryer driver.Queryer
if !ok {
queryer, ok = m.execContext.Conn.(driver.Queryer)
}
if ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ok的条件可以删了,因为上面已经退出了

for _, sql := range multiQuery {
rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, sql, args)
defer func() {
if rowsi != nil {
rowsi.Close()
}
}()
if err != nil {
log.Errorf("ctx driver query: %+v", err)
return nil, err
}
tableName, _ := m.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, m.execContext.DBName, tableName)
if err != nil {
log.Errorf("stmt query: %+v", err)
return nil, err
}
image, err = m.buildRecordImages(rowsi, metaData)
if err != nil {
log.Errorf("record images : %+v", err)
return nil, err
}
records = append(records, image)

lockKey := m.buildLockKey(image, *metaData)
m.execContext.TxCtx.LockKeys[lockKey] = struct{}{}
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else 逻辑 改为 if !ok{} ,放在79行前面

log.Errorf("target conn should been driver.QueryerContext or driver.Queryer")
return nil, fmt.Errorf("invalid conn")
}

return records, err

}

func (m *multiDeleteExecutor) AfterImage(ctx context.Context) ([]*types.RecordImage, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

首字母小写:afterImage

return nil, nil
}

func (m *multiDeleteExecutor) buildBeforeImageSQL(multiQuery []string, args []driver.NamedValue) ([]string, []driver.NamedValue, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

multiQuery 参数可以从m.execContext.Query拿到,args参数可以从m.execContext.NamedValues拿到,这里可以不传参数进来

var (
err error
buf, param bytes.Buffer
p *types.ParseContext
tableName string
tables = make(map[string]multiDelete, len(multiQuery))
)

for _, query := range multiQuery {
p, err = parser.DoParser(query)
if err != nil {
return nil, nil, err
}
tableName = p.DeleteStmt.TableRefs.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.O

v, ok := tables[tableName]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

151~156行代码改为如下:

ps, err := parser.DoParser(m.execContext.Query)
if err != nil {
    return nil, nil, err
}

for _, p = range ps.MultiStmt {
    tableName = p.DeleteStmt.TableRefs.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.O

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

牛蛙

if ok && v.clear {
continue
}
buf.WriteString("delete from ")
buf.WriteString(tableName)
if p.DeleteStmt.Where == nil {
tables[tableName] = multiDelete{sql: buf.String(), clear: true}
buf.Reset()
continue
} else {
buf.WriteString(" where ")
}

_ = p.DeleteStmt.Where.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, &param))
v, ok = tables[tableName]
if ok {
buf.Reset()
buf.WriteString(v.sql)
buf.WriteString(" or ")
}

buf.Write(param.Bytes())
tables[tableName] = multiDelete{sql: buf.String()}

buf.Reset()
param.Reset()
}

var (
items = make([]string, 0, len(tables))
values = make([]driver.NamedValue, 0, len(tables))
selStmt = ast.SelectStmt{
SelectStmtOpts: &ast.SelectStmtOpts{},
From: p.DeleteStmt.TableRefs,
Where: p.DeleteStmt.Where,
Fields: &ast.FieldList{Fields: []*ast.SelectField{{WildCard: &ast.WildCardField{}}}},
OrderBy: p.DeleteStmt.Order,
TableHints: p.DeleteStmt.TableHints,
LockInfo: &ast.SelectLockInfo{LockType: ast.SelectLockForUpdate},
}
)
for _, table := range tables {
p, _ = parser.DoParser(table.sql)

selStmt.From = p.DeleteStmt.TableRefs
selStmt.Where = p.DeleteStmt.Where
_ = selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, &buf))
items = append(items, buf.String())
buf.Reset()
if table.clear {
values = append(values, m.buildSelectArgs(&selStmt, nil)...)
} else {
values = append(values, m.buildSelectArgs(&selStmt, args)...)
}
}
return items, values, nil
}
78 changes: 78 additions & 0 deletions pkg/datasource/sql/exec/at/multi_delete_executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package at

import (
"database/sql/driver"
"testing"
)

import (
"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/util/log"

"github.com/stretchr/testify/assert"
)

func TestNewMultiDeleteExecutor(t *testing.T) {
executor := NewMultiDeleteExecutor(nil, nil, nil)
_, ok := executor.(*multiDeleteExecutor)
assert.Equalf(t, true, ok, "should be *multiDeleteExecutor")
}

func Test_multiDeleteExecutor_buildBeforeImageSQL(t *testing.T) {
log.Init()
var (
builder = multiDeleteExecutor{}
)
tests := []struct {
name string
sourceQuery []string
sourceQueryArgs []driver.Value
expectQuery string
expectQueryArgs []driver.Value
}{
{
sourceQuery: []string{"delete from table_update_executor_test where id = ?", "delete from table_update_executor_test"},
sourceQueryArgs: []driver.Value{3},
expectQuery: "SELECT SQL_NO_CACHE * FROM table_update_executor_test FOR UPDATE",
expectQueryArgs: []driver.Value{},
},
{
sourceQuery: []string{"delete from table_update_executor_test2 where id = ?", "delete from table_update_executor_test2 where id = ?"},
sourceQueryArgs: []driver.Value{3, 2},
expectQuery: "SELECT SQL_NO_CACHE * FROM table_update_executor_test2 WHERE id=? OR id=? FOR UPDATE",
expectQueryArgs: []driver.Value{3, 2},
},
{
sourceQuery: []string{"delete from table_update_executor_test2 where id = ?", "delete from table_update_executor_test2 where name = ? and age = ?"},
sourceQueryArgs: []driver.Value{3, "seata-go", 4},
expectQuery: "SELECT SQL_NO_CACHE * FROM table_update_executor_test2 WHERE id=? OR name=? AND age=? FOR UPDATE",
expectQueryArgs: []driver.Value{3, "seata-go", 4},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query, args, err := builder.buildBeforeImageSQL(tt.sourceQuery, util.ValueToNamedValue(tt.sourceQueryArgs))
assert.Nil(t, err)
assert.Equal(t, 1, len(query))
assert.Equal(t, query[0], tt.expectQuery)
assert.Equal(t, util.ValueToNamedValue(tt.expectQueryArgs), args)
})
}
}