Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EXPERIMENTAL breaking down the vreplication flow #8044

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7d649b0
wip-vreplication-experiments
shlomi-noach May 5, 2021
c8e0924
Merge branch 'master' into vreplication-shorter-snapshots
shlomi-noach May 5, 2021
6a1b630
Merge branch 'master' into vreplication-shorter-snapshots
shlomi-noach May 9, 2021
7f30ea1
limit queries by maxpk
shlomi-noach May 9, 2021
4af712b
debug: print entire query
shlomi-noach May 10, 2021
362fab1
Merge branch 'master' into vreplication-shorter-snapshots
shlomi-noach May 10, 2021
24ccafd
Merge branch 'master' into vreplication-shorter-snapshots
shlomi-noach May 11, 2021
dcfc7ed
Merge branch 'master' into vreplication-shorter-snapshots
shlomi-noach May 18, 2021
f62e9dd
Merge branch 'master' into vreplication-shorter-snapshots
shlomi-noach May 19, 2021
0b4f34f
Merge branch 'master' into vreplication-shorter-snapshots
shlomi-noach May 19, 2021
c1a8fdd
marge master; resolve conflicts
shlomi-noach May 19, 2021
8bdf422
Merge branch 'master' into vreplication-shorter-snapshots
shlomi-noach May 20, 2021
f8073ac
unit test
shlomi-noach May 20, 2021
bc61eec
unit test
shlomi-noach May 20, 2021
a456174
safe short sql for debug
shlomi-noach May 20, 2021
e6f8d9f
expect 'replace into'
shlomi-noach May 20, 2021
11b02c3
short sql
shlomi-noach May 20, 2021
7811c72
expect 'replace'
shlomi-noach May 20, 2021
d46a1a6
fix unit tests
shlomi-noach May 20, 2021
7125705
Merge branch 'master' into vreplication-shorter-snapshots
shlomi-noach May 23, 2021
274fdd8
simplify test: accept substring, so that either 'insert into' or 'rep…
shlomi-noach May 23, 2021
90f6e4a
fix more insert/replace test cases
shlomi-noach May 23, 2021
a45384f
running with 'lightweight' snapshot, where transaction is created wit…
shlomi-noach May 23, 2021
e204d2c
lightweight wasn't a good idea. We need GTID to be <= row-copy
shlomi-noach May 23, 2021
80b3003
one extra fastforward, because our last actual batch of rows does not…
shlomi-noach May 23, 2021
212bfad
fast forward whenever a GTID is provided
shlomi-noach May 23, 2021
c1a438d
only REPLACE when no GROUP BY
shlomi-noach May 23, 2021
bed4c36
increase WaitForVReplicationToCatchup timeout
shlomi-noach May 24, 2021
94d814b
remove 'moreRows' logic, which adds a 5sec interval round trip to a s…
shlomi-noach May 24, 2021
ba772b6
add strictSnapshot bool, by which rowstreamer decides whether to run …
shlomi-noach May 24, 2021
c085c83
remove unused debug code
shlomi-noach May 24, 2021
ea21e74
reverting changes to vcopier; there's actually no logic changes neede…
shlomi-noach May 24, 2021
219cfd2
read rs.strictSnapshot from query comment directives
shlomi-noach May 24, 2021
f06f541
table_plan_builder injects strictSnapshot in query comment
shlomi-noach May 24, 2021
1188034
Merge branch 'main' into vreplication-shorter-snapshots
shlomi-noach Jun 5, 2021
9898bad
merge main, resolve conflict
shlomi-noach Jun 30, 2021
cca0d9b
remove redundant function argument
shlomi-noach Jul 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sqlparser

import (
"fmt"
"strconv"
"strings"
"unicode"
Expand Down Expand Up @@ -277,6 +278,17 @@ func (d CommentDirectives) IsSet(key string) bool {
return false
}

// IsTrue checks the directive map for the named directive and returns
// true if the directive is set and has a true value, which can be:
// - true boolean
// - 1 int
// - a string that evaluates as true
func (d CommentDirectives) IsTrue(key string) bool {
value := fmt.Sprintf("%v", d[key])
isTrue, _ := strconv.ParseBool(value)
return isTrue
}

// SkipQueryPlanCacheDirective returns true if skip query plan cache directive is set to true in query.
func SkipQueryPlanCacheDirective(stmt Statement) bool {
switch stmt := stmt.(type) {
Expand Down
48 changes: 43 additions & 5 deletions go/vt/sqlparser/comments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ func TestExtractCommentDirectives(t *testing.T) {
}
})
}
}

func TestIsSet(t *testing.T) {
d := CommentDirectives{
"ONE_OPT": true,
"TWO_OPT": false,
Expand All @@ -369,28 +371,64 @@ func TestExtractCommentDirectives(t *testing.T) {
if !d.IsSet("ONE_OPT") {
t.Errorf("d.IsSet(ONE_OPT) should be true")
}

if d.IsSet("TWO_OPT") {
t.Errorf("d.IsSet(TWO_OPT) should be false")
}

if !d.IsSet("three") {
t.Errorf("d.IsSet(three) should be true")
}

if d.IsSet("four") {
t.Errorf("d.IsSet(four) should be false")
}

if d.IsSet("five") {
t.Errorf("d.IsSet(five) should be false")
}

if d.IsSet("six") {
t.Errorf("d.IsSet(six) should be false")
}
}

func TestIsTrue(t *testing.T) {
d := CommentDirectives{
"ONE_OPT": true,
"TWO_OPT": false,
"three": 1,
"four": 2,
"five": 0,
"six": "true",
"seven": "1",
"eight": "0",
}

if !d.IsTrue("ONE_OPT") {
t.Errorf("d.IsSet(ONE_OPT) should be true")
}
if d.IsTrue("TWO_OPT") {
t.Errorf("d.IsSet(TWO_OPT) should be false")
}
if !d.IsTrue("three") {
t.Errorf("d.IsSet(three) should be true")
}
if d.IsTrue("four") {
t.Errorf("d.IsSet(four) should be false")
}
if d.IsTrue("five") {
t.Errorf("d.IsSet(five) should be false")
}
if !d.IsTrue("six") {
t.Errorf("d.IsSet(six) should be true")
}
if !d.IsTrue("seven") {
t.Errorf("d.IsSet(seven) should be true")
}
if d.IsTrue("eight") {
t.Errorf("d.IsSet(eight) should be false")
}
if d.IsTrue("no-such-key") {
t.Errorf("d.IsSet(no-such-key) should be false")
}
}

func TestSkipQueryPlanCacheDirective(t *testing.T) {
stmt, _ := Parse("insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)")
if !SkipQueryPlanCacheDirective(stmt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestExternalConnectorCopy(t *testing.T) {

expectDBClientAndVreplicationQueries(t, []string{
"begin",
"insert into tab1(id,val) values (1,'a'), (2,'b')",
"* into tab1(id,val) values (1,'a'), (2,'b')",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* is a new "magic" prefix (like / is a magic prefix) that indicates "this is expected to be a substring in the actual result"

"/update _vt.copy_state",
"commit",
"/delete from _vt.copy_state",
Expand All @@ -72,7 +72,7 @@ func TestExternalConnectorCopy(t *testing.T) {
execStatements(t, []string{"insert into tab1 values(3, 'c')"})
expectDBClientQueries(t, []string{
"begin",
"insert into tab1(id,val) values (3,'c')",
"* into tab1(id,val) values (3,'c')",
"/update _vt.vreplication set pos=",
"commit",
})
Expand Down Expand Up @@ -162,8 +162,8 @@ func TestExternalConnectorPlay(t *testing.T) {

expectDBClientAndVreplicationQueries(t, []string{
"begin",
"insert into tab1(id,val) values (1,'a')",
"insert into tab1(id,val) values (2,'b')",
"* into tab1(id,val) values (1,'a')",
"* into tab1(id,val) values (2,'b')",
"/update _vt.vreplication set pos=",
"commit",
}, pos)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ func expectDBClientQueries(t *testing.T, queries []string, skippableOnce ...stri
}
return result
}
if query[0] == '*' {
return strings.Contains(got, query[1:])
}
return (got == query)
}
for i, query := range queries {
Expand Down Expand Up @@ -582,6 +585,8 @@ func expectNontxQueries(t *testing.T, queries []string) {
panic(err)
}
match = result
} else if query[0] == '*' {
match = strings.Contains(got, query[1:])
} else {
match = (got == query)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,17 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[st
},
})
}
sendRule.Filter = sqlparser.String(tpb.sendSelect)

strictSnapshot := false
switch tpb.onInsert {
case insertOnDup, insertIgnore:
strictSnapshot = true
}
comments := sqlparser.Comments{
fmt.Sprintf(`/*vt+ strictSnapshot=%v */`, strictSnapshot),
}
tpb.sendSelect.Comments = comments
sendRule.Filter = sqlparser.String(tpb.sendSelect)
tablePlan := tpb.generate()
tablePlan.SendRule = sendRule
tablePlan.EnumValuesMap = enumValuesMap
Expand Down Expand Up @@ -568,6 +577,9 @@ func (tpb *tablePlanBuilder) generateInsertStatement() *sqlparser.ParsedQuery {
func (tpb *tablePlanBuilder) generateInsertPart(buf *sqlparser.TrackedBuffer) *sqlparser.ParsedQuery {
if tpb.onInsert == insertIgnore {
buf.Myprintf("insert ignore into %v(", tpb.name)
} else if tpb.onInsert == insertNormal {
// the condition (tpb.onInsert == insertNormal) is true when there is no GROUP BY
buf.Myprintf("replace into %v(", tpb.name)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still WIP, replace into is not valid in:

  • Materialize, where GROUP BY is found
  • Online DDL, where a UNIQUE KEY is added

} else {
buf.Myprintf("insert into %v(", tpb.name)
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
// copyNext also builds the copyState metadata that contains the tables and their last
// primary key that was copied. A nil Result means that nothing has been copied.
// A table that was fully copied is removed from copyState.
func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSettings) error {
func (vc *vcopier) copyNext(ctx context.Context) error {
qr, err := vc.vr.dbClient.Execute(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id=%d", vc.vr.id))
if err != nil {
return err
Expand Down Expand Up @@ -324,6 +324,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
if err != nil {
return err
}
// Getting here means we are certain we are done with the copy, and we delete copy_state
log.Infof("Copy of %v finished at lastpk: %v", tableName, bv)
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("delete from _vt.copy_state where vrepl_id=%s and table_name=%s", strconv.Itoa(int(vc.vr.id)), encodeString(tableName))
Expand Down
62 changes: 31 additions & 31 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ func TestPlayerCopyCharPK(t *testing.T) {
"/update _vt.vreplication set message='Picked source tablet.*",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"insert into dst(idc,val) values ('a\\0',1)",
"* into dst(idc,val) values ('a\\0',1)",
`/update _vt.copy_state set lastpk='fields:{name:\\"idc\\" type:BINARY} rows:{lengths:2 values:\\"a\\\\x00\\"}' where vrepl_id=.*`,
`update dst set val=3 where idc='a\0' and ('a\0') <= ('a\0')`,
"insert into dst(idc,val) values ('c\\0',2)",
"* into dst(idc,val) values ('c\\0',2)",
`/update _vt.copy_state set lastpk='fields:{name:\\"idc\\" type:BINARY} rows:{lengths:2 values:\\"c\\\\x00\\"}' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst",
"/update _vt.vreplication set state='Running'",
Expand Down Expand Up @@ -218,12 +218,12 @@ func TestPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
"/update _vt.vreplication set message='Picked source tablet.*",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"insert into dst(idc,val) values ('a',1)",
"* into dst(idc,val) values ('a',1)",
`/update _vt.copy_state set lastpk='fields:{name:\\"idc\\" type:VARCHAR} rows:{lengths:1 values:\\"a\\"}' where vrepl_id=.*`,
`/insert into dst\(idc,val\) select 'B', 3 from dual where \( .* 'B' COLLATE .* \) <= \( .* 'a' COLLATE .* \)`,
"insert into dst(idc,val) values ('B',3)",
`/replace into dst\(idc,val\) select 'B', 3 from dual where \( .* 'B' COLLATE .* \) <= \( .* 'a' COLLATE .* \)`,
"* into dst(idc,val) values ('B',3)",
`/update _vt.copy_state set lastpk='fields:{name:\\"idc\\" type:VARCHAR} rows:{lengths:1 values:\\"B\\"}' where vrepl_id=.*`,
"insert into dst(idc,val) values ('c',2)",
"* into dst(idc,val) values ('c',2)",
`/update _vt.copy_state set lastpk='fields:{name:\\"idc\\" type:VARCHAR} rows:{lengths:1 values:\\"c\\"}' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst",
"/update _vt.vreplication set state='Running'",
Expand Down Expand Up @@ -322,10 +322,10 @@ func TestPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) {
"/update _vt.vreplication set message='Picked source tablet.*",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"insert into dst(id,idc,idc2,val) values (1,'a','a',1)",
"* into dst(id,idc,idc2,val) values (1,'a','a',1)",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} fields:{name:\\"idc\\" type:VARBINARY} fields:{name:\\"idc2\\" type:VARBINARY} rows:{lengths:1 lengths:1 lengths:1 values:\\"1aa\\"}' where vrepl_id=.*`,
`insert into dst(id,idc,idc2,val) select 1, 'B', 'B', 3 from dual where (1,'B','B') <= (1,'a','a')`,
"insert into dst(id,idc,idc2,val) values (1,'c','c',2)",
`replace into dst(id,idc,idc2,val) select 1, 'B', 'B', 3 from dual where (1,'B','B') <= (1,'a','a')`,
"* into dst(id,idc,idc2,val) values (1,'c','c',2)",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} fields:{name:\\"idc\\" type:VARBINARY} fields:{name:\\"idc2\\" type:VARBINARY} rows:{lengths:1 lengths:1 lengths:1 values:\\"1cc\\"}' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst",
"/update _vt.vreplication set state='Running'",
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestPlayerCopyTablesWithFK(t *testing.T) {
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst1(id,id2) values (1,1), (2,2)",
"* into dst1(id,id2) values (1,1), (2,2)",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"2\\"}' where vrepl_id=.*`,
"commit",
// copy of dst1 is done: delete from copy_state.
Expand All @@ -407,7 +407,7 @@ func TestPlayerCopyTablesWithFK(t *testing.T) {
"commit",
// copy dst2
"begin",
"insert into dst2(id,id2) values (1,21), (2,22)",
"* into dst2(id,id2) values (1,21), (2,22)",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"2\\"}' where vrepl_id=.*`,
"commit",
// copy of dst1 is done: delete from copy_state.
Expand Down Expand Up @@ -500,7 +500,7 @@ func TestPlayerCopyTables(t *testing.T) {
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst1(id,val) values (1,'aaa'), (2,'bbb')",
"* into dst1(id,val) values (1,'aaa'), (2,'bbb')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"2\\"}' where vrepl_id=.*`,
"commit",
// copy of dst1 is done: delete from copy_state.
Expand Down Expand Up @@ -631,16 +631,16 @@ func TestPlayerCopyBigTable(t *testing.T) {
"/insert into _vt.copy_state",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set state='Copying'",
"insert into dst(id,val) values (1,'aaa')",
"* into dst(id,val) values (1,'aaa')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"1\\"}' where vrepl_id=.*`,
// The next catchup executes the new row insert, but will be a no-op.
"insert into dst(id,val) select 3, 'ccc' from dual where (3) <= (1)",
"* into dst(id,val) select 3, 'ccc' from dual where (3) <= (1)",
// fastForward has nothing to add. Just saves position.
// Second row gets copied.
"insert into dst(id,val) values (2,'bbb')",
"* into dst(id,val) values (2,'bbb')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"2\\"}' where vrepl_id=.*`,
// Third row copied without going back to catchup state.
"insert into dst(id,val) values (3,'ccc')",
"* into dst(id,val) values (3,'ccc')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"3\\"}' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst",
// Copy is done. Go into running state.
Expand Down Expand Up @@ -746,16 +746,16 @@ func TestPlayerCopyWildcardRule(t *testing.T) {
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// The first fast-forward has no starting point. So, it just saves the current position.
"insert into src(id,val) values (1,'aaa')",
"* into src(id,val) values (1,'aaa')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"1\\"}' where vrepl_id=.*`,
// The next catchup executes the new row insert, but will be a no-op.
"insert into src(id,val) select 3, 'ccc' from dual where (3) <= (1)",
"* into src(id,val) select 3, 'ccc' from dual where (3) <= (1)",
// fastForward has nothing to add. Just saves position.
// Second row gets copied.
"insert into src(id,val) values (2,'bbb')",
"* into src(id,val) values (2,'bbb')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"2\\"}' where vrepl_id=.*`,
// Third row copied without going back to catchup state.
"insert into src(id,val) values (3,'ccc')",
"* into src(id,val) values (3,'ccc')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"3\\"}' where vrepl_id=.*`,
"/delete from _vt.copy_state.*src",
// Copy is done. Go into running state.
Expand Down Expand Up @@ -883,23 +883,23 @@ func TestPlayerCopyTableContinuation(t *testing.T) {
expectNontxQueries(t, []string{
// Catchup
"/update _vt.vreplication set message='Picked source tablet.*",
"insert into dst1(id,val) select 1, 'insert in' from dual where (1,1) <= (6,6)",
"insert into dst1(id,val) select 7, 'insert out' from dual where (7,7) <= (6,6)",
"* into dst1(id,val) select 1, 'insert in' from dual where (1,1) <= (6,6)",
"* into dst1(id,val) select 7, 'insert out' from dual where (7,7) <= (6,6)",
"update dst1 set val='updated' where id=3 and (3,3) <= (6,6)",
"update dst1 set val='updated' where id=10 and (10,10) <= (6,6)",
"delete from dst1 where id=4 and (4,4) <= (6,6)",
"delete from dst1 where id=9 and (9,9) <= (6,6)",
"delete from dst1 where id=5 and (5,5) <= (6,6)",
"insert into dst1(id,val) select 5, 'move within' from dual where (5,10) <= (6,6)",
"* into dst1(id,val) select 5, 'move within' from dual where (5,10) <= (6,6)",
"delete from dst1 where id=6 and (6,6) <= (6,6)",
"insert into dst1(id,val) select 12, 'move out' from dual where (12,6) <= (6,6)",
"* into dst1(id,val) select 12, 'move out' from dual where (12,6) <= (6,6)",
"delete from dst1 where id=11 and (11,11) <= (6,6)",
"insert into dst1(id,val) select 4, 'move in' from dual where (4,11) <= (6,6)",
"* into dst1(id,val) select 4, 'move in' from dual where (4,11) <= (6,6)",
"update copied set val='bbb' where id=1",
// Fast-forward
"update dst1 set val='updated again' where id=3 and (3,3) <= (6,6)",
// Copy
"insert into dst1(id,val) values (7,'insert out'), (8,'no change'), (10,'updated'), (12,'move out')",
"* into dst1(id,val) values (7,'insert out'), (8,'no change'), (10,'updated'), (12,'move out')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id1\\" type:INT32} fields:{name:\\"id2\\" type:INT32} rows:{lengths:2 lengths:1 values:\\"126\\"}' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst1",
// Copy again. There should be no events for catchup.
Expand Down Expand Up @@ -995,9 +995,9 @@ func TestPlayerCopyWildcardTableContinuation(t *testing.T) {
"/insert into _vt.vreplication",
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
"insert into dst(id,val) select 4, 'new' from dual where (4) <= (2)",
"* into dst(id,val) select 4, 'new' from dual where (4) <= (2)",
// Copy
"insert into dst(id,val) values (3,'uncopied'), (4,'new')",
"* into dst(id,val) values (3,'uncopied'), (4,'new')",
`/update _vt.copy_state set lastpk.*`,
"/delete from _vt.copy_state.*dst",
"/update _vt.vreplication set state='Running'",
Expand Down Expand Up @@ -1082,7 +1082,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) {
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
// Copy
"insert into dst(id,val) values (3,'uncopied'), (4,'new')",
"* into dst(id,val) values (3,'uncopied'), (4,'new')",
`/update _vt.copy_state set lastpk.*`,
"/delete from _vt.copy_state.*dst",
"/update _vt.vreplication set state='Running'",
Expand Down Expand Up @@ -1188,7 +1188,7 @@ func TestPlayerCopyTablesStopAfterCopy(t *testing.T) {
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst1(id,val) values (1,'aaa'), (2,'bbb')",
"* into dst1(id,val) values (1,'aaa'), (2,'bbb')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"2\\"}' where vrepl_id=.*`,
"commit",
// copy of dst1 is done: delete from copy_state.
Expand Down Expand Up @@ -1267,7 +1267,7 @@ func TestPlayerCopyTableCancel(t *testing.T) {
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst1(id,val) values (1,'aaa'), (2,'bbb')",
"* into dst1(id,val) values (1,'aaa'), (2,'bbb')",
`/update _vt.copy_state set lastpk='fields:{name:\\"id\\" type:INT32} rows:{lengths:1 values:\\"2\\"}' where vrepl_id=.*`,
"commit",
// copy of dst1 is done: delete from copy_state.
Expand Down
Loading