Skip to content

Commit

Permalink
cdctest: fix fingerprint validator for most types
Browse files Browse the repository at this point in the history
Previously, the fingerprint validator parsed each JSON message with
gojson.Unmarshal and passed the decoded values directly as the placeholder
arguments ($1, ...) of the UPSERT or INSERT statement. This usually works,
but gojson.Unmarshal does not handle every data type well. This patch changes
the fingerprint validator to look up each column's type from the table info
and parse the value into a datum of that type. Note that this workaround does
not support user-defined types such as enums, because there is no OidToType
mapping for them.

Resolves: #134159
Release note: none
  • Loading branch information
wenyihu6 committed Jan 16, 2025
1 parent b103f98 commit 3502e6f
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 17 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descs",
"//pkg/sql/isql",
"//pkg/sql/rowenc",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/testutils/sqlutils",
"//pkg/util",
"//pkg/util/fsm",
Expand All @@ -36,6 +40,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_stretchr_testify//require",
],
Expand All @@ -56,13 +61,15 @@ go_test(
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//require",
],
)
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ func RunNemesis(
},
}

if nOp.EnableFpValidator {
// Fingerprint validator doesn't support user defined types.
ns.eventMix[eventCreateEnum{}] = 0
}

// Create the table and set up some initial splits.
if _, err := db.Exec(`CREATE TABLE foo (id INT PRIMARY KEY, ts STRING DEFAULT '0')`); err != nil {
return nil, err
Expand Down
152 changes: 135 additions & 17 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ package cdctest

import (
"bytes"
"context"
gosql "database/sql"
gojson "encoding/json"
"fmt"
"github.com/lib/pq/oid"
"sort"
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -510,41 +517,136 @@ func (v *FingerprintValidator) NoteRow(
return nil
}

// fetchTableColTypes returns a map from column name to column type for
// tableName, read from pg_attribute/pg_type as of the timestamp `updated`.
//
// tableName may be either "table" or "<qualifier>.table"; only the table part
// is used for the catalog lookup. Names with more than one "." are rejected.
//
// Types are resolved through types.OidToType, which is a plain map lookup: a
// column whose type OID has no entry there is recorded with a nil *types.T.
// NOTE(review): the commit that introduced this function states user-defined
// types (e.g. enums) are not supported for this reason; callers must not
// dereference a nil entry.
//
// An error is returned if the table name cannot be parsed, the query or row
// iteration fails, a type OID is not a valid 32-bit unsigned integer, or the
// query yields no columns.
func (v *FingerprintValidator) fetchTableColTypes(
	tableName string, updated hlc.Timestamp,
) (map[string]*types.T, error) {
	var table string
	switch parts := strings.Split(tableName, "."); len(parts) {
	case 1:
		table = parts[0]
	case 2:
		// The qualifier in parts[0] is intentionally unused.
		table = parts[1]
	default:
		return nil, errors.Errorf("could not parse table %s", tableName)
	}

	colToType := make(map[string]*types.T)
	if err := v.sqlDBFunc(func(db *gosql.DB) error {
		// The timestamp cannot be passed as a placeholder, so it is formatted
		// into the statement; the table name is still bound as $1.
		queryStr := fmt.Sprintf(`SELECT a.attname AS column_name, t.oid AS type_oid, t.typname AS type_name
FROM pg_attribute a JOIN pg_type t ON a.atttypid = t.oid AS OF SYSTEM TIME '%s'
WHERE a.attrelid = $1::regclass AND a.attnum > 0`, updated.AsOfSystemTime())
		rows, err := db.Query(queryStr, table)
		if err != nil {
			return err
		}
		defer func() { _ = rows.Close() }()

		for rows.Next() {
			// typeName is scanned only because the query selects it; it is not
			// otherwise used.
			var colName, oidStr, typeName string
			if err := rows.Scan(&colName, &oidStr, &typeName); err != nil {
				return err
			}
			// OIDs are 32-bit unsigned values; reject anything that would be
			// silently truncated by the conversion below.
			oidNum, err := strconv.ParseUint(oidStr, 10, 32)
			if err != nil {
				return err
			}
			colToType[colName] = types.OidToType[oid.Oid(oidNum)]
		}
		// rows.Next returns false both at end-of-results and on failure, so the
		// iteration error has to be checked once the loop is done.
		if err := rows.Err(); err != nil {
			return err
		}
		if len(colToType) == 0 {
			return errors.Errorf("no columns found for table %s", table)
		}
		return nil
	}); err != nil {
		return nil, err
	}
	return colToType, nil
}

// applyRowUpdate applies the update represented by `row` to the scratch table.
func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
defer func() {
_err = errors.Wrap(_err, "FingerprintValidator failed")
}()

var args []interface{}
var primaryKeyDatums []interface{}
if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil {
keyJSON, err := json.ParseJSON(row.key)
if err != nil {
return err
}
if len(primaryKeyDatums) != len(v.primaryKeyCols) {
return errors.Errorf(`expected primary key columns %s got datums %s`,
v.primaryKeyCols, primaryKeyDatums)
keyJSONAsArray, notArray := keyJSON.AsArray()
if !notArray || len(keyJSONAsArray) != len(v.primaryKeyCols) {
return errors.Errorf(
`notArray: %t expected primary key columns %s got datums %s`,
notArray, v.primaryKeyCols, keyJSONAsArray)
}

var stmtBuf bytes.Buffer
type wrapper struct {
After map[string]interface{} `json:"after"`
valueJSON, err := json.ParseJSON(row.value)
if err != nil {
return err
}
var value wrapper
if err := gojson.Unmarshal([]byte(row.value), &value); err != nil {
afterJSON, err := valueJSON.FetchValKey("after")
if err != nil {
return err
}
if value.After != nil {

if afterJSON != nil && afterJSON.Type() != json.NullJSONType {
// UPDATE or INSERT
fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable)
for col, colValue := range value.After {
iter, err := afterJSON.ObjectIter()
if err != nil {
return err
}
colNames := make([]string, 0)
for iter.Next() {
colNames = append(colNames, iter.Key())
}

typeofCol, err := v.fetchTableColTypes(v.origTable, row.updated)
for _, colValue := range colNames {
if len(args) != 0 {
stmtBuf.WriteString(`,`)
}
stmtBuf.WriteString(col)
args = append(args, colValue)
stmtBuf.WriteString(colValue)
if err != nil {
return err
}
colType, exists := typeofCol[colValue]
if !exists {
return errors.Errorf("column %s not found in table %s", colValue, v.origTable)
}
jsonValue, err := afterJSON.FetchValKey(colValue)
if err != nil {
return err
}
evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
str, _ := jsonValue.AsText()
if str != nil {
datum, _ := rowenc.ParseDatumStringAs(context.Background(), colType, *str, evalCtx, nil)
args = append(args, datum)
} else {
args = append(args, nil)
}
}
for i := len(value.After) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ {

for i := len(colNames) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ {
fmt.Fprintf(&stmtBuf, `, test%d`, i)
args = append(args, nil)
}
Expand All @@ -557,7 +659,23 @@ func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
}
stmtBuf.WriteString(`)`)

// TODO: add validation for primary key columns
// Also verify that the key matches the value.
var primaryKeyDatums []interface{}
if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil {
return err
}
if len(primaryKeyDatums) != len(v.primaryKeyCols) {
return errors.Errorf(`expected primary key columns %s got datums %s`,
v.primaryKeyCols, primaryKeyDatums)
}
type wrapper struct {
After map[string]interface{} `json:"after"`
}
var value wrapper
if err := gojson.Unmarshal([]byte(row.value), &value); err != nil {
return err
}
primaryKeyDatums = make([]interface{}, len(v.primaryKeyCols))
for idx, primaryKeyCol := range v.primaryKeyCols {
primaryKeyDatums[idx] = value.After[primaryKeyCol]
Expand All @@ -581,12 +699,12 @@ func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) {
} else {
// DELETE
fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable)
for i, datum := range primaryKeyDatums {
for i, datum := range keyJSONAsArray {
if len(args) != 0 {
stmtBuf.WriteString(` AND `)
}
fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1)
args = append(args, datum)
fmt.Fprintf(&stmtBuf, `to_json(%s)::text = $%d`, v.primaryKeyCols[i], i+1)
args = append(args, datum.String())
}
}

Expand Down

0 comments on commit 3502e6f

Please sign in to comment.