Merge pull request #139251 from cockroachdb/blathers/backport-release-25.1-139203

release-25.1: scexec: treat BatchTimestampBeforeGCError as permanent
rafiss authored Jan 16, 2025
2 parents 9b04836 + eaedea4 commit 387f570
Showing 6 changed files with 233 additions and 105 deletions.
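
The scexec change named in the commit title is among the six changed files but is not rendered in the hunks below. The idea it implements is that a backfill whose fixed read timestamp has already fallen below the GC threshold can never succeed on retry, so the resulting BatchTimestampBeforeGCError should fail the schema-change job rather than trigger another attempt. A minimal, self-contained sketch of that classification follows; the type and function names (batchTimestampBeforeGCError, permanentError, classify) are illustrative stand-ins, not CockroachDB's API.

package main

import (
	"errors"
	"fmt"
)

// batchTimestampBeforeGCError stands in for the KV error returned when a
// batch's read timestamp is below the GC threshold; rerunning the same read
// can never succeed. Hypothetical type for illustration only.
type batchTimestampBeforeGCError struct{ msg string }

func (e *batchTimestampBeforeGCError) Error() string { return e.msg }

// permanentError marks an error that a job retry loop must not retry.
type permanentError struct{ err error }

func (e *permanentError) Error() string { return "permanent: " + e.err.Error() }
func (e *permanentError) Unwrap() error { return e.err }

// classify wraps GC-threshold errors as permanent so the schema change fails
// instead of retrying forever; other errors are returned unchanged.
func classify(err error) error {
	var gcErr *batchTimestampBeforeGCError
	if errors.As(err, &gcErr) {
		return &permanentError{err: err}
	}
	return err
}

func main() {
	err := classify(&batchTimestampBeforeGCError{msg: "batch timestamp before GC threshold"})
	var perm *permanentError
	fmt.Println(errors.As(err, &perm)) // true: the retry loop should give up
}
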
1 change: 1 addition & 0 deletions pkg/jobs/jobsprotectedts/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
"//pkg/sql/isql",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
19 changes: 13 additions & 6 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
@@ -107,22 +108,28 @@ func (p *Manager) TryToProtectBeforeGC(
protectedTSInstallCancel := make(chan struct{})
var unprotectCallback Cleaner
waitGrp.GoCtx(func(ctx context.Context) error {
// If we are starting up the system config can be nil, we are okay letting
// the job restart, due to the GC interval and lack of protected timestamp.
// If we are starting up, the system config can be nil. We are okay letting
// the job restart due to the GC interval and lack of protected timestamp.
systemConfig := p.systemConfig.GetSystemConfig()
if systemConfig == nil {
return nil
}
// Determine what the GC interval is on the table, which will help us
// figure out when to apply a protected timestamp, as a percentage of this
// time.
// figure out when to apply a protected timestamp as a percentage of the
// time until GC can occur.
zoneCfg, err := systemConfig.GetZoneConfigForObject(p.codec,
config.ObjectID(tableDesc.GetID()))
if err != nil {
return err
}
waitBeforeProtectedTS := time.Duration((time.Duration(zoneCfg.GC.TTLSeconds) * time.Second).Seconds() *
timedProtectTimeStampGCPct)
waitBeforeProtectedTS := time.Duration(0)
now := timeutil.Now()
readAsOfTime := timeutil.Unix(0, readAsOf.WallTime)
gcTTL := time.Duration(zoneCfg.GC.TTLSeconds) * time.Second
if readAsOfTime.Add(gcTTL).After(now) {
timeUntilGC := readAsOfTime.Add(gcTTL).Sub(now)
waitBeforeProtectedTS = time.Duration(float64(timeUntilGC) * timedProtectTimeStampGCPct)
}

select {
case <-time.After(waitBeforeProtectedTS):
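
The hunk above changes how long the manager waits before installing a protected timestamp: instead of a fixed fraction of the GC TTL, it waits a fraction of the time actually remaining before readAsOf ages past the GC threshold, and does not wait at all if that point has already passed. Below is a self-contained sketch of the same arithmetic, assuming an illustrative fraction of 0.8 in place of the timedProtectTimeStampGCPct constant, whose value is not shown in this hunk.

package main

import (
	"fmt"
	"time"
)

// waitBeforeProtect mirrors the updated logic: compute how long until the
// read timestamp ages past the GC TTL, and wait for a fraction of that
// remaining window before installing a protected timestamp. If the window
// has already closed, protect immediately.
func waitBeforeProtect(readAsOf, now time.Time, gcTTL time.Duration, pct float64) time.Duration {
	gcDeadline := readAsOf.Add(gcTTL)
	if !gcDeadline.After(now) {
		return 0
	}
	return time.Duration(float64(gcDeadline.Sub(now)) * pct)
}

func main() {
	now := time.Now()
	readAsOf := now.Add(-10 * time.Second) // backfill read timestamp taken 10s ago
	gcTTL := 25 * time.Second              // zone config gc.ttlseconds = 25
	// 15s remain until GC could remove the data; with pct = 0.8, wait 12s.
	fmt.Println(waitBeforeProtect(readAsOf, now, gcTTL, 0.8))
}
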
277 changes: 186 additions & 91 deletions pkg/sql/backfill_protected_timestamp_test.go
@@ -8,6 +8,7 @@ package sql_test
import (
"context"
gosql "database/sql"
"fmt"
"regexp"
"strings"
"sync"
@@ -23,6 +24,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -220,10 +223,10 @@ func TestValidationWithProtectedTS(t *testing.T) {
}
}

// TestBackfillQueryWithProtectedTS backfills a query into a table and confirms
// that a protected timestamp is setup. It also confirms that if the protected
// timestamp is not ready in time we do not infinitely retry.
func TestBackfillQueryWithProtectedTS(t *testing.T) {
// TestBackfillWithProtectedTS runs operations that backfill into a table and
// confirms that a protected timestamp is setup. It also confirms that if the
// protected timestamp is not ready in time we do not infinitely retry.
func TestBackfillWithProtectedTS(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -244,26 +247,68 @@ func TestBackfillQueryWithProtectedTS(t *testing.T) {
SQLEvalContext: &eval.TestingKnobs{
ForceProductionValues: true,
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx2 context.Context, request *kvpb.BatchRequest) *kvpb.Error {
// Detect the first operation on the backfill operation, this is before
// the PTS is setup, so the query will fail.
if blockBackFillsForPTSFailure.Load() &&
request.Txn != nil &&
request.Txn.Name == "schemaChangerBackfill" {
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
RunBeforeBackfill: func() error {
// Cause the backfill to pause before adding the protected
// timestamp. This knob is for testing schema changes that
// are on the declarative schema changer.
if blockBackFillsForPTSFailure.Load() {
if !blockBackFillsForPTSFailure.Swap(false) {
return nil
}
backfillQueryWait <- struct{}{}
<-backfillQueryResume
}
return nil
},
},
DistSQL: &execinfra.TestingKnobs{
RunBeforeBackfillChunk: func(sp roachpb.Span) error {
// Cause the backfill to pause after it already began running
// and has installed a protected timestamp. This knob is for
// testing schema changes that use the index backfiller.
if blockBackFillsForPTSCheck.Load() {
_, prefix, err := s.Codec().DecodeTablePrefix(sp.Key)
if err != nil || prefix != tableID {
//nolint:returnerrcheck
return nil
}
if !blockBackFillsForPTSCheck.Swap(false) {
return nil
}
backfillQueryWait <- struct{}{}
<-backfillQueryResume
}
return nil
},
},
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeQueryBackfill: func() error {
// Cause the backfill to pause before adding the protected
// timestamp. This knob is for testing CREATE MATERIALIZED VIEW.
if blockBackFillsForPTSFailure.Load() {
if !blockBackFillsForPTSFailure.Swap(false) {
return nil
}
backfillQueryWait <- struct{}{}
<-backfillQueryResume
}
// Detect the first scan on table from the backfill, this is after the
// PTS has been set-up.
return nil
},
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
// Detect the first scan on table from the backfill, which is
// after the PTS has been set up. This knob is for testing CREATE
// MATERIALIZED VIEW.
if blockBackFillsForPTSCheck.Load() &&
request.Txn != nil &&
request.Txn.Name == "schemaChangerBackfill" &&
request.Requests[0].GetInner().Method() == kvpb.Scan {
scan := request.Requests[0].GetScan()
_, prefix, err := s.Codec().DecodeTablePrefix(scan.Key)
if err != nil || prefix != tableID {
//nolint:returnerrcheck
return nil
}
if !blockBackFillsForPTSCheck.Swap(false) {
@@ -287,7 +332,7 @@ func TestBackfillQueryWithProtectedTS(t *testing.T) {
rSys := sqlutils.MakeSQLRunner(systemSqlDb)

// Refreshes the in-memory protected timestamp state to asOf.
refreshTo := func(t *testing.T, tableKey roachpb.Key, asOf hlc.Timestamp) error {
refreshTo := func(ctx context.Context, tableKey roachpb.Key, asOf hlc.Timestamp) error {
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
if err != nil {
return err
@@ -310,7 +355,7 @@ func TestBackfillQueryWithProtectedTS(t *testing.T) {
return repl.ReadProtectedTimestampsForTesting(ctx)
}
// Refresh forces the PTS cache to update to at least asOf.
refreshPTSCacheTo := func(t *testing.T, asOf hlc.Timestamp) error {
refreshPTSCacheTo := func(ctx context.Context, asOf hlc.Timestamp) error {
ptp := ts.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
return ptp.Refresh(ctx, asOf)
}
Expand All @@ -322,87 +367,137 @@ func TestBackfillQueryWithProtectedTS(t *testing.T) {
} {
rSys.Exec(t, sql)
}
for _, sql := range []string{
"SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false",
"ALTER DATABASE defaultdb CONFIGURE ZONE USING gc.ttlseconds = 1",
"CREATE TABLE t(n int)",
"ALTER TABLE t CONFIGURE ZONE USING range_min_bytes = 0, range_max_bytes = 67108864, gc.ttlseconds = 1",
"INSERT INTO t(n) SELECT * FROM generate_series(1, 500000)",
} {
r.Exec(t, sql)
}

getTableID := func() (tableID uint32) {
r.QueryRow(t, `SELECT table_id FROM crdb_internal.tables`+
` WHERE name = 't' AND database_name = current_database()`).Scan(&tableID)
return tableID
}
tableID = getTableID()
tableKey := ts.Codec().TablePrefix(tableID)
const initialRowCount = 500000
const rowsDeletedPerIteration = 200000
const rowsAddedPerIteration = 1

grp := ctxgroup.WithContext(ctx)
grp.Go(func() error {
// We are going to do this twice, first to cause a PTS related failure,
// and a second time for the successful case. The first time we will cause
// the GC to happen before the PTS is setup. The second time we will allow
// the PTS to be installed and then cause the GC.
for i := 0; i < 2; i++ {
<-backfillQueryWait
if _, err := db.ExecContext(ctx, "SET sql_safe_updates=off"); err != nil {
return err
}
if _, err := db.ExecContext(ctx, "BEGIN; DELETE FROM t LIMIT 250000; INSERT INTO t VALUES('9999999'); COMMIT"); err != nil {
return err
}
if err := refreshTo(t, tableKey, ts.Clock().Now()); err != nil {
return err
for _, tc := range []struct {
name string
backfillSchemaChange string
jobDescriptionPrefix string
postTestQuery string
expectedCount int
}{
{
name: "create materialized view",
backfillSchemaChange: "CREATE MATERIALIZED VIEW test AS (SELECT n from t)",
jobDescriptionPrefix: "CREATE MATERIALIZED VIEW",
postTestQuery: "SELECT count(*) FROM test",
expectedCount: initialRowCount - rowsDeletedPerIteration + rowsAddedPerIteration,
},
{
name: "create index",
backfillSchemaChange: "CREATE INDEX idx ON t(n)",
jobDescriptionPrefix: "CREATE INDEX idx",
postTestQuery: "SELECT count(*) FROM t@idx",
expectedCount: initialRowCount - 2*rowsDeletedPerIteration + 2*rowsAddedPerIteration,
},
} {
t.Run(tc.name, func(t *testing.T) {
for _, sql := range []string{
"SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false",
"ALTER DATABASE defaultdb CONFIGURE ZONE USING gc.ttlseconds = 1",
"DROP TABLE IF EXISTS t CASCADE",
"CREATE TABLE t(n int)",
"ALTER TABLE t CONFIGURE ZONE USING range_min_bytes = 0, range_max_bytes = 67108864, gc.ttlseconds = 1",
fmt.Sprintf("INSERT INTO t(n) SELECT * FROM generate_series(1, %d)", initialRowCount),
} {
r.Exec(t, sql)
}
if err := refreshPTSCacheTo(t, ts.Clock().Now()); err != nil {
return err

getTableID := func() (tableID uint32) {
r.QueryRow(t, `SELECT 't'::regclass::oid`).Scan(&tableID)
return tableID
}
if _, err := db.ExecContext(ctx, `SELECT crdb_internal.kv_enqueue_replica(range_id, 'mvccGC', true)
tableID = getTableID()
tableKey := ts.Codec().TablePrefix(tableID)

grp := ctxgroup.WithContext(ctx)
grp.GoCtx(func(ctx context.Context) error {
// We are going to do this twice, first to cause a PTS related failure,
// and a second time for the successful case. The first time we will cause
// the GC to happen before the PTS is setup. The second time we will allow
// the PTS to be installed and then cause the GC.
for i := 0; i < 2; i++ {
<-backfillQueryWait
if _, err := db.ExecContext(ctx, "SET sql_safe_updates=off"); err != nil {
return err
}
if _, err := db.ExecContext(ctx, fmt.Sprintf(
"BEGIN; DELETE FROM t LIMIT %d; INSERT INTO t VALUES('9999999'); COMMIT",
rowsDeletedPerIteration,
)); err != nil {
return err
}
if err := refreshTo(ctx, tableKey, ts.Clock().Now()); err != nil {
return err
}
if err := refreshPTSCacheTo(ctx, ts.Clock().Now()); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `
SELECT crdb_internal.kv_enqueue_replica(range_id, 'mvccGC', true)
FROM (SELECT range_id FROM [SHOW RANGES FROM TABLE t] ORDER BY start_key);`); err != nil {
return err
}
row := db.QueryRow("SELECT count(*) FROM system.protected_ts_records WHERE meta_type='jobs'")
var count int
if err := row.Scan(&count); err != nil {
return err
}
// First iteration is before the PTS is setup, so it will be 0. Second
// iteration the PTS should be setup.
expectedCount := i
if count != expectedCount {
return errors.AssertionFailedf("no protected timestamp was set up by the schema change job (expected %d, got : %d)", expectedCount, count)
}
backfillQueryResume <- struct{}{}
}
return nil
})
grp.Go(func() error {
// Backfill with the PTS being not setup early enough, which will
// lead to failure.
blockBackFillsForPTSFailure.Swap(true)
_, err := db.ExecContext(ctx, `CREATE MATERIALIZED VIEW test AS (SELECT n from t)`)
if err == nil || !testutils.IsError(err, "unable to retry backfill since fixed timestamp is before the GC timestamp") {
return errors.AssertionFailedf("expected error was not hit")
}
// Next backfill with the PTS being setup on time, which should always
// succeed.
blockBackFillsForPTSCheck.Swap(true)
_, err = db.ExecContext(ctx, `CREATE MATERIALIZED VIEW test AS (SELECT n from t)`)
return err
})
return err
}
row := db.QueryRowContext(ctx, "SELECT count(*) FROM system.protected_ts_records WHERE meta_type='jobs'")
var count int
if err := row.Scan(&count); err != nil {
return err
}
// First iteration is before the PTS is setup, so it will be 0. Second
// iteration the PTS should be setup.
expectedCount := i
if count != expectedCount {
return errors.AssertionFailedf("no protected timestamp was set up by the schema change job (expected %d, got : %d)", expectedCount, count)
}
backfillQueryResume <- struct{}{}
}
return nil
})
grp.GoCtx(func(ctx context.Context) error {
// Backfill with the PTS being not setup early enough, which will
// lead to failure.
blockBackFillsForPTSFailure.Swap(true)
_, err := db.ExecContext(ctx, tc.backfillSchemaChange)
if err == nil || !testutils.IsError(err, "unable to retry backfill since fixed timestamp is before the GC timestamp") {
return errors.AssertionFailedf("expected error was not hit")
}
testutils.SucceedsSoon(t, func() error {
// Wait until schema change is fully rolled back.
var status string
err = db.QueryRowContext(ctx, fmt.Sprintf(
"SELECT status FROM crdb_internal.jobs WHERE description LIKE '%s%%'",
tc.jobDescriptionPrefix,
)).Scan(&status)
if err != nil {
return err
}
if status != "failed" {
return errors.Newf("schema change not rolled back yet; status=%s", status)
}
return nil
})
// Next backfill with the PTS being setup on time, which should always
// succeed.
blockBackFillsForPTSCheck.Swap(true)
_, err = db.ExecContext(ctx, tc.backfillSchemaChange)
if err != nil {
return err
}
return nil
})

require.NoError(t, grp.Wait())
var rowCount int
res := r.QueryRow(t, `SELECT count(*) FROM test`)
res.Scan(&rowCount)
// Half the row count plus the one row inserted above.
const expectedCount = 250000 + 1
if rowCount != expectedCount {
t.Errorf("expected %d entries, got %d", expectedCount, rowCount)
require.NoError(t, grp.Wait())
var rowCount int
res := r.QueryRow(t, tc.postTestQuery)
res.Scan(&rowCount)
if rowCount != tc.expectedCount {
t.Errorf("expected %d entries, got %d", tc.expectedCount, rowCount)
}
require.Falsef(t, blockBackFillsForPTSFailure.Load(), "no backfill txn was detected in testing knob.")
require.Falsef(t, blockBackFillsForPTSCheck.Load(), "no backfill txn was detected in testing knob.")
})
}
require.NoError(t, db.Close())
require.Equalf(t, false, blockBackFillsForPTSFailure.Load(), "no backfill txn was dected in testing knob.")
}
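
The testing knobs added above (RunBeforeBackfill, RunBeforeBackfillChunk, RunBeforeQueryBackfill, and the store request filter) share one pause/resume shape: the test arms an atomic flag, the knob swaps it to false so only the first backfill that observes it pauses, then the knob signals the test over one channel and blocks on another until the test has forced a GC run. A stripped-down sketch of that pattern with illustrative names (backfillPauser and its fields are not the test's identifiers):

package main

import (
	"fmt"
	"sync/atomic"
)

// backfillPauser sketches the knob pattern: armed is set by the test, the
// knob fires at most once, and the two channels hand control to the test
// and back.
type backfillPauser struct {
	armed  atomic.Bool
	wait   chan struct{}
	resume chan struct{}
}

func newBackfillPauser() *backfillPauser {
	return &backfillPauser{wait: make(chan struct{}), resume: make(chan struct{})}
}

// knob is what the backfill would call before doing its work.
func (p *backfillPauser) knob() {
	if !p.armed.Load() {
		return
	}
	if !p.armed.Swap(false) { // only the first caller to swap true->false pauses
		return
	}
	p.wait <- struct{}{} // tell the test the backfill is paused
	<-p.resume           // wait until the test has mutated data and forced GC
}

func main() {
	p := newBackfillPauser()
	p.armed.Store(true)
	done := make(chan struct{})
	go func() {
		p.knob() // the "backfill" pauses here
		close(done)
	}()
	<-p.wait
	// The real test deletes rows, refreshes the PTS cache, and enqueues an
	// mvccGC run at this point before letting the backfill continue.
	p.resume <- struct{}{}
	<-done
	fmt.Println("backfill resumed")
}
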