diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index c5c9227af7c9..098454b9aedd 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -6917,7 +6917,7 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, }) restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`) @@ -6928,13 +6928,13 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, { `10`, `true`, `cluster-10`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeExternal)), - `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`, + `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", "lastRevertTenantTimestamp": {}}`, }, }, ) @@ -6969,13 +6969,13 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, { `10`, `false`, `NULL`, strconv.Itoa(int(mtinfopb.DataStateDrop)), strconv.Itoa(int(mtinfopb.ServiceModeNone)), - `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedDataState": "DROP", "deprecatedId": "10", "droppedName": "cluster-10"}`, + `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedDataState": "DROP", "deprecatedId": "10", "droppedName": "cluster-10", "lastRevertTenantTimestamp": {}}`, }, }, ) @@ -7004,13 +7004,13 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, { `10`, `true`, `cluster-10`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeExternal)), - `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`, + `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", "lastRevertTenantTimestamp": {}}`, }, }, ) @@ -7045,7 +7045,7 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, }) restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`) @@ -7056,13 +7056,13 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, { `10`, `true`, `cluster-10`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeExternal)), - `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`, + `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", 
"lastRevertTenantTimestamp": {}}`, }, }, ) @@ -7085,7 +7085,7 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, }) restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/clusterwide'`) @@ -7096,13 +7096,13 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, { `10`, `true`, `cluster-10`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeExternal)), - `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`, + `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", "lastRevertTenantTimestamp": {}}`, }, }, ) @@ -7147,7 +7147,7 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, }) restoreDB.Exec(t, `RESTORE FROM 'nodelocal://1/clusterwide' WITH include_all_virtual_clusters`) @@ -7158,25 +7158,25 @@ func TestBackupRestoreTenant(t *testing.T) { `1`, `true`, `system`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeShared)), - `{"capabilities": {}, "deprecatedId": "1"}`, + `{"capabilities": {}, "deprecatedId": "1", "lastRevertTenantTimestamp": {}}`, }, { `10`, `true`, `cluster-10`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeExternal)), - `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10"}`, + `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "10", "lastRevertTenantTimestamp": {}}`, }, { `11`, `true`, `cluster-11`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeExternal)), - `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "11"}`, + `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "11", "lastRevertTenantTimestamp": {}}`, }, { `20`, `true`, `cluster-20`, strconv.Itoa(int(mtinfopb.DataStateReady)), strconv.Itoa(int(mtinfopb.ServiceModeExternal)), - `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "20"}`, + `{"capabilities": {"canUseNodelocalStorage": true}, "deprecatedId": "20", "lastRevertTenantTimestamp": {}}`, }, }, ) diff --git a/pkg/ccl/cmdccl/clusterrepl/main.go b/pkg/ccl/cmdccl/clusterrepl/main.go index d923d7198f59..d7b1c59e10d0 100644 --- a/pkg/ccl/cmdccl/clusterrepl/main.go +++ b/pkg/ccl/cmdccl/clusterrepl/main.go @@ -99,7 +99,7 @@ func streamPartition(ctx context.Context, streamAddr *url.URL) error { return err } - replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(*tenant)) + replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(*tenant), streampb.ReplicationProducerRequest{}) if err != nil { return err } diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 5d6a25befe05..ef14a1c6106c 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -60,7 +60,7 @@ type Client 
interface { // Create initializes a stream with the source, potentially reserving any // required resources, such as protected timestamps, and returns an ID which // can be used to interact with this stream in the future. - Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error) + Create(ctx context.Context, tenant roachpb.TenantName, req streampb.ReplicationProducerRequest) (streampb.ReplicationProducerSpec, error) // Destroy informs the source of the stream that it may terminate production // and release resources such as protected timestamps. diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index c95da05afede..36d7b2573b78 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -45,7 +45,7 @@ func (sc testStreamClient) Dial(_ context.Context) error { // Create implements the Client interface. func (sc testStreamClient) Create( - _ context.Context, _ roachpb.TenantName, + _ context.Context, _ roachpb.TenantName, _ streampb.ReplicationProducerRequest, ) (streampb.ReplicationProducerSpec, error) { return streampb.ReplicationProducerSpec{ StreamID: streampb.StreamID(1), @@ -242,7 +242,7 @@ func ExampleClient() { _ = client.Close(ctx) }() - prs, err := client.Create(ctx, "system") + prs, err := client.Create(ctx, "system", streampb.ReplicationProducerRequest{}) if err != nil { panic(err) } diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index 785e12003f69..a47d659c22f6 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -63,14 +63,25 @@ var _ Client = &partitionedStreamClient{} // Create implements Client interface. 
func (p *partitionedStreamClient) Create( - ctx context.Context, tenantName roachpb.TenantName, + ctx context.Context, tenantName roachpb.TenantName, req streampb.ReplicationProducerRequest, ) (streampb.ReplicationProducerSpec, error) { ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create") defer sp.Finish() p.mu.Lock() defer p.mu.Unlock() + + var row pgx.Row + if !req.ReplicationStartTime.IsEmpty() { + reqBytes, err := protoutil.Marshal(&req) + if err != nil { + return streampb.ReplicationProducerSpec{}, err + } + row = p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1, $2)`, tenantName, reqBytes) + } else { + row = p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName) + } + var rawReplicationProducerSpec []byte - row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName) err := row.Scan(&rawReplicationProducerSpec) if err != nil { return streampb.ReplicationProducerSpec{}, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName) diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go index 148b287550db..8115ce576b2c 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go @@ -147,7 +147,7 @@ func TestPartitionStreamReplicationClientWithNonRunningJobs(t *testing.T) { }) }) t.Run("paused-job", func(t *testing.T) { - rps, err := client.Create(ctx, testTenantName) + rps, err := client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{}) require.NoError(t, err) targetStreamID := rps.StreamID h.SysSQL.Exec(t, `PAUSE JOB $1`, targetStreamID) @@ -173,7 +173,7 @@ func TestPartitionStreamReplicationClientWithNonRunningJobs(t *testing.T) { }) }) t.Run("cancelled-job", func(t *testing.T) { - rps, err := client.Create(ctx, testTenantName) + rps, err := client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{}) require.NoError(t, err) targetStreamID := rps.StreamID h.SysSQL.Exec(t, `CANCEL JOB $1`, targetStreamID) @@ -249,11 +249,11 @@ INSERT INTO d.t2 VALUES (2); [][]string{{string(status)}}) } - rps, err := client.Create(ctx, testTenantName) + rps, err := client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{}) require.NoError(t, err) streamID := rps.StreamID // We can create multiple replication streams for the same tenant. 
- _, err = client.Create(ctx, testTenantName) + _, err = client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{}) require.NoError(t, err) expectStreamState(streamID, jobs.StatusRunning) @@ -348,7 +348,7 @@ INSERT INTO d.t2 VALUES (2); h.SysSQL.Exec(t, ` SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms'; `) - rps, err = client.Create(ctx, testTenantName) + rps, err = client.Create(ctx, testTenantName, streampb.ReplicationProducerRequest{}) require.NoError(t, err) streamID = rps.StreamID require.NoError(t, client.Complete(ctx, streamID, true)) diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index d2cafd7dcf16..977235fc93b3 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -387,7 +387,7 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top // Create implements the Client interface. func (m *RandomStreamClient) Create( - ctx context.Context, tenantName roachpb.TenantName, + ctx context.Context, tenantName roachpb.TenantName, _ streampb.ReplicationProducerRequest, ) (streampb.ReplicationProducerSpec, error) { log.Infof(ctx, "creating random stream for tenant %s", tenantName) return streampb.ReplicationProducerSpec{ diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index 01f42f2283a8..53b278b36caa 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -33,7 +33,10 @@ import ( "github.com/cockroachdb/errors" ) -const alterReplicationJobOp = "ALTER VIRTUAL CLUSTER REPLICATION" +const ( + alterReplicationJobOp = "ALTER VIRTUAL CLUSTER REPLICATION" + createReplicationOp = "CREATE VIRTUAL CLUSTER FROM REPLICATION" +) var alterReplicationCutoverHeader = colinfo.ResultColumns{ {Name: "cutover_time", Typ: types.Decimal}, @@ -42,11 +45,17 @@ var alterReplicationCutoverHeader = colinfo.ResultColumns{ // ResolvedTenantReplicationOptions represents options from an // evaluated CREATE VIRTUAL CLUSTER FROM REPLICATION command. 
type resolvedTenantReplicationOptions struct { - retention *int32 + resumeTimestamp hlc.Timestamp + retention *int32 } func evalTenantReplicationOptions( - ctx context.Context, options tree.TenantReplicationOptions, eval exprutil.Evaluator, + ctx context.Context, + options tree.TenantReplicationOptions, + eval exprutil.Evaluator, + evalCtx *eval.Context, + semaCtx *tree.SemaContext, + op string, ) (*resolvedTenantReplicationOptions, error) { r := &resolvedTenantReplicationOptions{} if options.Retention != nil { @@ -65,6 +74,14 @@ func evalTenantReplicationOptions( retSeconds := int32(retSeconds64) r.retention = &retSeconds } + if options.ResumeTimestamp != nil { + ts, err := evalSystemTimeExpr(ctx, evalCtx, semaCtx, options.ResumeTimestamp, op) + if err != nil { + return nil, err + } + r.resumeTimestamp = ts + } + return r, nil } @@ -89,12 +106,19 @@ func alterReplicationJobTypeCheck( ); err != nil { return false, nil, err } + if alterStmt.Options.ResumeTimestamp != nil { + evalCtx := &p.ExtendedEvalContext().Context + if _, err := typeCheckSystemTimeExpr(ctx, evalCtx, + p.SemaCtx(), alterStmt.Options.ResumeTimestamp, alterReplicationJobOp); err != nil { + return false, nil, err + } + } if cutoverTime := alterStmt.Cutover; cutoverTime != nil { if cutoverTime.Timestamp != nil { evalCtx := &p.ExtendedEvalContext().Context - if _, err := typeCheckCutoverTime(ctx, evalCtx, - p.SemaCtx(), cutoverTime.Timestamp); err != nil { + if _, err := typeCheckSystemTimeExpr(ctx, evalCtx, + p.SemaCtx(), cutoverTime.Timestamp, alterReplicationJobOp); err != nil { return false, nil, err } } @@ -131,6 +155,11 @@ func alterReplicationJobHook( "only the system tenant can alter tenant") } + if alterTenantStmt.Options.ResumeTimestamp != nil { + return nil, nil, nil, false, pgerror.New(pgcode.InvalidParameterValue, "resume timestamp cannot be altered") + } + + evalCtx := &p.ExtendedEvalContext().Context var cutoverTime hlc.Timestamp if alterTenantStmt.Cutover != nil { if !alterTenantStmt.Cutover.Latest { @@ -138,8 +167,7 @@ func alterReplicationJobHook( return nil, nil, nil, false, errors.AssertionFailedf("unexpected nil cutover expression") } - evalCtx := &p.ExtendedEvalContext().Context - ct, err := evalCutoverTime(ctx, evalCtx, p.SemaCtx(), alterTenantStmt.Cutover.Timestamp) + ct, err := evalSystemTimeExpr(ctx, evalCtx, p.SemaCtx(), alterTenantStmt.Cutover.Timestamp, alterReplicationJobOp) if err != nil { return nil, nil, nil, false, err } @@ -148,7 +176,7 @@ func alterReplicationJobHook( } exprEval := p.ExprEvaluator(alterReplicationJobOp) - options, err := evalTenantReplicationOptions(ctx, alterTenantStmt.Options, exprEval) + options, err := evalTenantReplicationOptions(ctx, alterTenantStmt.Options, exprEval, evalCtx, p.SemaCtx(), alterReplicationJobOp) if err != nil { return nil, nil, nil, false, err } @@ -292,18 +320,28 @@ func alterTenantOptions( } -func typeCheckCutoverTime( - ctx context.Context, evalCtx *eval.Context, semaCtx *tree.SemaContext, cutoverExpr tree.Expr, +// typeCheckSystemTimeExpr type checks an Expr as a system time. It +// accepts the same types as AS OF SYSTEM TIME expressions and +// functions that evaluate to one of those types. +// +// The types need to be kept in sync with those supported by +// asof.DatumToHLC. +// +// TODO(ssd): AOST and SPLIT are restricted to the use of constant expressions +// or particular follower-read related functions. Do we want to do that here as well? +// One nice side effect of allowing functions is that users can use NOW(). 
+func typeCheckSystemTimeExpr( + ctx context.Context, + evalCtx *eval.Context, + semaCtx *tree.SemaContext, + systemTimeExpr tree.Expr, + op string, ) (tree.TypedExpr, error) { - typedExpr, err := tree.TypeCheckAndRequire(ctx, cutoverExpr, semaCtx, types.Any, alterReplicationJobOp) + typedExpr, err := tree.TypeCheckAndRequire(ctx, systemTimeExpr, semaCtx, types.Any, op) if err != nil { return nil, err } - // TODO(ssd): AOST and SPLIT are restricted to the use of constant expressions - // or particular follower-read related functions. Do we want to do that here as well? - // One nice side effect of allowing functions is that users can use NOW(). - // These are the types currently supported by asof.DatumToHLC. switch typedExpr.ResolvedType().Family() { case types.IntervalFamily, types.TimestampTZFamily, types.TimestampFamily, types.StringFamily, types.DecimalFamily, types.IntFamily: return typedExpr, nil @@ -312,10 +350,17 @@ func typeCheckCutoverTime( } } -func evalCutoverTime( - ctx context.Context, evalCtx *eval.Context, semaCtx *tree.SemaContext, cutoverExpr tree.Expr, +// evalSystemTimeExpr evaluates an Expr as a system time. It accepts +// the same types as AS OF SYSTEM TIME expressions and functions that +// evaluate to one of those types. +func evalSystemTimeExpr( + ctx context.Context, + evalCtx *eval.Context, + semaCtx *tree.SemaContext, + systemTimeExpr tree.Expr, + op string, ) (hlc.Timestamp, error) { - typedExpr, err := typeCheckCutoverTime(ctx, evalCtx, semaCtx, cutoverExpr) + typedExpr, err := typeCheckSystemTimeExpr(ctx, evalCtx, semaCtx, systemTimeExpr, op) if err != nil { return hlc.Timestamp{}, err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go index 4f6fab1931f5..0d0adc1eab42 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go @@ -110,12 +110,7 @@ func (r *streamIngestManagerImpl) RevertTenantToTimestamp( } // Set the data state to Add during the destructive operation. - // - // TODO(ssd): We likely also want to record the revert - // timestamp in the tenant metadata so that we have a - // record of the revert and so that we can use it to - // verify other operations that may want to start from - // that same timestamp. + tenantRecord.LastRevertTenantTimestamp = revertTo tenantRecord.DataState = mtinfopb.DataStateAdd return sql.UpdateTenantRecord(ctx, r.evalCtx.Settings, txn, tenantRecord) }); err != nil { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 0434d1a836d2..6e23b719969c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -109,7 +109,7 @@ func completeIngestion( ) error { details := ingestionJob.Details().(jobspb.StreamIngestionDetails) log.Infof(ctx, "activating destination tenant %d", details.DestinationTenantID) - if err := activateTenant(ctx, execCtx, details.DestinationTenantID, cutoverTimestamp); err != nil { + if err := activateTenant(ctx, execCtx, details, cutoverTimestamp); err != nil { return err } @@ -271,7 +271,7 @@ func (s *streamIngestionResumer) handleResumeError( func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}) error { // Protect the destination tenant's keyspan from garbage collection. 
jobExecCtx := execCtx.(sql.JobExecContext) - err := s.protectDestinationTenant(ctx, execCtx) + err := s.protectDestinationTenant(ctx, jobExecCtx) if err != nil { return s.handleResumeError(ctx, jobExecCtx, err) } @@ -305,7 +305,7 @@ func releaseDestinationTenantProtectedTimestamp( // The method persists the ID of the protected timestamp record in the // replication job's Payload. func (s *streamIngestionResumer) protectDestinationTenant( - ctx context.Context, execCtx interface{}, + ctx context.Context, execCtx sql.JobExecContext, ) error { oldDetails := s.job.Details().(jobspb.StreamIngestionDetails) @@ -315,7 +315,7 @@ func (s *streamIngestionResumer) protectDestinationTenant( return nil } - execCfg := execCtx.(sql.JobExecContext).ExecCfg() + execCfg := execCtx.ExecCfg() target := ptpb.MakeTenantsTarget([]roachpb.TenantID{oldDetails.DestinationTenantID}) ptsID := uuid.MakeV4() @@ -474,28 +474,29 @@ func maybeRevertToCutoverTimestamp( func activateTenant( ctx context.Context, - execCtx interface{}, - newTenantID roachpb.TenantID, + execCtx sql.JobExecContext, + details jobspb.StreamIngestionDetails, cutoverTimestamp hlc.Timestamp, ) error { - p := execCtx.(sql.JobExecContext) - execCfg := p.ExecCfg() + execCfg := execCtx.ExecCfg() + return execCfg.InternalDB.Txn(ctx, func( ctx context.Context, txn isql.Txn, ) error { - info, err := sql.GetTenantRecordByID(ctx, txn, newTenantID, execCfg.Settings) + info, err := sql.GetTenantRecordByID(ctx, txn, details.DestinationTenantID, execCfg.Settings) if err != nil { return err } + info.DataState = mtinfopb.DataStateReady info.PhysicalReplicationConsumerJobID = 0 info.PreviousSourceTenant = &mtinfopb.PreviousSourceTenant{ + TenantID: details.SourceTenantID, + ClusterID: details.SourceClusterID, CutoverTimestamp: cutoverTimestamp, } - info.DataState = mtinfopb.DataStateReady - - return sql.UpdateTenantRecord(ctx, p.ExecCfg().Settings, txn, info) + return sql.UpdateTenantRecord(ctx, execCfg.Settings, txn, info) }) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 5ec3e97308eb..a3b7b8bdc7bd 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -10,6 +10,7 @@ package streamingest import ( "context" + gosql "database/sql" "fmt" "net/url" "testing" @@ -28,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -44,44 +46,298 @@ import ( func TestTenantStreamingCreationErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - srcServer, srcDB, _ := serverutils.StartServer(t, base.TestServerArgs{DefaultTestTenant: base.TODOTestTenantDisabled}) - defer srcServer.Stopper().Stop(ctx) - destServer, destDB, _ := serverutils.StartServer(t, base.TestServerArgs{DefaultTestTenant: base.TODOTestTenantDisabled}) - defer destServer.Stopper().Stop(ctx) + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{DefaultTestTenant: base.TestControlsTenantsExplicitly}) + defer srv.Stopper().Stop(context.Background()) + + sysSQL := sqlutils.MakeSQLRunner(db) + 
sysSQL.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + sysSQL.Exec(t, `SET CLUSTER SETTING physical_replication.enabled = true`) + + srcPgURL, cleanupSink := sqlutils.PGUrl(t, srv.SystemLayer().AdvSQLAddr(), t.Name(), url.User(username.RootUser)) + defer cleanupSink() - srcTenantID := serverutils.TestTenantID() - srcTenantName := roachpb.TenantName("source") - _, srcTenantConn := serverutils.StartTenant(t, srcServer, base.TestTenantArgs{ - TenantID: srcTenantID, - TenantName: srcTenantName, + t.Run("source cannot be system tenant", func(t *testing.T) { + sysSQL.ExpectErr(t, `pq: neither the source tenant "system" nor the destination tenant "dest" \(0\) can be the system tenant`, + "CREATE TENANT dest FROM REPLICATION OF system ON $1", srcPgURL.String()) + }) + t.Run("destination cannot be system tenant", func(t *testing.T) { + sysSQL.ExpectErr(t, `pq: neither the source tenant "source" nor the destination tenant "system" \(0\) can be the system tenant`, + "CREATE TENANT system FROM REPLICATION OF source ON $1", srcPgURL.String()) + }) + t.Run("destination cannot exist without resume timestamp", func(t *testing.T) { + sysSQL.Exec(t, "CREATE TENANT foo") + sysSQL.ExpectErr(t, "pq: tenant with name \"foo\" already exists", + "CREATE TENANT foo FROM REPLICATION OF source ON $1", srcPgURL.String()) + }) + t.Run("destination tenant cannot be online", func(t *testing.T) { + sysSQL.Exec(t, "CREATE TENANT bar") + sysSQL.Exec(t, "ALTER VIRTUAL CLUSTER bar START SERVICE SHARED") + sysSQL.ExpectErr(t, "service mode must be none", + "CREATE TENANT bar FROM REPLICATION OF source ON $1 WITH RESUME TIMESTAMP = now()", srcPgURL.String()) + }) + t.Run("destination tenant must have known revert timestamp", func(t *testing.T) { + sysSQL.Exec(t, "CREATE TENANT baz") + sysSQL.ExpectErr(t, "no last revert timestamp found", + "CREATE TENANT baz FROM REPLICATION OF source ON $1 WITH RESUME TIMESTAMP = now()", srcPgURL.String()) }) - defer func() { require.NoError(t, srcTenantConn.Close()) }() + t.Run("destination tenant revert timestamp must match resume timestamp", func(t *testing.T) { + sysSQL.Exec(t, "CREATE TENANT bat") + sysSQL.Exec(t, "SELECT crdb_internal.unsafe_revert_tenant_to_timestamp('bat', cluster_logical_timestamp())") + sysSQL.ExpectErr(t, "doesn't match last revert timestamp", + "CREATE TENANT bat FROM REPLICATION OF source ON $1 WITH RESUME TIMESTAMP = cluster_logical_timestamp()", srcPgURL.String()) + }) + t.Run("external connection must be reachable", func(t *testing.T) { + badPgURL := srcPgURL + badPgURL.Host = "nonexistent_test_endpoint" + sysSQL.ExpectErr(t, "pq: failed to construct External Connection details: failed to connect", + fmt.Sprintf(`CREATE EXTERNAL CONNECTION "replication-source-addr" AS "%s"`, + badPgURL.String())) + }) +} - // Set required cluster settings. 
- SrcSysSQL := sqlutils.MakeSQLRunner(srcDB) - SrcSysSQL.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) +func TestTenantStreamingFailback(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() - DestSysSQL := sqlutils.MakeSQLRunner(destDB) - DestSysSQL.Exec(t, `SET CLUSTER SETTING physical_replication.enabled = true;`) + serverA, aDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + defer serverA.Stopper().Stop(ctx) + serverB, bDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + defer serverB.Stopper().Stop(ctx) + + newTenantConn := func(t *testing.T, srv serverutils.ApplicationLayerInterface, tenantName string) *gosql.DB { + var conn *gosql.DB + testutils.SucceedsSoon(t, func() error { + db, err := srv.SQLConnE(fmt.Sprintf("cluster:%s", tenantName)) + if err != nil { + return err + } + if err := db.Ping(); err != nil { + return err + } + conn = db + return nil + }) + return conn + } - // Sink to read data from. - srcPgURL, cleanupSink := sqlutils.PGUrl(t, srcServer.AdvSQLAddr(), t.Name(), url.User(username.RootUser)) - defer cleanupSink() + // ALTER VIRTUAL CLUSTER STOP SERVICE does not block until the + // service is stopped. But, we need to wait until the + // SQLServer is stopped to ensure that nothing is writing to + // the relevant keyspace. + waitUntilStopped := func(t *testing.T, srv serverutils.ApplicationLayerInterface, tenantName string) { + // TODO(ssd): We may want to do something like this, + // but this query is driven first from the + // system.tenants table and doesn't strictly represent + // the in-memory state of the tenant controller. 
+ // + // client := srv.GetAdminClient(t) + // testutils.SucceedsSoon(t, func() error { + // resp, err := client.ListTenants(ctx, &serverpb.ListTenantsRequest{}) + // if err != nil { + // return err + // } + // for _, tenant := range resp.Tenants { + // if tenant.TenantName == tenantName { + // t.Logf("tenant %q is still running", tenantName) + // return errors.Newf("tenant %q still running", tenantName) + // } + // } + // t.Logf("tenant %q is not running", tenantName) + // return nil + // }) + testutils.SucceedsSoon(t, func() error { + db, err := srv.SQLConnE(fmt.Sprintf("cluster:%s", tenantName)) + if err != nil { + return err + } + if err := db.Ping(); err == nil { + t.Logf("tenant %q is still accepting connections", tenantName) + return errors.Newf("tenant %q still accepting connections", tenantName) + } + t.Logf("tenant %q is not accepting connections", tenantName) + return nil + }) + } - DestSysSQL.ExpectErr(t, "pq: neither the source tenant \"source\" nor the destination tenant \"system\" \\(0\\) can be the system tenant", - `CREATE TENANT system FROM REPLICATION OF source ON $1`, srcPgURL.String()) + sqlA := sqlutils.MakeSQLRunner(aDB) + sqlB := sqlutils.MakeSQLRunner(bDB) + + serverAURL, cleanupURLA := sqlutils.PGUrl(t, serverA.SQLAddr(), t.Name(), url.User(username.RootUser)) + defer cleanupURLA() + serverBURL, cleanupURLB := sqlutils.PGUrl(t, serverB.SQLAddr(), t.Name(), url.User(username.RootUser)) + defer cleanupURLB() + + for _, s := range []string{ + "SET CLUSTER SETTING physical_replication.enabled = true", + "SET CLUSTER SETTING kv.rangefeed.enabled = true", + "SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '200ms'", + "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'", + "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50ms'", + + "SET CLUSTER SETTING physical_replication.consumer.heartbeat_frequency = '1s'", + "SET CLUSTER SETTING physical_replication.consumer.job_checkpoint_frequency = '100ms'", + "SET CLUSTER SETTING physical_replication.consumer.minimum_flush_interval = '10ms'", + "SET CLUSTER SETTING physical_replication.consumer.cutover_signal_poll_interval = '100ms'", + "SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '100ms'", + } { + sqlA.Exec(t, s) + sqlB.Exec(t, s) + } + compareAtTimetamp := func(ts string) { + fingerprintQueryFmt := "SELECT fingerprint FROM [SHOW EXPERIMENTAL_FINGERPRINTS FROM TENANT %s] AS OF SYSTEM TIME %s" + var fingerprintF int64 + sqlA.QueryRow(t, fmt.Sprintf(fingerprintQueryFmt, "f", ts)).Scan(&fingerprintF) + var fingerprintG int64 + sqlB.QueryRow(t, fmt.Sprintf(fingerprintQueryFmt, "g", ts)).Scan(&fingerprintG) + require.Equal(t, fingerprintF, fingerprintG, "fingerprint mismatch at %s", ts) - DestSysSQL.Exec(t, "CREATE TENANT \"100\"") - DestSysSQL.ExpectErr(t, "pq: tenant with name \"100\" already exists", - `CREATE TENANT "100" FROM REPLICATION OF source ON $1`, srcPgURL.String()) + } - badPgURL := srcPgURL - badPgURL.Host = "nonexistent_test_endpoint" - DestSysSQL.ExpectErr(t, "pq: failed to construct External Connection details: failed to connect", - fmt.Sprintf(`CREATE EXTERNAL CONNECTION "replication-source-addr" AS "%s"`, - badPgURL.String())) + // The overall test plan looks like: + // + // SETUP + // Create tenant f on serverA + // Start service for tenant f + // Write to tenant f + // Replicate tenant f on serverA to tenant g on serverB + // + // FAILOVER + // Complete replication on tenant g as of ts1 + // Fingerprint f and g as of ts1 + // + // SPLIT BRAIN + // Start 
service for tenant g + // Write to f and g + // Get ts2 + // + // RESET AND RESYNC + // Stop service for tenant f + // Replicate tenant g on serverB to tenant f on serverA as of ts1 + // Fingerprint f and g as of ts1 + // Fingerprint f and g as of ts2 + // + // FAIL BACK + // Get ts3 + // Complete replication on tenant f as of ts3 + // Replicate tenant f on serverA to tenant g on serverB + // Fingerprint f and g as of ts1 + // Fingerprint f and g as of ts2 + // Fingerprint f and g as of ts3 + // Confirm rows written to f during split brain have been reverted; rows written to g remain + + // SETUP + t.Logf("creating tenant f") + sqlA.Exec(t, "CREATE VIRTUAL CLUSTER f") + sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f START SERVICE SHARED") + + tenFDB := newTenantConn(t, serverA.SystemLayer(), "f") + defer tenFDB.Close() + sqlTenF := sqlutils.MakeSQLRunner(tenFDB) + + sqlTenF.Exec(t, "CREATE DATABASE test") + sqlTenF.Exec(t, "CREATE TABLE test.t (k PRIMARY KEY) AS SELECT generate_series(1, 100)") + + t.Logf("starting replication f->g") + sqlB.Exec(t, "CREATE VIRTUAL CLUSTER g FROM REPLICATION OF f ON $1", serverAURL.String()) + + // FAILOVER + _, consumerGJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlB, roachpb.TenantName("g")) + var ts1 string + sqlA.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts1) + t.Logf("waiting for g@%s", ts1) + replicationtestutils.WaitUntilReplicatedTime(t, + replicationtestutils.DecimalTimeToHLC(t, ts1), + sqlB, + jobspb.JobID(consumerGJobID)) + + t.Logf("completing replication on g@%s", ts1) + sqlB.Exec(t, fmt.Sprintf("ALTER VIRTUAL CLUSTER g COMPLETE REPLICATION TO SYSTEM TIME '%s'", ts1)) + + jobutils.WaitForJobToSucceed(t, sqlB, jobspb.JobID(consumerGJobID)) + compareAtTimetamp(ts1) + + // SPLIT BRAIN + // g is now the "primary" + // f is still running unfortunately + sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g START SERVICE SHARED") + tenGDB := newTenantConn(t, serverB.SystemLayer(), "g") + defer tenGDB.Close() + sqlTenG := sqlutils.MakeSQLRunner(tenGDB) + + sqlTenF.Exec(t, "INSERT INTO test.t VALUES (777)") // This value should be abandoned + sqlTenG.Exec(t, "INSERT INTO test.t VALUES (555)") // This value should be synced later + var ts2 string + sqlA.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts2) + + // RESET AND RESYNC + // Stop service for tenant f + // Replicate tenant g on serverB to tenant f on serverA as of ts1 + // Fingerprint f and g as of ts1 + // Fingerprint f and g as of ts2 + sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f STOP SERVICE") + waitUntilStopped(t, serverA.SystemLayer(), "f") + t.Logf("starting replication g->f") + sqlA.Exec(t, fmt.Sprintf("SELECT crdb_internal.unsafe_revert_tenant_to_timestamp('f', %s)", ts1)) + sqlA.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER f FROM REPLICATION OF g ON $1 WITH RESUME TIMESTAMP = '%s'", ts1), serverBURL.String()) + _, consumerFJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlA, roachpb.TenantName("f")) + t.Logf("waiting for f@%s", ts2) + replicationtestutils.WaitUntilReplicatedTime(t, + replicationtestutils.DecimalTimeToHLC(t, ts2), + sqlA, + jobspb.JobID(consumerFJobID)) + + compareAtTimetamp(ts1) + compareAtTimetamp(ts2) + + // FAIL BACK + // Get ts3 + // Complete replication on tenant f as of ts3 + // Replicate tenant f on serverA to tenant g on serverB + // Fingerprint f and g as of ts1 + // Fingerprint f and g as of ts2 + // Fingerprint f and g as of ts3 + // Confirm rows written to f during split brain have been reverted; rows written to g remain + var ts3 string + 
sqlA.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts3) + t.Logf("completing replication on f@%s", ts3) + sqlA.Exec(t, fmt.Sprintf("ALTER VIRTUAL CLUSTER f COMPLETE REPLICATION TO SYSTEM TIME '%s'", ts3)) + jobutils.WaitForJobToSucceed(t, sqlA, jobspb.JobID(consumerFJobID)) + sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f START SERVICE SHARED") + + sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g STOP SERVICE") + waitUntilStopped(t, serverB.SystemLayer(), "g") + t.Logf("starting replication f->g") + sqlB.Exec(t, fmt.Sprintf("SELECT crdb_internal.unsafe_revert_tenant_to_timestamp('g', %s)", ts3)) + sqlB.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER g FROM REPLICATION OF f ON $1 WITH RESUME TIMESTAMP = '%s'", ts3), serverAURL.String()) + _, consumerGJobID = replicationtestutils.GetStreamJobIds(t, ctx, sqlB, roachpb.TenantName("g")) + t.Logf("waiting for g@%s", ts3) + replicationtestutils.WaitUntilReplicatedTime(t, + replicationtestutils.DecimalTimeToHLC(t, ts3), + sqlB, + jobspb.JobID(consumerGJobID)) + + // As of now, we are back in our original position, but with + // an extra write from when we were failed over. + compareAtTimetamp(ts1) + compareAtTimetamp(ts2) + compareAtTimetamp(ts3) + + tenF2DB := newTenantConn(t, serverA.SystemLayer(), "f") + defer tenF2DB.Close() + sqlTenF = sqlutils.MakeSQLRunner(tenF2DB) + sqlTenF.CheckQueryResults(t, "SELECT max(k) FROM test.t", [][]string{{"555"}}) } func TestCutoverBuiltin(t *testing.T) { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index 9750dfd61734..a2482aa15b48 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" @@ -26,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -71,6 +73,15 @@ func ingestionTypeCheck( ingestionStmt.ReplicationSourceAddress, ingestionStmt.Options.Retention}, } + if ingestionStmt.Options.ResumeTimestamp != nil { + if _, err := typeCheckSystemTimeExpr(ctx, + &p.ExtendedEvalContext().Context, + p.SemaCtx(), + ingestionStmt.Options.ResumeTimestamp, + createReplicationOp); err != nil { + return false, nil, err + } + } if ingestionStmt.Like.OtherTenant != nil { toTypeCheck = append(toTypeCheck, exprutil.TenantSpec{TenantSpec: ingestionStmt.Like.OtherTenant}, @@ -127,7 +138,8 @@ func ingestionPlanHook( } } - options, err := evalTenantReplicationOptions(ctx, ingestionStmt.Options, exprEval) + evalCtx := &p.ExtendedEvalContext().Context + options, err := evalTenantReplicationOptions(ctx, ingestionStmt.Options, exprEval, evalCtx, p.SemaCtx(), createReplicationOp) if err != nil { return nil, nil, nil, false, err } @@ -158,7 +170,6 @@ func ingestionPlanHook( } streamAddress = streamingccl.StreamAddress(streamURL.String()) - // TODO(adityamaru): Add privileges 
checks. Probably the same as RESTORE. if roachpb.IsSystemTenantName(roachpb.TenantName(sourceTenant)) || roachpb.IsSystemTenantName(roachpb.TenantName(dstTenantName)) || roachpb.IsSystemTenantID(dstTenantID) { @@ -166,39 +177,92 @@ func ingestionPlanHook( sourceTenant, dstTenantName, dstTenantID) } - // Determine which template will be used as config template to - // create the new tenant below. - tenantInfo, err := sql.GetTenantTemplate(ctx, p.ExecCfg().Settings, p.InternalSQLTxn(), nil, likeTenantID, likeTenantName) - if err != nil { - return err - } - - // Create a new tenant for the replication stream. + // If we don't have a resume timestamp, make a new tenant jobID := p.ExecCfg().JobRegistry.MakeJobID() - tenantInfo.PhysicalReplicationConsumerJobID = jobID - // dstTenantID may be zero which will cause auto-allocation. - tenantInfo.ID = dstTenantID - tenantInfo.DataState = mtinfopb.DataStateAdd - tenantInfo.Name = roachpb.TenantName(dstTenantName) - - initialTenantZoneConfig, err := sql.GetHydratedZoneConfigForTenantsRange(ctx, p.Txn(), p.ExtendedEvalContext().Descs) - if err != nil { - return err - } - destinationTenantID, err := sql.CreateTenantRecord( - ctx, p.ExecCfg().Codec, p.ExecCfg().Settings, - p.InternalSQLTxn(), - p.ExecCfg().SpanConfigKVAccessor.WithTxn(ctx, p.Txn()), - tenantInfo, initialTenantZoneConfig, - ingestionStmt.IfNotExists, - p.ExecCfg().TenantTestingKnobs, - ) - if err != nil { - return err - } else if !destinationTenantID.IsSet() { - // No error but no valid tenant ID: there was an IF NOT EXISTS - // clause and the tenant already existed. Nothing else to do. - return nil + var destinationTenantID roachpb.TenantID + if options.resumeTimestamp.IsEmpty() { + // Determine which template will be used as config template to + // create the new tenant below. + tenantInfo, err := sql.GetTenantTemplate(ctx, p.ExecCfg().Settings, p.InternalSQLTxn(), nil, likeTenantID, likeTenantName) + if err != nil { + return err + } + + // Create a new tenant for the replication stream. + tenantInfo.PhysicalReplicationConsumerJobID = jobID + // dstTenantID may be zero which will cause auto-allocation. + tenantInfo.ID = dstTenantID + tenantInfo.DataState = mtinfopb.DataStateAdd + tenantInfo.Name = roachpb.TenantName(dstTenantName) + + initialTenantZoneConfig, err := sql.GetHydratedZoneConfigForTenantsRange(ctx, p.Txn(), p.ExtendedEvalContext().Descs) + if err != nil { + return err + } + destinationTenantID, err = sql.CreateTenantRecord( + ctx, p.ExecCfg().Codec, p.ExecCfg().Settings, + p.InternalSQLTxn(), + p.ExecCfg().SpanConfigKVAccessor.WithTxn(ctx, p.Txn()), + tenantInfo, initialTenantZoneConfig, + ingestionStmt.IfNotExists, + p.ExecCfg().TenantTestingKnobs, + ) + if err != nil { + return err + } else if !destinationTenantID.IsSet() { + // No error but no valid tenant ID: there was an IF NOT EXISTS + // clause and the tenant already existed. Nothing else to do. + return nil + } + } else { + tenantRecord, err := sql.GetTenantRecordByName( + ctx, p.ExecCfg().Settings, + p.InternalSQLTxn(), + roachpb.TenantName(dstTenantName), + ) + if err != nil { + return err + } + + // Here, we try to prevent the user from making a few + // mistakes. Starting a replication stream into an + // existing tenant requires both that it is offline and + // that it is consistent as of the provided timestamp. 
+ if tenantRecord.ServiceMode != mtinfopb.ServiceModeNone { + return errors.Newf("cannot start replication for tenant %q (%d) in service mode %s; service mode must be %s", + tenantRecord.Name, + tenantRecord.ID, + tenantRecord.ServiceMode, + mtinfopb.ServiceModeNone, + ) + } + if tenantRecord.LastRevertTenantTimestamp.IsEmpty() { + return errors.Newf("cannot start replication for tenant %q (%d) with no last revert timestamp found; likely that this tenant cannot be safely streamed into", + tenantRecord.Name, + tenantRecord.ID, + ) + } + if !tenantRecord.LastRevertTenantTimestamp.Equal(options.resumeTimestamp) { + return errors.Newf("cannot start replication for tenant %q (%d) with resume timestamp %s that doesn't match last revert timestamp %s", + tenantRecord.Name, + tenantRecord.ID, + options.resumeTimestamp, + tenantRecord.LastRevertTenantTimestamp, + ) + } + + // Reset the last revert timestamp. + tenantRecord.LastRevertTenantTimestamp = hlc.Timestamp{} + tenantRecord.PhysicalReplicationConsumerJobID = jobID + tenantRecord.DataState = mtinfopb.DataStateAdd + if err := sql.UpdateTenantRecord(ctx, p.ExecCfg().Settings, + p.InternalSQLTxn(), tenantRecord); err != nil { + return err + } + destinationTenantID, err = roachpb.MakeTenantID(tenantRecord.ID) + if err != nil { + return err + } } // Create a new stream with stream client. @@ -206,10 +270,24 @@ func ingestionPlanHook( if err != nil { return err } + // Create the producer job first for the purpose of observability, user is // able to know the producer job id immediately after executing // CREATE VIRTUAL CLUSTER ... FROM REPLICATION. - replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(sourceTenant)) + req := streampb.ReplicationProducerRequest{} + if !options.resumeTimestamp.IsEmpty() { + req = streampb.ReplicationProducerRequest{ + ReplicationStartTime: options.resumeTimestamp, + + // NB: These are checked against any + // PreviousSourceTenant on the source's tenant + // record. 
+ TenantID: destinationTenantID, + ClusterID: p.ExtendedEvalContext().ClusterID, + } + } + + replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(sourceTenant), req) if err != nil { return err } @@ -221,11 +299,15 @@ func ingestionPlanHook( StreamAddress: string(streamAddress), StreamID: uint64(replicationProducerSpec.StreamID), Span: keys.MakeTenantSpan(destinationTenantID), + ReplicationTTLSeconds: retentionTTLSeconds, + DestinationTenantID: destinationTenantID, - SourceTenantName: roachpb.TenantName(sourceTenant), DestinationTenantName: roachpb.TenantName(dstTenantName), - ReplicationTTLSeconds: retentionTTLSeconds, - ReplicationStartTime: replicationProducerSpec.ReplicationStartTime, + + SourceTenantName: roachpb.TenantName(sourceTenant), + SourceTenantID: replicationProducerSpec.SourceTenantID, + SourceClusterID: replicationProducerSpec.SourceClusterID, + ReplicationStartTime: replicationProducerSpec.ReplicationStartTime, } jobDescription, err := streamIngestionJobDescription(p, from, ingestionStmt) @@ -236,8 +318,10 @@ func ingestionPlanHook( jr := jobs.Record{ Description: jobDescription, Username: p.User(), - Progress: jobspb.StreamIngestionProgress{}, - Details: streamIngestionDetails, + Progress: jobspb.StreamIngestionProgress{ + ReplicatedTime: options.resumeTimestamp, + }, + Details: streamIngestionDetails, } _, err = p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn( diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 234129cfbe58..9eea4fbb5afc 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -70,7 +70,7 @@ var _ streamclient.Client = &mockStreamClient{} // Create implements the Client interface. func (m *mockStreamClient) Create( - _ context.Context, _ roachpb.TenantName, + _ context.Context, _ roachpb.TenantName, _ streampb.ReplicationProducerRequest, ) (streampb.ReplicationProducerSpec, error) { panic("unimplemented") } @@ -682,7 +682,7 @@ func TestRandomClientGeneration(t *testing.T) { randomStreamClient, ok := streamClient.(*streamclient.RandomStreamClient) require.True(t, ok) - rps, err := randomStreamClient.Create(ctx, tenantName) + rps, err := randomStreamClient.Create(ctx, tenantName, streampb.ReplicationProducerRequest{}) require.NoError(t, err) topo, err := randomStreamClient.Plan(ctx, rps.StreamID) diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index a427e3520046..f890afc37ead 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -131,6 +131,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { initialTimestamp := s.spec.InitialScanTimestamp if s.spec.PreviousReplicatedTimestamp.IsEmpty() { + log.Infof(ctx, "starting event stream with initial scan at %s", initialTimestamp) opts = append(opts, rangefeed.WithInitialScan(func(ctx context.Context) {}), rangefeed.WithScanRetryBehavior(rangefeed.ScanRetryRemaining), @@ -149,6 +150,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { } else { initialTimestamp = s.spec.PreviousReplicatedTimestamp // When resuming from cursor, advance frontier to the cursor position. 
+ log.Infof(ctx, "resuming event stream (no initial scan) from %s", initialTimestamp) for _, sp := range s.spec.Spans { if _, err := frontier.Forward(sp, s.spec.PreviousReplicatedTimestamp); err != nil { return err diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/streamingccl/streamproducer/replication_manager.go index b9f309d08acf..0a7c0bb212bf 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager.go @@ -34,9 +34,9 @@ type replicationStreamManagerImpl struct { // StartReplicationStream implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) StartReplicationStream( - ctx context.Context, tenantName roachpb.TenantName, + ctx context.Context, tenantName roachpb.TenantName, req streampb.ReplicationProducerRequest, ) (streampb.ReplicationProducerSpec, error) { - return startReplicationProducerJob(ctx, r.evalCtx, r.txn, tenantName) + return startReplicationProducerJob(ctx, r.evalCtx, r.txn, tenantName, req) } // HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface. diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index 41ad27649d05..be9db204c5f7 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" @@ -57,7 +58,11 @@ func jobIsNotRunningError(id jobspb.JobID, status jobs.Status, op string) error // 1. Tracks the liveness of the replication stream consumption. // 2. Updates the protected timestamp for spans being replicated. 
func startReplicationProducerJob( - ctx context.Context, evalCtx *eval.Context, txn isql.Txn, tenantName roachpb.TenantName, + ctx context.Context, + evalCtx *eval.Context, + txn isql.Txn, + tenantName roachpb.TenantName, + req streampb.ReplicationProducerRequest, ) (streampb.ReplicationProducerSpec, error) { execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) @@ -69,7 +74,36 @@ func startReplicationProducerJob( if err != nil { return streampb.ReplicationProducerSpec{}, err } - tenantID := tenantRecord.ID + tenantID, err := roachpb.MakeTenantID(tenantRecord.ID) + if err != nil { + return streampb.ReplicationProducerSpec{}, err + } + + var replicationStartTime hlc.Timestamp + if !req.ReplicationStartTime.IsEmpty() { + if tenantRecord.PreviousSourceTenant != nil { + cid := tenantRecord.PreviousSourceTenant.ClusterID + if !req.ClusterID.Equal(uuid.UUID{}) && !cid.Equal(uuid.UUID{}) { + if !req.ClusterID.Equal(cid) { + return streampb.ReplicationProducerSpec{}, errors.Errorf("requesting cluster ID %s does not match previous source cluster ID %s", + req.ClusterID, cid) + } + } + + tid := tenantRecord.PreviousSourceTenant.TenantID + if !req.TenantID.Equal(roachpb.TenantID{}) && !tid.Equal(roachpb.TenantID{}) { + if !req.TenantID.Equal(tid) { + return streampb.ReplicationProducerSpec{}, errors.Errorf("requesting tenant ID %s does not match previous source tenant ID %s", + req.TenantID, tid) + } + } + } + replicationStartTime = req.ReplicationStartTime + } else { + replicationStartTime = hlc.Timestamp{ + WallTime: evalCtx.GetStmtTimestamp().UnixNano(), + } + } registry := execConfig.JobRegistry timeout := streamingccl.StreamReplicationJobLivenessTimeout.Get(&evalCtx.Settings.SV) @@ -86,20 +120,20 @@ func startReplicationProducerJob( } ptp := execConfig.ProtectedTimestampProvider.WithTxn(txn) - statementTime := hlc.Timestamp{ - WallTime: evalCtx.GetStmtTimestamp().UnixNano(), - } - deprecatedSpansToProtect := roachpb.Spans{makeTenantSpan(tenantID)} - targetToProtect := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MustMakeTenantID(tenantID)}) - pts := jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), statementTime, + deprecatedSpansToProtect := roachpb.Spans{keys.MakeTenantSpan(tenantID)} + targetToProtect := ptpb.MakeTenantsTarget([]roachpb.TenantID{tenantID}) + pts := jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), replicationStartTime, deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect) if err := ptp.Protect(ctx, pts); err != nil { return streampb.ReplicationProducerSpec{}, err } + return streampb.ReplicationProducerSpec{ StreamID: streampb.StreamID(jr.JobID), - ReplicationStartTime: statementTime, + SourceTenantID: tenantID, + SourceClusterID: evalCtx.ClusterID, + ReplicationStartTime: replicationStartTime, }, nil } diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 8ea11fd57ed7..c61a6e1f694d 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -125,6 +125,15 @@ message StreamIngestionDetails { // source cluster. 
util.hlc.Timestamp replication_start_time = 12 [(gogoproto.nullable) = false]; + roachpb.TenantID source_tenant_id = 13 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "SourceTenantID"]; + + bytes source_cluster_id = 14 [ + (gogoproto.nullable) = false, + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.customname) = "SourceClusterID"]; + reserved 5, 6; } diff --git a/pkg/multitenant/mtinfopb/BUILD.bazel b/pkg/multitenant/mtinfopb/BUILD.bazel index 38642fef4eac..ad8795b491a7 100644 --- a/pkg/multitenant/mtinfopb/BUILD.bazel +++ b/pkg/multitenant/mtinfopb/BUILD.bazel @@ -22,6 +22,7 @@ proto_library( deps = [ "//pkg/kv/kvpb:kvpb_proto", "//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb:tenantcapabilitiespb_proto", + "//pkg/roachpb:roachpb_proto", "//pkg/util/hlc:hlc_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", ], @@ -38,6 +39,7 @@ go_proto_library( "//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb", "//pkg/roachpb", # keep "//pkg/util/hlc", + "//pkg/util/uuid", # keep "@com_github_gogo_protobuf//gogoproto", ], ) diff --git a/pkg/multitenant/mtinfopb/info.proto b/pkg/multitenant/mtinfopb/info.proto index b1b0a062cf50..e2ce450bdabe 100644 --- a/pkg/multitenant/mtinfopb/info.proto +++ b/pkg/multitenant/mtinfopb/info.proto @@ -14,6 +14,7 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"; import "gogoproto/gogo.proto"; import "kv/kvpb/api.proto"; +import "roachpb/data.proto"; import "multitenant/tenantcapabilities/tenantcapabilitiespb/capabilities.proto"; import "util/hlc/timestamp.proto"; @@ -74,22 +75,37 @@ message ProtoInfo { // source cluster of that replication stream. optional PreviousSourceTenant previous_source_tenant = 7; - // Next ID: 8 + // LastRevertTenantTimestamp is the timestamp at which we last called + // RevertRange in preparation for a stream resumption into this + // tenant. + // + // This is cleared on ALTER VIRTUAL CLUSTER START SERVICE and CREATE + // VIRTUAL CLUSTER FROM REPLICATION STREAM. + optional util.hlc.Timestamp last_revert_tenant_timestamp = 8 [(gogoproto.nullable) = false]; + + // Next ID: 9 } message PreviousSourceTenant { option (gogoproto.equal) = true; - // TODO(ssd): Store something that allows us to confirm the identity - // of the cluster later. We could use some combination of the source - // host's ClusterID() and tenant ID or the tenant's cluster ID(). I - // don't think that would account for backup/restore though. + // TODO(ssd): It would be nice to store something here that + // identified a particular MVCC history so that we could also use + // this to compare against things like restored tenants. + optional bytes cluster_id = 1 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "ClusterID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" + ]; + + optional roachpb.TenantID tenant_id = 2 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "TenantID" + ]; // CutoverTimestamp is the time at which we cutover from the source // tenant. 
- optional util.hlc.Timestamp cutover_timestamp = 1 [ - (gogoproto.nullable) = false - ]; + optional util.hlc.Timestamp cutover_timestamp = 3 [(gogoproto.nullable) = false]; } diff --git a/pkg/repstream/streampb/BUILD.bazel b/pkg/repstream/streampb/BUILD.bazel index 9a44f4bbff53..33a659e1665f 100644 --- a/pkg/repstream/streampb/BUILD.bazel +++ b/pkg/repstream/streampb/BUILD.bazel @@ -30,6 +30,7 @@ go_proto_library( "//pkg/roachpb", "//pkg/util", "//pkg/util/hlc", + "//pkg/util/uuid", # keep "@com_github_gogo_protobuf//gogoproto", ], ) diff --git a/pkg/repstream/streampb/stream.proto b/pkg/repstream/streampb/stream.proto index f183346113c7..d8768123d23a 100644 --- a/pkg/repstream/streampb/stream.proto +++ b/pkg/repstream/streampb/stream.proto @@ -35,6 +35,46 @@ message ReplicationProducerSpec { // through the lifetime of a replication stream. This will be the timestamp as // of which each partition will perform its initial rangefeed scan. util.hlc.Timestamp replication_start_time = 2 [(gogoproto.nullable) = false]; + + bytes source_cluster_id = 3 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "SourceClusterID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" + ]; + + roachpb.TenantID source_tenant_id = 4 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "SourceTenantID" + ]; +} + +// ReplicationProducerRequest is sent by the consuming cluster when +// starting a replication stream on an existing tenant. +message ReplicationProducerRequest { + // ReplicationStartTime is the initial timestamp from which the replication + // producer job will begin streaming MVCC revisions. This timestamp is picked + // once when the replication producer job is created, and is never updated + // through the lifetime of a replication stream. This will be the timestamp as + // of which each partition will perform its initial rangefeed scan. + util.hlc.Timestamp replication_start_time = 1 [(gogoproto.nullable) = false]; + + + // ClusterID identifies the requesting cluster. If set, this is + // compared to the ClusterID in the tenant record's + // PreviousSourceTenant. + bytes cluster_id = 2 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "ClusterID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" + ]; + + // TenantID identifies the requesting cluster's destination + // tenant. If set, this is compared to the TenantID in the tenant + // record's PreviousSourceTenant. + roachpb.TenantID tenant_id = 3 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "TenantID" + ]; } // StreamPartitionSpec is the stream partition specification. diff --git a/pkg/sql/catalog/bootstrap/bootstrap_test.go b/pkg/sql/catalog/bootstrap/bootstrap_test.go index 938877f6baf8..4cdc5648b13f 100644 --- a/pkg/sql/catalog/bootstrap/bootstrap_test.go +++ b/pkg/sql/catalog/bootstrap/bootstrap_test.go @@ -176,7 +176,7 @@ func TestSystemDatabaseSchemaBootstrapVersionBumped(t *testing.T) { // If you need to update this value (i.e. failed this test), check whether // you need to bump systemschema.SystemDatabaseSchemaBootstrapVersion too. 
-  const prevSystemHash = "3e74cb9b592efdffed097636c60fb506e393f5256bdac3dfc51a21d1902de64d"
+  const prevSystemHash = "cd85ebe773d840ecaf39fd4ccbba6345f54d7414a95b5bf5be2fa4102c26c650"
   _, curSystemHash := getAndHashInitialValuesToString(0 /* tenantID */)
 
   if prevSystemHash != curSystemHash {
diff --git a/pkg/sql/catalog/bootstrap/testdata/testdata b/pkg/sql/catalog/bootstrap/testdata/testdata
index 5219f1fb3662..577036301d4e 100644
--- a/pkg/sql/catalog/bootstrap/testdata/testdata
+++ b/pkg/sql/catalog/bootstrap/testdata/testdata
@@ -1,4 +1,4 @@
-system hash=3e74cb9b592efdffed097636c60fb506e393f5256bdac3dfc51a21d1902de64d
+system hash=cd85ebe773d840ecaf39fd4ccbba6345f54d7414a95b5bf5be2fa4102c26c650
 ----
 [{"key":"04646573632d696467656e","value":"01c801"}
 ,{"key":"8b"}
@@ -72,7 +72,7 @@ system hash=3e74cb9b592efdffed097636c60fb506e393f5256bdac3dfc51a21d1902de64d
 ,{"key":"8f"}
 ,{"key":"8f898888","value":"01c801"}
 ,{"key":"90"}
-,{"key":"90898988","value":"0a2a160a080110001a0020002a00160673797374656d13021304"}
+,{"key":"90898988","value":"0a2a160c080110001a0020002a004200160673797374656d13021304"}
 ,{"key":"908a1273797374656d000188","value":"0389"}
 ,{"key":"908b8a8988","value":"03"}
 ,{"key":"91"}
diff --git a/pkg/sql/logictest/testdata/logic_test/tenant b/pkg/sql/logictest/testdata/logic_test/tenant
index 443df461f0a7..36ac0a75d8d5 100644
--- a/pkg/sql/logictest/testdata/logic_test/tenant
+++ b/pkg/sql/logictest/testdata/logic_test/tenant
@@ -4,7 +4,7 @@ query IBIT colnames
 SELECT id, active, length(info), name FROM system.tenants ORDER BY id
 ----
 id active length name
-1 true 10 system
+1 true 12 system
 
 # Create a few tenants.
diff --git a/pkg/sql/logictest/testdata/logic_test/tenant_builtins b/pkg/sql/logictest/testdata/logic_test/tenant_builtins
index eae9f081dc99..3f2fcaa82892 100644
--- a/pkg/sql/logictest/testdata/logic_test/tenant_builtins
+++ b/pkg/sql/logictest/testdata/logic_test/tenant_builtins
@@ -4,7 +4,7 @@ query IBIT colnames
 SELECT id, active, length(info), name FROM system.tenants ORDER BY id
 ----
 id active length name
-1 true 10 system
+1 true 12 system
 
 # Create three tenants.
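
[Illustration only, not part of the patch.] The stream.proto change above defines ReplicationProducerRequest, and the two-argument crdb_internal.start_replication_stream overload added below unmarshals exactly that message from its `spec` argument. The following is a minimal consumer-side sketch of how such a payload might be built; the timestamp, UUID, and tenant ID values are placeholders, and the Go field names assume standard gogoproto generation with the customnames declared above (ReplicationStartTime, ClusterID, TenantID).

package main

import (
  "fmt"

  "github.com/cockroachdb/cockroach/pkg/repstream/streampb"
  "github.com/cockroachdb/cockroach/pkg/roachpb"
  "github.com/cockroachdb/cockroach/pkg/util/hlc"
  "github.com/cockroachdb/cockroach/pkg/util/protoutil"
  "github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func main() {
  // Placeholder values: a real caller would presumably use the destination
  // tenant's last revert timestamp and the identifiers recorded in its
  // PreviousSourceTenant so the producer can validate them.
  req := streampb.ReplicationProducerRequest{
    ReplicationStartTime: hlc.Timestamp{WallTime: 1700000000000000000},
    ClusterID:            uuid.MakeV4(),
    TenantID:             roachpb.MustMakeTenantID(10),
  }
  // The marshaled bytes become the `spec` argument of
  // crdb_internal.start_replication_stream(tenant_name, spec).
  reqBytes, err := protoutil.Marshal(&req)
  if err != nil {
    panic(err)
  }
  fmt.Printf("spec payload: %d bytes\n", len(reqBytes))
}
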
diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y
index 31a3385c273b..d9bd03e08d8b 100644
--- a/pkg/sql/parser/sql.y
+++ b/pkg/sql/parser/sql.y
@@ -4558,6 +4558,11 @@ replication_options:
   {
     $$.val = &tree.TenantReplicationOptions{Retention: $3.expr()}
   }
+|
+  RESUME TIMESTAMP '=' d_expr
+  {
+    $$.val = &tree.TenantReplicationOptions{ResumeTimestamp: $4.expr()}
+  }
 
 // %Help: CREATE SCHEDULE
 // %Category: Group
diff --git a/pkg/sql/parser/testdata/create_virtual_cluster b/pkg/sql/parser/testdata/create_virtual_cluster
index 0d149f113b26..c4b780891071 100644
--- a/pkg/sql/parser/testdata/create_virtual_cluster
+++ b/pkg/sql/parser/testdata/create_virtual_cluster
@@ -94,6 +94,14 @@ CREATE VIRTUAL CLUSTER ("destination-hyphen") FROM REPLICATION OF ("source-hyphe
 CREATE VIRTUAL CLUSTER "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' WITH RETENTION = '_' -- literals removed
 CREATE VIRTUAL CLUSTER _ FROM REPLICATION OF _ ON 'pgurl' WITH RETENTION = '36h' -- identifiers removed
 
+parse
+CREATE VIRTUAL CLUSTER "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RESUME TIMESTAMP = '132412341234.000000'
+----
+CREATE VIRTUAL CLUSTER "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RESUME TIMESTAMP = '132412341234.000000'
+CREATE VIRTUAL CLUSTER ("destination-hyphen") FROM REPLICATION OF ("source-hyphen") ON ('pgurl') WITH RESUME TIMESTAMP = ('132412341234.000000') -- fully parenthesized
+CREATE VIRTUAL CLUSTER "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' WITH RESUME TIMESTAMP = '_' -- literals removed
+CREATE VIRTUAL CLUSTER _ FROM REPLICATION OF _ ON 'pgurl' WITH RESUME TIMESTAMP = '132412341234.000000' -- identifiers removed
+
 parse
 CREATE VIRTUAL CLUSTER "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH OPTIONS (RETENTION = '36h')
 ----
diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go
index 053ec47908b5..b01b37ad0f0a 100644
--- a/pkg/sql/sem/builtins/fixed_oids.go
+++ b/pkg/sql/sem/builtins/fixed_oids.go
@@ -2482,6 +2482,7 @@ var builtinOidsArray = []string{
   2511: `merge_stats_metadata(arg1: jsonb) -> jsonb`,
   2512: `merge_statement_stats(arg1: jsonb) -> jsonb`,
   2513: `merge_transaction_stats(arg1: jsonb) -> jsonb`,
+  2514: `crdb_internal.start_replication_stream(tenant_name: string, spec: bytes) -> bytes`,
 }
 
 var builtinOidsBySignature map[string]oid.Oid
diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go
index 0c4182bc58e6..4a347932114f 100644
--- a/pkg/sql/sem/builtins/replication_builtins.go
+++ b/pkg/sql/sem/builtins/replication_builtins.go
@@ -141,7 +141,7 @@ var replicationBuiltins = map[string]builtinDefinition{
           return nil, err
         }
         tenantName := string(tree.MustBeDString(args[0]))
-        replicationProducerSpec, err := mgr.StartReplicationStream(ctx, roachpb.TenantName(tenantName))
+        replicationProducerSpec, err := mgr.StartReplicationStream(ctx, roachpb.TenantName(tenantName), streampb.ReplicationProducerRequest{})
         if err != nil {
           return nil, err
         }
@@ -157,6 +157,42 @@ var replicationBuiltins = map[string]builtinDefinition{
         "notify that the replication is still ongoing.",
       Volatility: volatility.Volatile,
     },
+    tree.Overload{
+      Types: tree.ParamTypes{
+        {Name: "tenant_name", Typ: types.String},
+        {Name: "spec", Typ: types.Bytes},
+      },
+      ReturnType: tree.FixedReturnType(types.Bytes),
+      Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
+        mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx)
+        if err != nil {
+          return nil, err
+        }
+        tenantName := string(tree.MustBeDString(args[0]))
+        reqBytes := []byte(tree.MustBeDBytes(args[1]))
+
+        req := streampb.ReplicationProducerRequest{}
+        if err := protoutil.Unmarshal(reqBytes, &req); err != nil {
+          return nil, err
+        }
+
+        replicationProducerSpec, err := mgr.StartReplicationStream(ctx, roachpb.TenantName(tenantName), req)
+        if err != nil {
+          return nil, err
+        }
+
+        rawReplicationProducerSpec, err := protoutil.Marshal(&replicationProducerSpec)
+        if err != nil {
+          return nil, err
+        }
+        return tree.NewDBytes(tree.DBytes(rawReplicationProducerSpec)), err
+      },
+      Info: "This function can be used on the producer side to start a replication stream for " +
+        "the specified tenant. The returned stream ID uniquely identifies created stream. " +
+        "The caller must periodically invoke crdb_internal.heartbeat_stream() function to " +
+        "notify that the replication is still ongoing.",
+      Volatility: volatility.Volatile,
+    },
   ),
 
   "crdb_internal.replication_stream_progress": makeBuiltin(
diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go
index 6e41f9eee8ce..5a5db9266e80 100644
--- a/pkg/sql/sem/eval/context.go
+++ b/pkg/sql/sem/eval/context.go
@@ -825,7 +825,7 @@ type StreamManagerFactory interface {
 type ReplicationStreamManager interface {
   // StartReplicationStream starts a stream replication job for the specified
   // tenant on the producer side.
-  StartReplicationStream(ctx context.Context, tenantName roachpb.TenantName) (streampb.ReplicationProducerSpec, error)
+  StartReplicationStream(ctx context.Context, tenantName roachpb.TenantName, req streampb.ReplicationProducerRequest) (streampb.ReplicationProducerSpec, error)
 
   // SetupSpanConfigsStream creates and plans a replication stream to stream the span config updates for a specific tenant.
   SetupSpanConfigsStream(ctx context.Context, tenantName roachpb.TenantName) (ValueGenerator, error)
diff --git a/pkg/sql/sem/tree/create.go b/pkg/sql/sem/tree/create.go
index 512d835d33fa..52eaedae149f 100644
--- a/pkg/sql/sem/tree/create.go
+++ b/pkg/sql/sem/tree/create.go
@@ -2235,7 +2235,8 @@ type CreateTenantFromReplication struct {
 
 // TenantReplicationOptions options for the CREATE VIRTUAL CLUSTER FROM REPLICATION command.
 type TenantReplicationOptions struct {
-  Retention Expr
+  Retention       Expr
+  ResumeTimestamp Expr
 }
 
 var _ NodeFormatter = &TenantReplicationOptions{}
@@ -2284,6 +2285,17 @@ func (o *TenantReplicationOptions) Format(ctx *FmtCtx) {
       ctx.WriteByte(')')
     }
   }
+  if o.ResumeTimestamp != nil {
+    ctx.WriteString("RESUME TIMESTAMP = ")
+    _, canOmitParentheses := o.ResumeTimestamp.(alreadyDelimitedAsSyntacticDExpr)
+    if !canOmitParentheses {
+      ctx.WriteByte('(')
+    }
+    ctx.FormatNode(o.ResumeTimestamp)
+    if !canOmitParentheses {
+      ctx.WriteByte(')')
+    }
+  }
 }
 
 // CombineWith merges other TenantReplicationOptions into this struct.
@@ -2296,13 +2308,23 @@ func (o *TenantReplicationOptions) CombineWith(other *TenantReplicationOptions)
   } else {
     o.Retention = other.Retention
   }
+
+  if o.ResumeTimestamp != nil {
+    if other.ResumeTimestamp != nil {
+      return errors.New("RESUME TIMESTAMP option specified multiple times")
+    }
+  } else {
+    o.ResumeTimestamp = other.ResumeTimestamp
+  }
+
   return nil
 }
 
 // IsDefault returns true if this backup options struct has default value.
 func (o TenantReplicationOptions) IsDefault() bool {
   options := TenantReplicationOptions{}
-  return o.Retention == options.Retention
+  return o.Retention == options.Retention &&
+    o.ResumeTimestamp == options.ResumeTimestamp
 }
 
 type SuperRegion struct {
diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go
index 058a62c953a2..96980f046956 100644
--- a/pkg/sql/sem/tree/walk.go
+++ b/pkg/sql/sem/tree/walk.go
@@ -1041,6 +1041,16 @@ func (n *AlterTenantReplication) walkStmt(v Visitor) Statement {
       ret.Options.Retention = e
     }
   }
+  if n.Options.ResumeTimestamp != nil {
+    e, changed := WalkExpr(v, n.Options.ResumeTimestamp)
+    if changed {
+      if ret == n {
+        ret = n.copyNode()
+      }
+      ret.Options.ResumeTimestamp = e
+    }
+  }
+
   return ret
 }
 
@@ -1104,6 +1114,16 @@ func (n *CreateTenantFromReplication) walkStmt(v Visitor) Statement {
       ret.Options.Retention = e
     }
   }
+  if n.Options.ResumeTimestamp != nil {
+    e, changed := WalkExpr(v, n.Options.ResumeTimestamp)
+    if changed {
+      if ret == n {
+        ret = n.copyNode()
+      }
+      ret.Options.ResumeTimestamp = e
+    }
+  }
+
   if n.Like.OtherTenant != nil {
     ts, changed := walkTenantSpec(v, n.TenantSpec)
     if changed {
diff --git a/pkg/sql/tenant_update.go b/pkg/sql/tenant_update.go
index b34d2758de57..c02cefc6d8ea 100644
--- a/pkg/sql/tenant_update.go
+++ b/pkg/sql/tenant_update.go
@@ -27,6 +27,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
   "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
   "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
+  "github.com/cockroachdb/cockroach/pkg/util/hlc"
   "github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
   "github.com/cockroachdb/cockroach/pkg/util/protoutil"
   "github.com/cockroachdb/errors"
@@ -236,6 +237,7 @@ func (p *planner) setTenantService(
   }
 
   info.ServiceMode = newMode
+  info.LastRevertTenantTimestamp = hlc.Timestamp{}
   return UpdateTenantRecord(ctx, p.ExecCfg().Settings, p.InternalSQLTxn(), info)
 }
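
[Illustration only, not part of the patch.] End to end, the new RESUME TIMESTAMP option is what a consumer-side client would supply when asking to resume replication into an existing tenant; the statement below mirrors the parser test cases added above, while the connection strings, cluster names, and decimal timestamp are placeholders. A rough sketch of issuing it over the SQL wire with database/sql:

package main

import (
  "database/sql"
  "log"

  // Assumption: any Postgres-wire driver works against CockroachDB; lib/pq is
  // used here only for illustration.
  _ "github.com/lib/pq"
)

func main() {
  db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
  if err != nil {
    log.Fatal(err)
  }
  defer db.Close()

  // The RESUME TIMESTAMP value is parsed into
  // TenantReplicationOptions.ResumeTimestamp (see the grammar and tree changes
  // above); the decimal literal follows the format used in the parser testdata.
  const stmt = `CREATE VIRTUAL CLUSTER "destination" FROM REPLICATION OF "source"
  ON 'postgresql://user@producer-host:26257?sslmode=verify-full'
  WITH RESUME TIMESTAMP = '132412341234.000000'`
  if _, err := db.Exec(stmt); err != nil {
    log.Fatal(err)
  }
}
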