diff --git a/pkg/bench/rttanalysis/jobs_test.go b/pkg/bench/rttanalysis/jobs_test.go
index d6863b18f264..2ff19902c642 100644
--- a/pkg/bench/rttanalysis/jobs_test.go
+++ b/pkg/bench/rttanalysis/jobs_test.go
@@ -41,7 +41,7 @@ func init() {
 		fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) (SELECT id, 'legacy_payload', '\\x%s' FROM generate_series(1000, 3000) as id)",
 			hex.EncodeToString(payloadBytes)),
-		"INSERT INTO system.jobs(id, status, created, job_type) (SELECT id, 'succeeded', now(), 'IMPORT' FROM generate_series(1000, 3000) as id)",
+		"INSERT INTO system.jobs(id, status, created, job_type, owner) (SELECT id, 'succeeded', now(), 'IMPORT', 'test' FROM generate_series(1000, 3000) as id)",
 
 		// Job 3001 is a RUNNING job. We've marked it as
 		// claimed and added run stats that likely prevent it
@@ -49,14 +49,14 @@ func init() {
 		// the test.
 		fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) VALUES (3001, 'legacy_progress', '\\x%s')", hex.EncodeToString(progressBytes)),
 		fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) VALUES (3001, 'legacy_payload', '\\x%s')", hex.EncodeToString(payloadBytes)),
-		`INSERT INTO system.jobs(id, status, created, last_run, num_runs, job_type, claim_instance_id, claim_session_id) VALUES (3001, 'running', now(), now(), 200, 'IMPORT',
+		`INSERT INTO system.jobs(id, status, created, last_run, num_runs, job_type, owner, claim_instance_id, claim_session_id) VALUES (3001, 'running', now(), now(), 200, 'IMPORT', 'root',
 			(SELECT id FROM system.sql_instances WHERE session_id IS NOT NULL ORDER BY id LIMIT 1),
 			(SELECT session_id FROM system.sql_instances WHERE session_id IS NOT NULL ORDER BY id LIMIT 1))`,
 
 		// Job 3002 is a PAUSED job.
 		fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) VALUES (3002, 'legacy_progress', '\\x%s')", hex.EncodeToString(progressBytes)),
 		fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) VALUES (3002, 'legacy_payload', '\\x%s')", hex.EncodeToString(payloadBytes)),
-		`INSERT INTO system.jobs(id, status, created, last_run, num_runs, job_type, claim_instance_id, claim_session_id) VALUES (3002, 'paused', now(), now(), 200, 'IMPORT',
+		`INSERT INTO system.jobs(id, status, created, last_run, num_runs, job_type, owner, claim_instance_id, claim_session_id) VALUES (3002, 'paused', now(), now(), 200, 'IMPORT', 'root',
 			(SELECT id FROM system.sql_instances WHERE session_id IS NOT NULL ORDER BY id LIMIT 1),
 			(SELECT session_id FROM system.sql_instances WHERE session_id IS NOT NULL ORDER BY id LIMIT 1))`,
 
 		`ANALYZE system.jobs`,
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index d6e4504f3973..7d1dbe14dd6d 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -3404,7 +3404,7 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e
 	case *string:
 		s, ok := tree.AsDString(src)
 		if !ok {
-			return errors.Errorf("source type assertion failed")
+			return errors.Errorf("source type assertion failed %d %T", index, src)
 		}
 		*d = string(s)
diff --git a/pkg/server/application_api/jobs_test.go b/pkg/server/application_api/jobs_test.go
index 35de5200433d..6956d2230612 100644
--- a/pkg/server/application_api/jobs_test.go
+++ b/pkg/server/application_api/jobs_test.go
@@ -172,8 +172,8 @@ func TestAdminAPIJobs(t *testing.T) {
 			t.Fatal(err)
 		}
 		sqlDB.Exec(t,
-			`INSERT INTO system.jobs (id, status, num_runs, last_run, job_type) VALUES ($1, $2, $3, $4, $5)`,
-			job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(),
+			`INSERT INTO system.jobs (id, status, num_runs, last_run, job_type, owner) VALUES ($1, $2, $3, $4, $5, $6)`,
+			job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(), payload.UsernameProto.Decode().Normalized(),
 		)
 		sqlDB.Exec(t,
 			`INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`,
@@ -362,8 +362,8 @@ func TestAdminAPIJobsDetails(t *testing.T) {
 			t.Fatal(err)
 		}
 		sqlDB.Exec(t,
-			`INSERT INTO system.jobs (id, status, num_runs, last_run) VALUES ($1, $2, $3, $4)`,
-			job.id, job.status, job.numRuns, job.lastRun,
+			`INSERT INTO system.jobs (id, status, num_runs, last_run, job_type) VALUES ($1, $2, $3, $4, $5)`,
+			job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(),
 		)
 		sqlDB.Exec(t,
 			`INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`,
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index 52840ff1c7cc..d8eb8d444c77 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -1139,14 +1139,14 @@ func wrapPayloadUnMarshalError(err error, jobID tree.Datum) error {
 }
 
 const (
-	jobsQuery = `SELECT id, status, created::timestamptz, payload, progress, claim_session_id, claim_instance_id FROM crdb_internal.system_jobs`
+	jobsQuery = `SELECT id, status, created::timestamptz, payload, progress, claim_session_id, claim_instance_id FROM crdb_internal.system_jobs j`
 	// Note that we are querying crdb_internal.system_jobs instead of system.jobs directly.
 	// The former has access control built in and will filter out jobs that the
 	// user is not allowed to see.
 	jobsQFrom        = ` `
-	jobIDFilter      = ` WHERE id = $1`
-	jobsStatusFilter = ` WHERE status = $1`
-	jobsTypeFilter   = ` WHERE job_type = $1`
+	jobIDFilter      = ` WHERE j.id = $1`
+	jobsStatusFilter = ` WHERE j.status = $1`
+	jobsTypeFilter   = ` WHERE j.job_type = $1`
 )
 
 // TODO(tbg): prefix with kv_.
@@ -1179,32 +1179,55 @@ CREATE TABLE crdb_internal.jobs (
 	comment: `decoded job metadata from crdb_internal.system_jobs (KV scan)`,
 	indexes: []virtualIndex{{
 		populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
-			q := jobsQuery + jobIDFilter
 			targetID := tree.MustBeDInt(unwrappedConstraint)
-			return makeJobsTableRows(ctx, p, addRow, q, targetID)
+			return makeJobsTableRows(ctx, p, addRow, jobIDFilter, targetID)
 		},
 	}, {
 		populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
-			q := jobsQuery + jobsStatusFilter
 			targetStatus := tree.MustBeDString(unwrappedConstraint)
-			return makeJobsTableRows(ctx, p, addRow, q, targetStatus)
+			return makeJobsTableRows(ctx, p, addRow, jobsStatusFilter, targetStatus)
 		},
 	}, {
 		populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
-			q := jobsQuery + jobsTypeFilter
 			targetType := tree.MustBeDString(unwrappedConstraint)
-			return makeJobsTableRows(ctx, p, addRow, q, targetType)
+			return makeJobsTableRows(ctx, p, addRow, jobsTypeFilter, targetType)
 		},
 	}},
 	populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
-		_, err := makeJobsTableRows(ctx, p, addRow, jobsQuery)
+		_, err := makeJobsTableRows(ctx, p, addRow, "")
 		return err
 	},
 }
 
+var useOldJobsVTable = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"sql.jobs.legacy_vtable.enabled",
+	"cause the crdb_internal.jobs vtable to be produced from the legacy payload info records",
+	false, // TODO(dt): flip this once we add permissive auth checks.
+)
+
 // makeJobsTableRows calls addRow for each job. It returns true if addRow was called
 // successfully at least once.
 func makeJobsTableRows(
+	ctx context.Context,
+	p *planner,
+	addRow func(...tree.Datum) error,
+	queryFilterSuffix string,
+	params ...interface{},
+) (matched bool, err error) {
+
+	v, err := p.InternalSQLTxn().GetSystemSchemaVersion(ctx)
+	if err != nil {
+		return false, err
+	}
+	if !v.AtLeast(clusterversion.V25_1.Version()) || useOldJobsVTable.Get(&p.EvalContext().Settings.SV) {
+		query := jobsQuery + queryFilterSuffix
+		return makeLegacyJobsTableRows(ctx, p, addRow, query, params...)
+}
+
+func makeLegacyJobsTableRows(
 	ctx context.Context,
 	p *planner,
 	addRow func(...tree.Datum) error,
@@ -1393,6 +1416,181 @@ func makeJobsTableRows(
 	}
 }
 
+var enablePerJobDetailedAuthLookups = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"sql.jobs.legacy_per_job_access_via_details.enabled",
+	"enables granting additional access to jobs beyond owners and roles based on the specific details (tables being watched/backed up/etc) of the individual jobs (may make SHOW JOBS less performant)",
+	true,
+)
+
+var errLegacyPerJobAuthDisabledSentinel = pgerror.Newf(pgcode.InsufficientPrivilege, "legacy job access based on details is disabled")
+
+func makeJobBasedJobsTableRows(
+	ctx context.Context,
+	p *planner,
+	addRow func(...tree.Datum) error,
+	whereClause string,
+	params ...interface{},
+) (emitted bool, retErr error) {
+	globalPrivileges, err := jobsauth.GetGlobalJobPrivileges(ctx, p)
+	if err != nil {
+		return false, err
+	}
+
+	query := `SELECT
+j.id, j.job_type, coalesce(j.description, ''), coalesce(j.owner, ''), j.status as state,
+s.status, j.created::timestamptz, j.finished, greatest(j.created, j.finished, p.written, s.written)::timestamptz AS last_modified,
+p.fraction,
+p.resolved,
+j.error_msg,
+j.claim_instance_id
+FROM system.public.jobs AS j
+LEFT OUTER JOIN system.public.job_progress AS p ON j.id = p.job_id
+LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
+` + whereClause
+
+	it, err := p.InternalSQLTxn().QueryIteratorEx(
+		ctx, "system-jobs-join", p.txn, sessiondata.NodeUserSessionDataOverride, query, params...)
+	if err != nil {
+		return emitted, err
+	}
+	defer func() {
+		if err := it.Close(); err != nil {
+			retErr = errors.CombineErrors(retErr, err)
+		}
+	}()
+
+	sessionJobs := make([]*jobs.Record, 0, p.extendedEvalCtx.jobs.numToCreate())
+	uniqueJobs := make(map[*jobs.Record]struct{})
+	if err := p.extendedEvalCtx.jobs.forEachToCreate(func(job *jobs.Record) error {
+		if _, ok := uniqueJobs[job]; ok {
+			return nil
+		}
+		sessionJobs = append(sessionJobs, job)
+		uniqueJobs[job] = struct{}{}
+		return nil
+	}); err != nil {
+		return emitted, err
+	}
+
+	// Loop while we need to skip a row.
+	for {
+		ok, err := it.Next(ctx)
+		if err != nil {
+			return emitted, err
+		}
+		// We will read the columns from the query on joined jobs tables into a wide
+		// row, and then copy the values from read rows into named variables to then
+		// use when emitting our output row. If we need to synthesize rows for jobs
+		// pending creation in the session, we'll do so in those same named vars to
+		// keep things organized.
+		// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
+		var id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID tree.Datum
+
+		if ok {
+			r := it.Cur()
+			id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID =
+				r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], r[10], r[11], r[12]
+
+			owner := username.MakeSQLUsernameFromPreNormalizedString(string(tree.MustBeDString(ownerStr)))
+			jobID := jobspb.JobID(tree.MustBeDInt(id))
+			typ, err := jobspb.TypeFromString(string(tree.MustBeDString(typStr)))
+			if err != nil {
+				return emitted, err
+			}
+
+			getLegacyPayloadForAuth := func(ctx context.Context) (*jobspb.Payload, error) {
+				if !enablePerJobDetailedAuthLookups.Get(&p.EvalContext().Settings.SV) {
+					return nil, errLegacyPerJobAuthDisabledSentinel
+				}
+				if p.EvalContext().Settings.Version.IsActive(ctx, clusterversion.V25_1) {
+					log.Warningf(ctx, "extended job access control based on job-specific details is deprecated and can make SHOW JOBS less performant; consider disabling %s",
+						enablePerJobDetailedAuthLookups.Name())
+					p.BufferClientNotice(ctx,
+						pgnotice.Newf("extended job access control based on job-specific details has been deprecated and can make SHOW JOBS less performant; consider disabling %s",
+							enablePerJobDetailedAuthLookups.Name()))
+				}
+				payload := &jobspb.Payload{}
+				infoStorage := jobs.InfoStorageForJob(p.InternalSQLTxn(), jobID)
+				payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx, "getLegacyPayload-for-custom-auth")
+				if err != nil {
+					return nil, err
+				}
+				if !exists {
+					return nil, errors.New("job payload not found in system.job_info")
+				}
+				if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
+					return nil, err
+				}
+				return payload, nil
+			}
+			if errorMsg == tree.DNull {
+				errorMsg = emptyString
+			}
+
+			if err := jobsauth.Authorize(
+				ctx, p, jobID, getLegacyPayloadForAuth, owner, typ, jobsauth.ViewAccess, globalPrivileges,
+			); err != nil {
+				// Filter out jobs which the user is not allowed to see.
+				if IsInsufficientPrivilegeError(err) {
+					continue
+				}
+				return emitted, err
+			}
+		} else if !ok {
+			if len(sessionJobs) == 0 {
+				return emitted, nil
+			}
+			job := sessionJobs[len(sessionJobs)-1]
+			sessionJobs = sessionJobs[:len(sessionJobs)-1]
+			payloadType, err := jobspb.DetailsType(jobspb.WrapPayloadDetails(job.Details))
+			if err != nil {
+				return emitted, err
+			}
+			// synthesize the fields we'd read from the jobs table if this job were in it.
+			id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID =
+				tree.NewDInt(tree.DInt(job.JobID)),
+				tree.NewDString(payloadType.String()),
+				tree.NewDString(job.Description),
+				tree.NewDString(job.Username.Normalized()),
+				tree.NewDString(string(jobs.StatusPending)),
+				tree.DNull,
+				tree.MustMakeDTimestampTZ(p.txn.ReadTimestamp().GoTime(), time.Microsecond),
+				tree.DNull,
+				tree.MustMakeDTimestampTZ(p.txn.ReadTimestamp().GoTime(), time.Microsecond),
+				tree.NewDFloat(tree.DFloat(0)),
+				tree.DZeroDecimal,
+				tree.DNull,
+				tree.NewDInt(tree.DInt(p.extendedEvalCtx.ExecCfg.JobRegistry.ID()))
+		}
+
+		if err = addRow(
+			id,
+			typStr,
+			desc,
+			desc,
+			ownerStr,
+			tree.DNull, // deprecated "descriptor_ids"
+			state,
+			status,
+			created,
+			created, // deprecated "started" field.
+			finished,
+			modified,
+			fraction,
+			resolved,
+			errorMsg,
+			instanceID,
+			tree.DNull, // deprecated "trace_id" field.
+			tree.DNull, // deprecated "executionErrors" field.
+			tree.DNull, // deprecated "executionEvents" field.
+		); err != nil {
+			return emitted, err
+		}
+		emitted = true
+	}
+}
+
 const crdbInternalKVProtectedTSTableQuery = `
 SELECT
 	id, ts, meta_type, meta, num_spans, spans, verified, target,
 	crdb_internal.pb_to_json(
diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go
index fc72b5e11cf9..1967060ae288 100644
--- a/pkg/sql/crdb_internal_test.go
+++ b/pkg/sql/crdb_internal_test.go
@@ -1357,46 +1357,6 @@ func TestInternalSystemJobsTableMirrorsSystemJobsTable(t *testing.T) {
 	// TODO(adityamaru): add checks for payload and progress
 }
 
-// TestCorruptPayloadError asserts that we can an error
-// with the correct hint when we fail to decode a payload.
-func TestCorruptPayloadError(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
-		Knobs: base.TestingKnobs{
-			// Because this test modifies system.jobs and asserts its contents,
-			// we should disable jobs from being adopted and disable automatic jobs
-			// from being created.
-			JobsTestingKnobs: &jobs.TestingKnobs{
-				DisableAdoptions: true,
-			},
-			// DisableAdoptions needs this.
-			UpgradeManager: &upgradebase.TestingKnobs{
-				DontUseJobs: true,
-			},
-			SpanConfig: &spanconfig.TestingKnobs{
-				ManagerDisableJobCreation: true,
-			},
-		},
-	})
-	ctx := context.Background()
-	defer s.Stopper().Stop(ctx)
-	tdb := sqlutils.MakeSQLRunner(db)
-
-	tdb.Exec(t,
-		"INSERT INTO system.jobs (id, status, created) values ($1, $2, $3)",
-		1, jobs.StatusRunning, timeutil.Now(),
-	)
-	tdb.Exec(t,
-		"INSERT INTO system.job_info (job_id, info_key, value) values ($1, $2, $3)",
-		1, jobs.GetLegacyPayloadKey(), []byte("invalid payload"),
-	)
-
-	tdb.ExpectErrWithHint(t, "proto", "could not decode the payload for job 1. consider deleting this job from system.jobs", "SELECT * FROM crdb_internal.system_jobs")
-	tdb.ExpectErrWithHint(t, "proto", "could not decode the payload for job 1. consider deleting this job from system.jobs", "SELECT * FROM crdb_internal.jobs")
-}
-
 // TestInternalSystemJobsAccess asserts which entries a user can query
 // based on their grants and role options.
 func TestInternalSystemJobsAccess(t *testing.T) {
@@ -1514,7 +1474,7 @@ func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) {
 		return nil
 	}
 	opName, ok := sql.GetInternalOpName(ctx)
-	if !ok || opName != "system-jobs-scan" {
+	if !ok || !(opName == "system-jobs-scan" || opName == "system-jobs-join") {
 		return nil
 	}
 	numCallbacksAdded.Add(1)
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go
index 2e185d0079da..bbe2f5ca2cee 100644
--- a/pkg/sql/schema_changer_test.go
+++ b/pkg/sql/schema_changer_test.go
@@ -6302,12 +6302,9 @@ func TestRollbackForeignKeyAddition(t *testing.T) {
 
 	var jobID jobspb.JobID
 
-	// We filter by descriptor_ids because there's a bug where we create an extra
+	// We filter by running because there's a bug where we create an extra
 	// no-op job for the referenced table (#57624).
-	require.NoError(t, sqlDB.QueryRow(`
-SELECT job_id FROM crdb_internal.jobs WHERE description LIKE '%ALTER TABLE%'
-AND descriptor_ids[1] = 'db.t2'::regclass::int`,
-	).Scan(&jobID))
+	require.NoError(t, sqlDB.QueryRow(`SELECT job_id FROM crdb_internal.jobs WHERE description LIKE '%ALTER TABLE%' AND status = 'running'`).Scan(&jobID))
 
 	tdb.Exec(t, "CANCEL JOB $1", jobID)
 	close(continueNotification)