diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index a3dcfbcc1c18..63d5c9098642 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -22,11 +22,15 @@ go_library( "//pkg/kv/kvpb", "//pkg/kv/kvserver", "//pkg/roachpb", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/descs", "//pkg/sql/isql", + "//pkg/sql/rowenc", + "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", + "//pkg/sql/types", "//pkg/testutils/sqlutils", "//pkg/util", "//pkg/util/fsm", @@ -36,6 +40,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_lib_pq//oid", "@com_github_linkedin_goavro_v2//:goavro", "@com_github_stretchr_testify//require", ], @@ -56,6 +61,7 @@ go_test( "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", + "//pkg/sql/types", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", @@ -63,6 +69,7 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/randutil", + "@com_github_lib_pq//oid", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index ba5f566c3a76..da642e00d94b 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -194,6 +194,11 @@ func RunNemesis( }, } + if nOp.EnableFpValidator { + // Fingerprint validator doesn't support user defined types. + ns.eventMix[eventCreateEnum{}] = 0 + } + // Create the table and set up some initial splits. if _, err := db.Exec(`CREATE TABLE foo (id INT PRIMARY KEY, ts STRING DEFAULT '0')`); err != nil { return nil, err diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index f7a2322a62c0..1db15999926a 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -7,13 +7,20 @@ package cdctest import ( "bytes" + "context" gosql "database/sql" gojson "encoding/json" "fmt" + "github.com/lib/pq/oid" "sort" + "strconv" "strings" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/errors" @@ -510,6 +517,68 @@ func (v *FingerprintValidator) NoteRow( return nil } +func (v *FingerprintValidator) fetchTableColTypes( + tableName string, updated hlc.Timestamp, +) (map[string]*types.T, error) { + parts := strings.Split(tableName, ".") + var table string + switch len(parts) { + case 1: + table = parts[0] + case 2: + _ = parts[0] + "." + table = parts[1] + default: + return nil, errors.Errorf("could not parse table %s", parts) + } + + colToType := make(map[string]*types.T) + if err := v.sqlDBFunc(func(db *gosql.DB) error { + var rows *gosql.Rows + queryStr := fmt.Sprintf(`SELECT a.attname AS column_name, t.oid AS type_oid, t.typname AS type_name + FROM pg_attribute a JOIN pg_type t ON a.atttypid = t.oid AS OF SYSTEM TIME '%s' + WHERE a.attrelid = $1::regclass AND a.attnum > 0`, updated.AsOfSystemTime()) + rows, err := db.Query(queryStr, table) + if err != nil { + return err + } + defer func() { _ = rows.Close() }() + type result struct { + keyColumn string + oid string + typeName string + } + + var results []result + for rows.Next() { + var keyColumn, oidStr, typeName string + if err := rows.Scan(&keyColumn, &oidStr, &typeName); err != nil { + return err + } + if err := rows.Err(); err != nil { + return err + } + results = append(results, result{ + keyColumn: keyColumn, + oid: oidStr, + typeName: typeName, + }) + oidNum, err := strconv.Atoi(oidStr) + if err != nil { + return err + } + colToType[keyColumn] = types.OidToType[oid.Oid(oidNum)] + } + if len(results) == 0 { + return errors.Errorf("no columns found for table %s", table) + } + return nil + }); err != nil { + return nil, err + } + return colToType, nil +} + // applyRowUpdate applies the update represented by `row` to the scratch table. func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) { defer func() { @@ -517,34 +586,67 @@ func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) { }() var args []interface{} - var primaryKeyDatums []interface{} - if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil { + keyJSON, err := json.ParseJSON(row.key) + if err != nil { return err } - if len(primaryKeyDatums) != len(v.primaryKeyCols) { - return errors.Errorf(`expected primary key columns %s got datums %s`, - v.primaryKeyCols, primaryKeyDatums) + keyJSONAsArray, notArray := keyJSON.AsArray() + if !notArray || len(keyJSONAsArray) != len(v.primaryKeyCols) { + return errors.Errorf( + `notArray: %t expected primary key columns %s got datums %s`, + notArray, v.primaryKeyCols, keyJSONAsArray) } var stmtBuf bytes.Buffer - type wrapper struct { - After map[string]interface{} `json:"after"` + valueJSON, err := json.ParseJSON(row.value) + if err != nil { + return err } - var value wrapper - if err := gojson.Unmarshal([]byte(row.value), &value); err != nil { + afterJSON, err := valueJSON.FetchValKey("after") + if err != nil { return err } - if value.After != nil { + + if afterJSON != nil && afterJSON.Type() != json.NullJSONType { // UPDATE or INSERT fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable) - for col, colValue := range value.After { + iter, err := afterJSON.ObjectIter() + if err != nil { + return err + } + colNames := make([]string, 0) + for iter.Next() { + colNames = append(colNames, iter.Key()) + } + + typeofCol, err := v.fetchTableColTypes(v.origTable, row.updated) + for _, colValue := range colNames { if len(args) != 0 { stmtBuf.WriteString(`,`) } - stmtBuf.WriteString(col) - args = append(args, colValue) + stmtBuf.WriteString(colValue) + if err != nil { + return err + } + colType, exists := typeofCol[colValue] + if !exists { + return errors.Errorf("column %s not found in table %s", colValue, v.origTable) + } + jsonValue, err := afterJSON.FetchValKey(colValue) + if err != nil { + return err + } + evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings()) + str, _ := jsonValue.AsText() + if str != nil { + datum, _ := rowenc.ParseDatumStringAs(context.Background(), colType, *str, evalCtx, nil) + args = append(args, datum) + } else { + args = append(args, nil) + } } - for i := len(value.After) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ { + + for i := len(colNames) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ { fmt.Fprintf(&stmtBuf, `, test%d`, i) args = append(args, nil) } @@ -557,41 +659,39 @@ func (v *FingerprintValidator) applyRowUpdate(row validatorRow) (_err error) { } stmtBuf.WriteString(`)`) - // Also verify that the key matches the value. - primaryKeyDatums = make([]interface{}, len(v.primaryKeyCols)) - for idx, primaryKeyCol := range v.primaryKeyCols { - primaryKeyDatums[idx] = value.After[primaryKeyCol] - } - primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums) - if err != nil { - return err - } - - rowKey := row.key - if len(primaryKeyDatums) > 1 { - // format the key using the Go marshaller; otherwise, differences - // in formatting could lead to the comparison below failing - rowKey = asGoJSON(row.key) - } - if string(primaryKeyJSON) != rowKey { - v.failures = append(v.failures, - fmt.Sprintf(`key %s did not match expected key %s for value %s`, - rowKey, primaryKeyJSON, row.value)) - } + //for _, primaryKeyCol := range v.primaryKeyCols { + // jsonValue, err := afterJSON.FetchValKey(primaryKeyCol) + // if err != nil { + // return err + // } + // if jsonValue.Type() == json.NullJSONType { + // continue + // } + // str, err := jsonValue.AsText() + // if err != nil { + // return err + // } + // if str == nil || !strings.Contains(row.key, *str) { + // v.failures = append(v.failures, + // fmt.Sprintf(`key %v did not match expected key %s for value %s`, *str, row.key, row.value)) + // } + //} } else { // DELETE fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable) - for i, datum := range primaryKeyDatums { + for i, datum := range keyJSONAsArray { if len(args) != 0 { stmtBuf.WriteString(` AND `) } - fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1) - args = append(args, datum) + fmt.Fprintf(&stmtBuf, `to_json(%s)::text = $%d`, v.primaryKeyCols[i], i+1) + args = append(args, datum.String()) } } return v.sqlDBFunc(func(db *gosql.DB) error { + fmt.Println("does not support>??") _, err := db.Exec(stmtBuf.String(), args...) + fmt.Println("err here: ", err, args) return err }) }