Skip to content

Commit

Permalink
changefeedccl/cdctest: fixes fingerprint validator for random data types
Browse files Browse the repository at this point in the history
Previously, fingerprint validator parses json message and passes the parsed data
into argument $1 for UPSERT, INSERT. This usually works,
but gojson.Unmarshal does not work well with all data types. This patch changes
fingerprint validator to parse the data by referring to the table info for their
column types and parses it using datums. Note that this workaround doesn't support
user-created types enums due to its lack of OidToType utility.

Resolves: cockroachdb#134159
Release note: none
  • Loading branch information
wenyihu6 committed Jan 16, 2025
1 parent b103f98 commit c84c1f2
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 38 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descs",
"//pkg/sql/isql",
"//pkg/sql/rowenc",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/testutils/sqlutils",
"//pkg/util",
"//pkg/util/fsm",
Expand All @@ -36,6 +40,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_stretchr_testify//require",
],
Expand All @@ -56,13 +61,15 @@ go_test(
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//require",
],
)
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ func RunNemesis(
},
}

if nOp.EnableFpValidator {
// Fingerprint validator doesn't support user defined types.
ns.eventMix[eventCreateEnum{}] = 0
}

// Create the table and set up some initial splits.
if _, err := db.Exec(`CREATE TABLE foo (id INT PRIMARY KEY, ts STRING DEFAULT '0')`); err != nil {
return nil, err
Expand Down
176 changes: 138 additions & 38 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ package cdctest

import (
"bytes"
"context"
gosql "database/sql"
gojson "encoding/json"
"fmt"
"github.com/lib/pq/oid"
"sort"
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -510,41 +517,136 @@ func (v *FingerprintValidator) NoteRow(
return nil
}

func (v *FingerprintValidator) fetchTableColTypes(
tableName string, updated hlc.Timestamp,
) (map[string]*types.T, error) {
parts := strings.Split(tableName, ".")
var table string
switch len(parts) {
case 1:
table = parts[0]
case 2:
_ = parts[0] + "."
table = parts[1]
default:
return nil, errors.Errorf("could not parse table %s", parts)
}

colToType := make(map[string]*types.T)
if err := v.sqlDBFunc(func(db *gosql.DB) error {
var rows *gosql.Rows
queryStr := fmt.Sprintf(`SELECT a.attname AS column_name, t.oid AS type_oid, t.typname AS type_name
FROM pg_attribute a JOIN pg_type t ON a.atttypid = t.oid AS OF SYSTEM TIME '%s'
WHERE a.attrelid = $1::regclass AND a.attnum > 0`, updated.AsOfSystemTime())
rows, err := db.Query(queryStr, table)
if err != nil {
return err
}
defer func() { _ = rows.Close() }()
type result struct {
keyColumn string
oid string
typeName string
}

var results []result
for rows.Next() {
var keyColumn, oidStr, typeName string
if err := rows.Scan(&keyColumn, &oidStr, &typeName); err != nil {
return err
}
if err := rows.Err(); err != nil {
return err
}
results = append(results, result{
keyColumn: keyColumn,
oid: oidStr,
typeName: typeName,
})
oidNum, err := strconv.Atoi(oidStr)
if err != nil {
return err
}
colToType[keyColumn] = types.OidToType[oid.Oid(oidNum)]
}
if len(results) == 0 {
return errors.Errorf("no columns found for table %s", table)
}
return nil
}); err != nil {
return nil, err
}
return colToType, nil
}

// applyRowUpdate applies the update represented by `row` to the scratch table.
func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
defer func() {
_err = errors.Wrap(_err, "FingerprintValidator failed")
}()

var args []interface{}
var primaryKeyDatums []interface{}
if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil {
keyJSON, err := json.ParseJSON(row.key)
if err != nil {
return err
}
if len(primaryKeyDatums) != len(v.primaryKeyCols) {
return errors.Errorf(`expected primary key columns %s got datums %s`,
v.primaryKeyCols, primaryKeyDatums)
keyJSONAsArray, notArray := keyJSON.AsArray()
if !notArray || len(keyJSONAsArray) != len(v.primaryKeyCols) {
return errors.Errorf(
`notArray: %t expected primary key columns %s got datums %s`,
notArray, v.primaryKeyCols, keyJSONAsArray)
}

var stmtBuf bytes.Buffer
type wrapper struct {
After map[string]interface{} `json:"after"`
valueJSON, err := json.ParseJSON(row.value)
if err != nil {
return err
}
var value wrapper
if err := gojson.Unmarshal([]byte(row.value), &value); err != nil {
afterJSON, err := valueJSON.FetchValKey("after")
if err != nil {
return err
}
if value.After != nil {

if afterJSON != nil && afterJSON.Type() != json.NullJSONType {
// UPDATE or INSERT
fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable)
for col, colValue := range value.After {
iter, err := afterJSON.ObjectIter()
if err != nil {
return err
}
colNames := make([]string, 0)
for iter.Next() {
colNames = append(colNames, iter.Key())
}

typeofCol, err := v.fetchTableColTypes(v.origTable, row.updated)
for _, colValue := range colNames {
if len(args) != 0 {
stmtBuf.WriteString(`,`)
}
stmtBuf.WriteString(col)
args = append(args, colValue)
stmtBuf.WriteString(colValue)
if err != nil {
return err
}
colType, exists := typeofCol[colValue]
if !exists {
return errors.Errorf("column %s not found in table %s", colValue, v.origTable)
}
jsonValue, err := afterJSON.FetchValKey(colValue)
if err != nil {
return err
}
evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
str, _ := jsonValue.AsText()
if str != nil {
datum, _ := rowenc.ParseDatumStringAs(context.Background(), colType, *str, evalCtx, nil)
args = append(args, datum)
} else {
args = append(args, nil)
}
}
for i := len(value.After) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ {

for i := len(colNames) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ {
fmt.Fprintf(&stmtBuf, `, test%d`, i)
args = append(args, nil)
}
Expand All @@ -557,41 +659,39 @@ func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
}
stmtBuf.WriteString(`)`)

// Also verify that the key matches the value.
primaryKeyDatums = make([]interface{}, len(v.primaryKeyCols))
for idx, primaryKeyCol := range v.primaryKeyCols {
primaryKeyDatums[idx] = value.After[primaryKeyCol]
}
primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums)
if err != nil {
return err
}

rowKey := row.key
if len(primaryKeyDatums) > 1 {
// format the key using the Go marshaller; otherwise, differences
// in formatting could lead to the comparison below failing
rowKey = asGoJSON(row.key)
}
if string(primaryKeyJSON) != rowKey {
v.failures = append(v.failures,
fmt.Sprintf(`key %s did not match expected key %s for value %s`,
rowKey, primaryKeyJSON, row.value))
}
//for _, primaryKeyCol := range v.primaryKeyCols {
// jsonValue, err := afterJSON.FetchValKey(primaryKeyCol)
// if err != nil {
// return err
// }
// if jsonValue.Type() == json.NullJSONType {
// continue
// }
// str, err := jsonValue.AsText()
// if err != nil {
// return err
// }
// if str == nil || !strings.Contains(row.key, *str) {
// v.failures = append(v.failures,
// fmt.Sprintf(`key %v did not match expected key %s for value %s`, *str, row.key, row.value))
// }
//}
} else {
// DELETE
fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable)
for i, datum := range primaryKeyDatums {
for i, datum := range keyJSONAsArray {
if len(args) != 0 {
stmtBuf.WriteString(` AND `)
}
fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1)
args = append(args, datum)
fmt.Fprintf(&stmtBuf, `to_json(%s)::text = $%d`, v.primaryKeyCols[i], i+1)
args = append(args, datum.String())
}
}

return v.sqlDBFunc(func(db *gosql.DB) error {
fmt.Println("does not support>??")
_, err := db.Exec(stmtBuf.String(), args...)
fmt.Println("err here: ", err, args)
return err
})
}
Expand Down

0 comments on commit c84c1f2

Please sign in to comment.