Skip to content

Commit

Permalink
changefeedccl/cdctest: fixes fingerprint validator to work with all d…
Browse files Browse the repository at this point in the history
…ata types

Previously, the fingerprint validator parsed the JSON message and passed the
parsed data in as statement arguments (e.g. $1) for UPSERT, INSERT. This usually
worked, but gojson.Unmarshal does not work well with all data types. This patch
changes the fingerprint validator to look up the column types from the table
info and to parse the data into datums of those types.

Resolves: cockroachdb#134159
Release note: none
  • Loading branch information
wenyihu6 committed Jan 15, 2025
1 parent b103f98 commit 47a979e
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 61 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descs",
"//pkg/sql/isql",
"//pkg/sql/rowenc",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/testutils/sqlutils",
"//pkg/util",
"//pkg/util/fsm",
Expand All @@ -36,6 +40,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_stretchr_testify//require",
],
Expand All @@ -56,13 +61,15 @@ go_test(
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//require",
],
)
5 changes: 1 addition & 4 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ type NemesesOption struct {
var NemesesOptions = []NemesesOption{
{
EnableFpValidator: true,
EnableSQLSmith: false,
},
{
EnableFpValidator: false,
EnableSQLSmith: true,
},
}
Expand Down Expand Up @@ -194,6 +190,7 @@ func RunNemesis(
},
}

ns.eventMix[eventCreateEnum{}] = 0
// Create the table and set up some initial splits.
if _, err := db.Exec(`CREATE TABLE foo (id INT PRIMARY KEY, ts STRING DEFAULT '0')`); err != nil {
return nil, err
Expand Down
165 changes: 125 additions & 40 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@ package cdctest

import (
"bytes"
"context"
gosql "database/sql"
gojson "encoding/json"
"fmt"
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
"sort"
"strconv"
"strings"
)

// Validator checks for violations of our changefeed ordering and delivery
Expand Down Expand Up @@ -510,41 +516,133 @@ func (v *FingerprintValidator) NoteRow(
return nil
}

// fetchTableColTypes returns a map from column name to column type for the
// given table, read as of the timestamp `updated`.
//
// tableName may be a bare table name ("foo") or a name with a single qualifier
// ("db.foo"); in the latter case only the last part is used for the lookup.
// Names with more than two dot-separated parts are rejected.
//
// Types are resolved by joining pg_attribute with pg_type for the table and
// mapping each type OID through types.OidToType. An error is returned if the
// query or row iteration fails, if an OID cannot be parsed, or if no columns
// are found for the table.
func (v *FingerprintValidator) fetchTableColTypes(
	tableName string, updated hlc.Timestamp,
) (map[string]*types.T, error) {
	parts := strings.Split(tableName, ".")
	var table string
	switch len(parts) {
	case 1:
		table = parts[0]
	case 2:
		// The qualifier in parts[0] is not used for the regclass lookup.
		table = parts[1]
	default:
		return nil, errors.Errorf("could not parse table %s", tableName)
	}

	colToType := make(map[string]*types.T)
	if err := v.sqlDBFunc(func(db *gosql.DB) error {
		// The timestamp cannot be passed as a placeholder, so it is formatted
		// into the statement; the table name is still passed as $1.
		queryStr := fmt.Sprintf(`SELECT a.attname AS column_name, t.oid AS type_oid, t.typname AS type_name
FROM pg_attribute a JOIN pg_type t ON a.atttypid = t.oid AS OF SYSTEM TIME '%s'
WHERE a.attrelid = $1::regclass AND a.attnum > 0`, updated.AsOfSystemTime())
		rows, err := db.Query(queryStr, table)
		if err != nil {
			return err
		}
		defer func() { _ = rows.Close() }()

		// Count rows locally instead of inspecting colToType so the emptiness
		// check only reflects this invocation of the closure.
		numCols := 0
		for rows.Next() {
			// typeName is selected by the query but only the OID is needed to
			// resolve the type.
			var colName, oidStr, typeName string
			if err := rows.Scan(&colName, &oidStr, &typeName); err != nil {
				return err
			}
			// OIDs are unsigned 32-bit values.
			oidNum, err := strconv.ParseUint(oidStr, 10, 32)
			if err != nil {
				return err
			}
			// NOTE(review): if types.OidToType has no entry for this OID
			// (presumably e.g. user-defined types), the stored type is nil;
			// confirm callers handle that.
			colToType[colName] = types.OidToType[oid.Oid(oidNum)]
			numCols++
		}
		// rows.Next returns false both at the end of the result set and on
		// error, so the iteration error must be checked after the loop.
		if err := rows.Err(); err != nil {
			return err
		}
		if numCols == 0 {
			return errors.Errorf("no columns found for table %s", table)
		}
		return nil
	}); err != nil {
		return nil, err
	}
	return colToType, nil
}

// applyRowUpdate applies the update represented by `row` to the scratch table.
func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
defer func() {
_err = errors.Wrap(_err, "FingerprintValidator failed")
}()

var args []interface{}
var primaryKeyDatums []interface{}
if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil {
keyJSON, err := json.ParseJSON(row.key)
if err != nil {
return err
}
if len(primaryKeyDatums) != len(v.primaryKeyCols) {
return errors.Errorf(`expected primary key columns %s got datums %s`,
v.primaryKeyCols, primaryKeyDatums)
keyJSONAsArray, notArray := keyJSON.AsArray()
if !notArray || len(keyJSONAsArray) != len(v.primaryKeyCols) {
return errors.Errorf(
`notArray: %t expected primary key columns %s got datums %s`,
notArray, v.primaryKeyCols, keyJSONAsArray)
}

var stmtBuf bytes.Buffer
type wrapper struct {
After map[string]interface{} `json:"after"`
valueJSON, err := json.ParseJSON(row.value)
if err != nil {
return err
}
var value wrapper
if err := gojson.Unmarshal([]byte(row.value), &value); err != nil {
afterJSON, err := valueJSON.FetchValKey("after")
if err != nil {
return err
}
if value.After != nil {

if afterJSON != nil && afterJSON.Type() != json.NullJSONType {
// UPDATE or INSERT
fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable)
for col, colValue := range value.After {
iter, err := afterJSON.ObjectIter()
if err != nil {
return err
}
colNames := make([]string, 0)
for iter.Next() {
colNames = append(colNames, iter.Key())
}

typeofCol, err := v.fetchTableColTypes(v.origTable, row.updated)
for _, colValue := range colNames {
if len(args) != 0 {
stmtBuf.WriteString(`,`)
}
stmtBuf.WriteString(col)
args = append(args, colValue)
stmtBuf.WriteString(colValue)
if err != nil {
return err
}
colType := typeofCol[colValue]
jsonValue, err := afterJSON.FetchValKey(colValue)
if err != nil {
return err
}
evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
str, _ := jsonValue.AsText()
if str != nil {
datum, _ := rowenc.ParseDatumStringAs(context.Background(), colType, *str, evalCtx, nil)
args = append(args, datum)
} else {
args = append(args, nil)
}
}
for i := len(value.After) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ {

for i := len(colNames) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ {
fmt.Fprintf(&stmtBuf, `, test%d`, i)
args = append(args, nil)
}
Expand All @@ -557,36 +655,23 @@ func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
}
stmtBuf.WriteString(`)`)

// Also verify that the key matches the value.
primaryKeyDatums = make([]interface{}, len(v.primaryKeyCols))
for idx, primaryKeyCol := range v.primaryKeyCols {
primaryKeyDatums[idx] = value.After[primaryKeyCol]
}
primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums)
if err != nil {
return err
}

rowKey := row.key
if len(primaryKeyDatums) > 1 {
// format the key using the Go marshaller; otherwise, differences
// in formatting could lead to the comparison below failing
rowKey = asGoJSON(row.key)
}
if string(primaryKeyJSON) != rowKey {
v.failures = append(v.failures,
fmt.Sprintf(`key %s did not match expected key %s for value %s`,
rowKey, primaryKeyJSON, row.value))
for _, primaryKeyCol := range v.primaryKeyCols {
jsonValue, _ := afterJSON.FetchValKey(primaryKeyCol)
str, _ := jsonValue.AsText()
if !strings.Contains(row.key, *str) {
v.failures = append(v.failures,
fmt.Sprintf(`key %v did not match expected key %s for value %s`, *str, row.key, row.value))
}
}
} else {
// DELETE
fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable)
for i, datum := range primaryKeyDatums {
for i, datum := range keyJSONAsArray {
if len(args) != 0 {
stmtBuf.WriteString(` AND `)
}
fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1)
args = append(args, datum)
fmt.Fprintf(&stmtBuf, `to_json(%s)::text = $%d`, v.primaryKeyCols[i], i+1)
args = append(args, datum.String())
}
}

Expand Down
Loading

0 comments on commit 47a979e

Please sign in to comment.