diff --git a/spanner/spansql/parser.go b/spanner/spansql/parser.go index 10dd9b646069..2de6334bc47e 100644 --- a/spanner/spansql/parser.go +++ b/spanner/spansql/parser.go @@ -2081,39 +2081,15 @@ func (p *parser) parseCreateChangeStream() (*CreateChangeStream, *parseError) { return nil, err } - if err := p.expect("FOR"); err != nil { - return nil, err - } - cs := &CreateChangeStream{Name: csname, Position: pos} - if p.eat("ALL") { - cs.WatchAllTables = true - } else { - for { - tname, err := p.parseTableOrIndexOrColumnName() - if err != nil { - return nil, err - } - pos := p.Pos() - wd := WatchDef{Table: tname, Position: pos} - - if p.sniff("(") { - columns, err := p.parseColumnNameList() - if err != nil { - return nil, err - } - wd.Columns = columns - } else { - wd.WatchAllCols = true - } - - cs.Watch = append(cs.Watch, wd) - if p.eat(",") { - continue - } - break + if p.sniff("FOR") { + watch, watchAllTables, err := p.parseChangeStreamWatches() + if err != nil { + return nil, err } + cs.Watch = watch + cs.WatchAllTables = watchAllTables } if p.sniff("OPTIONS") { @@ -2145,19 +2121,79 @@ func (p *parser) parseAlterChangeStream() (*AlterChangeStream, *parseError) { } acs := &AlterChangeStream{Name: csname, Position: pos} - if err := p.expect("SET"); err != nil { - return nil, err + + tok := p.next() + if tok.err != nil { + return nil, tok.err } - // TODO: Support for altering watch - if p.sniff("OPTIONS") { - options, err := p.parseChangeStreamOptions() - if err != nil { + switch { + default: + return nil, p.errorf("got %q, expected SET or DROP", tok.value) + case tok.caseEqual("SET"): + if p.sniff("OPTIONS") { + options, err := p.parseChangeStreamOptions() + if err != nil { + return nil, err + } + acs.Alteration = AlterChangeStreamOptions{Options: options} + return acs, nil + } + if p.sniff("FOR") { + watch, watchAllTables, err := p.parseChangeStreamWatches() + if err != nil { + return nil, err + } + acs.Alteration = AlterWatch{Watch: watch, WatchAllTables: watchAllTables} + return acs, nil + } + return nil, p.errorf("got %q, expected FOR or OPTIONS", p.next()) + case tok.caseEqual("DROP"): + if err := p.expect("FOR", "ALL"); err != nil { return nil, err } - acs.Alteration = AlterChangeStreamOptions{Options: options} + acs.Alteration = DropChangeStreamWatch{} return acs, nil } - return nil, p.errorf("got %q, expected OPTIONS", p.next()) +} + +func (p *parser) parseChangeStreamWatches() ([]WatchDef, bool, *parseError) { + debugf("parseChangeStreamWatches: %v", p) + + if err := p.expect("FOR"); err != nil { + return nil, false, err + } + + if p.eat("ALL") { + return nil, true, nil + } + + watchDefs := []WatchDef{} + for { + tname, err := p.parseTableOrIndexOrColumnName() + if err != nil { + return nil, false, err + } + pos := p.Pos() + wd := WatchDef{Table: tname, Position: pos} + + if p.sniff("(") { + columns, err := p.parseColumnNameList() + if err != nil { + return nil, false, err + } + wd.Columns = columns + } else { + wd.WatchAllCols = true + } + + watchDefs = append(watchDefs, wd) + if p.eat(",") { + continue + } + break + } + + return watchDefs, false, nil } func (p *parser) parseChangeStreamOptions() (ChangeStreamOptions, *parseError) { diff --git a/spanner/spansql/parser_test.go b/spanner/spansql/parser_test.go index c2d90c81db3d..848d5e83f049 100644 --- a/spanner/spansql/parser_test.go +++ b/spanner/spansql/parser_test.go @@ -1164,6 +1164,106 @@ func TestParseDDL(t *testing.T) { }, }, }, + { + `CREATE CHANGE STREAM csname; + CREATE CHANGE STREAM csname FOR ALL; + CREATE CHANGE STREAM csname FOR tname, tname2(cname); + CREATE CHANGE STREAM csname FOR ALL OPTIONS (retention_period = '36h', value_capture_type = 'NEW_VALUES');`, + &DDL{ + Filename: "filename", + List: []DDLStmt{ + &CreateChangeStream{ + Name: "csname", + Position: line(1), + }, + &CreateChangeStream{ + Name: "csname", + WatchAllTables: true, + Position: line(2), + }, + &CreateChangeStream{ + Name: "csname", + Watch: []WatchDef{ + {Table: "tname", WatchAllCols: true, Position: line(3)}, + {Table: "tname2", Columns: []ID{ID("cname")}, Position: line(3)}, + }, + Position: line(3), + }, + &CreateChangeStream{ + Name: "csname", + WatchAllTables: true, + Position: line(4), + Options: ChangeStreamOptions{ + RetentionPeriod: func(b string) *string { return &b }("36h"), + ValueCaptureType: func(b string) *string { return &b }("NEW_VALUES"), + }, + }, + }, + }, + }, + { + `ALTER CHANGE STREAM csname SET FOR ALL; + ALTER CHANGE STREAM csname SET FOR tname, tname2(cname); + ALTER CHANGE STREAM csname DROP FOR ALL; + ALTER CHANGE STREAM csname SET OPTIONS (retention_period = '36h', value_capture_type = 'NEW_VALUES');`, + &DDL{ + Filename: "filename", + List: []DDLStmt{ + &AlterChangeStream{ + Name: "csname", + Alteration: AlterWatch{ + WatchAllTables: true, + }, + Position: line(1), + }, + &AlterChangeStream{ + Name: "csname", + Alteration: AlterWatch{ + Watch: []WatchDef{ + { + Table: "tname", + WatchAllCols: true, + Position: Position{Line: 2, Offset: 78}, + }, + { + Table: "tname2", + Columns: []ID{"cname"}, + Position: Position{Line: 2, Offset: 85}, + }, + }, + }, + Position: line(2), + }, + &AlterChangeStream{ + Name: "csname", + Alteration: DropChangeStreamWatch{}, + Position: line(3), + }, + &AlterChangeStream{ + Name: "csname", + Alteration: AlterChangeStreamOptions{ + Options: ChangeStreamOptions{ + RetentionPeriod: func(b string) *string { return &b }("36h"), + ValueCaptureType: func(b string) *string { return &b }("NEW_VALUES"), + }, + }, + Position: line(4), + }, + }, + }, + }, + { + `DROP CHANGE STREAM csname`, + &DDL{ + Filename: "filename", + List: []DDLStmt{ + &DropChangeStream{ + Name: "csname", + Position: line(1), + }, + }, + }, + }, } for _, test := range tests { got, err := ParseDDL("filename", test.in) diff --git a/spanner/spansql/sql.go b/spanner/spansql/sql.go index 7cb84abd53af..d3a2b7484784 100644 --- a/spanner/spansql/sql.go +++ b/spanner/spansql/sql.go @@ -106,17 +106,7 @@ func (cs CreateChangeStream) SQL() string { if i > 0 { str += ", " } - str += table.Table.SQL() - if !table.WatchAllCols { - str += "(" - for i, c := range table.Columns { - if i > 0 { - str += ", " - } - str += c.SQL() - } - str += ")" - } + str += table.SQL() } } if cs.Options != (ChangeStreamOptions{}) { @@ -126,6 +116,21 @@ func (cs CreateChangeStream) SQL() string { return str } +func (w WatchDef) SQL() string { + str := w.Table.SQL() + if !w.WatchAllCols { + str += "(" + for i, c := range w.Columns { + if i > 0 { + str += ", " + } + str += c.SQL() + } + str += ")" + } + return str +} + func (dt DropTable) SQL() string { return "DROP TABLE " + dt.Name.SQL() } @@ -143,11 +148,29 @@ func (dc DropChangeStream) SQL() string { } func (acs AlterChangeStream) SQL() string { - return "ALTER CHANGE STREAM " + acs.Name.SQL() + " SET " + acs.Alteration.SQL() + return "ALTER CHANGE STREAM " + acs.Name.SQL() + " " + acs.Alteration.SQL() +} + +func (scsw AlterWatch) SQL() string { + str := "SET FOR " + if scsw.WatchAllTables { + return str + "ALL" + } + for i, table := range scsw.Watch { + if i > 0 { + str += ", " + } + str += table.SQL() + } + return str } func (ao AlterChangeStreamOptions) SQL() string { - return ao.Options.SQL() + return "SET " + ao.Options.SQL() +} + +func (dcsw DropChangeStreamWatch) SQL() string { + return "DROP FOR ALL" } func (cso ChangeStreamOptions) SQL() string { diff --git a/spanner/spansql/sql_test.go b/spanner/spansql/sql_test.go index 0c36405d29a6..3e24352e89fb 100644 --- a/spanner/spansql/sql_test.go +++ b/spanner/spansql/sql_test.go @@ -415,6 +415,26 @@ func TestSQL(t *testing.T) { "ALTER DATABASE dbname SET OPTIONS (optimizer_version=null, optimizer_statistics_package=null, version_retention_period=null, enable_key_visualizer=null, default_leader=null)", reparseDDL, }, + { + &CreateChangeStream{ + Name: "csname", + Watch: []WatchDef{ + {Table: "Ta", WatchAllCols: true, Position: line(1)}, + {Table: "Tsub", Columns: []ID{ID("Hash")}, Position: line(1)}, + }, + Position: line(1), + }, + "CREATE CHANGE STREAM csname FOR Ta, Tsub(`Hash`)", + reparseDDL, + }, + { + &DropChangeStream{ + Name: "csname", + Position: line(1), + }, + "DROP CHANGE STREAM csname", + reparseDDL, + }, { &CreateChangeStream{ Name: "csname", @@ -440,17 +460,52 @@ func TestSQL(t *testing.T) { "CREATE CHANGE STREAM csname FOR ALL OPTIONS (retention_period='7d', value_capture_type='NEW_VALUES')", reparseDDL, }, + { + &AlterChangeStream{ + Name: "csname", + Alteration: AlterWatch{ + WatchAllTables: true, + }, + Position: line(1), + }, + "ALTER CHANGE STREAM csname SET FOR ALL", + reparseDDL, + }, + { + &AlterChangeStream{ + Name: "csname", + Alteration: AlterWatch{ + Watch: []WatchDef{ + {Table: "Ta", WatchAllCols: true, Position: Position{Line: 1, Offset: 35}}, + {Table: "Tsub", Columns: []ID{ID("Hash")}, Position: Position{Line: 1, Offset: 39}}, + }, + }, + Position: line(1), + }, + "ALTER CHANGE STREAM csname SET FOR Ta, Tsub(`Hash`)", + reparseDDL, + }, { &AlterChangeStream{ Name: "csname", Alteration: AlterChangeStreamOptions{ Options: ChangeStreamOptions{ + RetentionPeriod: func(s string) *string { return &s }("7d"), ValueCaptureType: func(s string) *string { return &s }("NEW_VALUES"), }, }, Position: line(1), }, - "ALTER CHANGE STREAM csname SET OPTIONS (value_capture_type='NEW_VALUES')", + "ALTER CHANGE STREAM csname SET OPTIONS (retention_period='7d', value_capture_type='NEW_VALUES')", + reparseDDL, + }, + { + &AlterChangeStream{ + Name: "csname", + Alteration: DropChangeStreamWatch{}, + Position: line(1), + }, + "ALTER CHANGE STREAM csname DROP FOR ALL", reparseDDL, }, { diff --git a/spanner/spansql/types.go b/spanner/spansql/types.go index 8db0541d045a..89f818dea2ad 100644 --- a/spanner/spansql/types.go +++ b/spanner/spansql/types.go @@ -1084,10 +1084,15 @@ type ChangeStreamAlteration interface { } func (AlterWatch) isChangeStreamAlteration() {} +func (DropChangeStreamWatch) isChangeStreamAlteration() {} func (AlterChangeStreamOptions) isChangeStreamAlteration() {} type ( - AlterWatch struct{ Watch []WatchDef } + AlterWatch struct { + WatchAllTables bool + Watch []WatchDef + } + DropChangeStreamWatch struct{} AlterChangeStreamOptions struct{ Options ChangeStreamOptions } )