-
Notifications
You must be signed in to change notification settings - Fork 286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
reactor:multiple delete executor #482
Changes from 6 commits
a20c3e3
c3ce911
a7cb52b
c479086
756a430
e987042
1529f7b
20f5bfb
23acf4a
164cfcc
6d76920
a6d3fae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,214 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package at | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"database/sql/driver" | ||
"fmt" | ||
"strings" | ||
) | ||
|
||
import ( | ||
"github.com/arana-db/parser/ast" | ||
"github.com/arana-db/parser/format" | ||
|
||
"github.com/seata/seata-go/pkg/datasource/sql/datasource" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. format import header There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这格式还是不对 |
||
"github.com/seata/seata-go/pkg/datasource/sql/exec" | ||
"github.com/seata/seata-go/pkg/datasource/sql/parser" | ||
"github.com/seata/seata-go/pkg/datasource/sql/types" | ||
"github.com/seata/seata-go/pkg/datasource/sql/util" | ||
"github.com/seata/seata-go/pkg/util/log" | ||
) | ||
|
||
type multiDeleteExecutor struct { | ||
baseExecutor | ||
parserCtx *types.ParseContext | ||
execContext *types.ExecContext | ||
} | ||
|
||
func (m *multiDeleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) { | ||
m.beforeHooks(ctx, m.execContext) | ||
defer func() { | ||
m.afterHooks(ctx, m.execContext) | ||
}() | ||
|
||
beforeImage, err := m.BeforeImage(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
res, err := f(ctx, m.execContext.Query, m.execContext.NamedValues) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
afterImage, err := m.AfterImage(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
m.execContext.TxCtx.RoundImages.AppendBeofreImages(beforeImage) | ||
m.execContext.TxCtx.RoundImages.AppendAfterImages(afterImage) | ||
return res, nil | ||
} | ||
|
||
type multiDelete struct { | ||
sql string | ||
clear bool | ||
} | ||
|
||
//NewMultiDeleteExecutor get multiDelete executor | ||
func NewMultiDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move NewMultiDeleteExecutor to |
||
return &multiDeleteExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}} | ||
} | ||
|
||
func (m *multiDeleteExecutor) BeforeImage(ctx context.Context) ([]*types.RecordImage, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 首字母小写:beforeImage |
||
deletes := strings.Split(m.execContext.Query, ";") | ||
multiQuery, args, err := m.buildBeforeImageSQL(deletes, m.execContext.NamedValues) | ||
if err != nil { | ||
return nil, err | ||
} | ||
var ( | ||
rowsi driver.Rows | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. delete 空行 |
||
image *types.RecordImage | ||
records []*types.RecordImage | ||
) | ||
|
||
queryerCtx, ok := m.execContext.Conn.(driver.QueryerContext) | ||
var queryer driver.Queryer | ||
if !ok { | ||
queryer, ok = m.execContext.Conn.(driver.Queryer) | ||
} | ||
if ok { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if ok的条件可以删了,因为上面已经退出了 |
||
for _, sql := range multiQuery { | ||
rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, sql, args) | ||
defer func() { | ||
if rowsi != nil { | ||
rowsi.Close() | ||
} | ||
}() | ||
if err != nil { | ||
log.Errorf("ctx driver query: %+v", err) | ||
return nil, err | ||
} | ||
tableName, _ := m.parserCtx.GetTableName() | ||
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, m.execContext.DBName, tableName) | ||
if err != nil { | ||
log.Errorf("stmt query: %+v", err) | ||
return nil, err | ||
} | ||
image, err = m.buildRecordImages(rowsi, metaData) | ||
if err != nil { | ||
log.Errorf("record images : %+v", err) | ||
return nil, err | ||
} | ||
records = append(records, image) | ||
|
||
lockKey := m.buildLockKey(image, *metaData) | ||
m.execContext.TxCtx.LockKeys[lockKey] = struct{}{} | ||
} | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. else 逻辑 改为 if !ok{} ,放在79行前面 |
||
log.Errorf("target conn should been driver.QueryerContext or driver.Queryer") | ||
return nil, fmt.Errorf("invalid conn") | ||
} | ||
|
||
return records, err | ||
|
||
} | ||
|
||
func (m *multiDeleteExecutor) AfterImage(ctx context.Context) ([]*types.RecordImage, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 首字母小写:afterImage |
||
return nil, nil | ||
} | ||
|
||
func (m *multiDeleteExecutor) buildBeforeImageSQL(multiQuery []string, args []driver.NamedValue) ([]string, []driver.NamedValue, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. multiQuery 参数可以从m.execContext.Query拿到,args参数可以从m.execContext.NamedValues拿到,这里可以不传参数进来 |
||
var ( | ||
err error | ||
buf, param bytes.Buffer | ||
p *types.ParseContext | ||
tableName string | ||
tables = make(map[string]multiDelete, len(multiQuery)) | ||
) | ||
|
||
for _, query := range multiQuery { | ||
p, err = parser.DoParser(query) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
tableName = p.DeleteStmt.TableRefs.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.O | ||
|
||
v, ok := tables[tableName] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 151~156行代码改为如下: ps, err := parser.DoParser(m.execContext.Query)
if err != nil {
return nil, nil, err
}
for _, p = range ps.MultiStmt {
tableName = p.DeleteStmt.TableRefs.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.O There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 牛蛙 |
||
if ok && v.clear { | ||
continue | ||
} | ||
buf.WriteString("delete from ") | ||
buf.WriteString(tableName) | ||
if p.DeleteStmt.Where == nil { | ||
tables[tableName] = multiDelete{sql: buf.String(), clear: true} | ||
buf.Reset() | ||
continue | ||
} else { | ||
buf.WriteString(" where ") | ||
} | ||
|
||
_ = p.DeleteStmt.Where.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, ¶m)) | ||
v, ok = tables[tableName] | ||
if ok { | ||
buf.Reset() | ||
buf.WriteString(v.sql) | ||
buf.WriteString(" or ") | ||
} | ||
|
||
buf.Write(param.Bytes()) | ||
tables[tableName] = multiDelete{sql: buf.String()} | ||
|
||
buf.Reset() | ||
param.Reset() | ||
} | ||
|
||
var ( | ||
items = make([]string, 0, len(tables)) | ||
values = make([]driver.NamedValue, 0, len(tables)) | ||
selStmt = ast.SelectStmt{ | ||
SelectStmtOpts: &ast.SelectStmtOpts{}, | ||
From: p.DeleteStmt.TableRefs, | ||
Where: p.DeleteStmt.Where, | ||
Fields: &ast.FieldList{Fields: []*ast.SelectField{{WildCard: &ast.WildCardField{}}}}, | ||
OrderBy: p.DeleteStmt.Order, | ||
TableHints: p.DeleteStmt.TableHints, | ||
LockInfo: &ast.SelectLockInfo{LockType: ast.SelectLockForUpdate}, | ||
} | ||
) | ||
for _, table := range tables { | ||
p, _ = parser.DoParser(table.sql) | ||
|
||
selStmt.From = p.DeleteStmt.TableRefs | ||
selStmt.Where = p.DeleteStmt.Where | ||
_ = selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, &buf)) | ||
items = append(items, buf.String()) | ||
buf.Reset() | ||
if table.clear { | ||
values = append(values, m.buildSelectArgs(&selStmt, nil)...) | ||
} else { | ||
values = append(values, m.buildSelectArgs(&selStmt, args)...) | ||
} | ||
} | ||
return items, values, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package at | ||
|
||
import ( | ||
"database/sql/driver" | ||
"testing" | ||
) | ||
|
||
import ( | ||
"github.com/seata/seata-go/pkg/datasource/sql/util" | ||
"github.com/seata/seata-go/pkg/util/log" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestNewMultiDeleteExecutor(t *testing.T) { | ||
executor := NewMultiDeleteExecutor(nil, nil, nil) | ||
_, ok := executor.(*multiDeleteExecutor) | ||
assert.Equalf(t, true, ok, "should be *multiDeleteExecutor") | ||
} | ||
|
||
func Test_multiDeleteExecutor_buildBeforeImageSQL(t *testing.T) { | ||
log.Init() | ||
var ( | ||
builder = multiDeleteExecutor{} | ||
) | ||
tests := []struct { | ||
name string | ||
sourceQuery []string | ||
sourceQueryArgs []driver.Value | ||
expectQuery string | ||
expectQueryArgs []driver.Value | ||
}{ | ||
{ | ||
sourceQuery: []string{"delete from table_update_executor_test where id = ?", "delete from table_update_executor_test"}, | ||
sourceQueryArgs: []driver.Value{3}, | ||
expectQuery: "SELECT SQL_NO_CACHE * FROM table_update_executor_test FOR UPDATE", | ||
expectQueryArgs: []driver.Value{}, | ||
}, | ||
{ | ||
sourceQuery: []string{"delete from table_update_executor_test2 where id = ?", "delete from table_update_executor_test2 where id = ?"}, | ||
sourceQueryArgs: []driver.Value{3, 2}, | ||
expectQuery: "SELECT SQL_NO_CACHE * FROM table_update_executor_test2 WHERE id=? OR id=? FOR UPDATE", | ||
expectQueryArgs: []driver.Value{3, 2}, | ||
}, | ||
{ | ||
sourceQuery: []string{"delete from table_update_executor_test2 where id = ?", "delete from table_update_executor_test2 where name = ? and age = ?"}, | ||
sourceQueryArgs: []driver.Value{3, "seata-go", 4}, | ||
expectQuery: "SELECT SQL_NO_CACHE * FROM table_update_executor_test2 WHERE id=? OR name=? AND age=? FOR UPDATE", | ||
expectQueryArgs: []driver.Value{3, "seata-go", 4}, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
query, args, err := builder.buildBeforeImageSQL(tt.sourceQuery, util.ValueToNamedValue(tt.sourceQueryArgs)) | ||
assert.Nil(t, err) | ||
assert.Equal(t, 1, len(query)) | ||
assert.Equal(t, query[0], tt.expectQuery) | ||
assert.Equal(t, util.ValueToNamedValue(tt.expectQueryArgs), args) | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add License header
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
直接执行 sh goimport.sh脚本就行