jobs: produce SHOW JOBS without reading legacy info row
Release note (sql change): SHOW JOBS is now served from the new job-state tables (system.jobs joined with system.job_progress and system.job_status) instead of decoding the legacy payload and progress rows stored in system.job_info.
Epic: none.
dt committed Jan 15, 2025
1 parent ae1da61 commit ef17976
Showing 6 changed files with 220 additions and 65 deletions.
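
As context for the diffs below, the following standalone sketch (not part of the commit) shows the shape of the query the new code path uses: crdb_internal.jobs, and therefore SHOW JOBS, is now assembled by joining system.jobs with the system.job_progress and system.job_status tables instead of decoding protobuf payload rows from system.job_info. The connection string and the lib/pq driver are assumptions made for illustration only; any Postgres-wire driver pointed at a CockroachDB node with admin privileges would do.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // illustrative driver choice, not mandated by the commit
)

func main() {
	// Connection string is an assumption for this sketch (insecure local node, admin user).
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Same shape as the join used by the new code path: job metadata comes from
	// columns on system.jobs, progress and status from the side tables.
	rows, err := db.Query(`
		SELECT j.id, j.job_type, coalesce(j.description, ''), coalesce(j.owner, ''),
		       j.status, s.status AS running_status, p.fraction
		FROM system.public.jobs AS j
		LEFT OUTER JOIN system.public.job_progress AS p ON j.id = p.job_id
		LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var id int64
		var typ, desc, owner, state string
		var runningStatus sql.NullString
		var fraction sql.NullFloat64
		if err := rows.Scan(&id, &typ, &desc, &owner, &state, &runningStatus, &fraction); err != nil {
			log.Fatal(err)
		}
		fmt.Println(id, typ, owner, state, fraction.Float64)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
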
6 changes: 3 additions & 3 deletions pkg/bench/rttanalysis/jobs_test.go
@@ -41,22 +41,22 @@ func init() {

fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) (SELECT id, 'legacy_payload', '\\x%s' FROM generate_series(1000, 3000) as id)",
hex.EncodeToString(payloadBytes)),
"INSERT INTO system.jobs(id, status, created, job_type) (SELECT id, 'succeeded', now(), 'IMPORT' FROM generate_series(1000, 3000) as id)",
"INSERT INTO system.jobs(id, status, created, job_type, owner) (SELECT id, 'succeeded', now(), 'IMPORT', 'test' FROM generate_series(1000, 3000) as id)",

// Job 3001 is a RUNNING job. We've marked it as
// claimed and added run stats that likely prevent it
// from being meaningfully used during the duration of
// the test.
fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) VALUES (3001, 'legacy_progress', '\\x%s')", hex.EncodeToString(progressBytes)),
fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) VALUES (3001, 'legacy_payload', '\\x%s')", hex.EncodeToString(payloadBytes)),
- `INSERT INTO system.jobs(id, status, created, last_run, num_runs, job_type, claim_instance_id, claim_session_id) VALUES (3001, 'running', now(), now(), 200, 'IMPORT',
+ `INSERT INTO system.jobs(id, status, created, last_run, num_runs, job_type, owner, claim_instance_id, claim_session_id) VALUES (3001, 'running', now(), now(), 200, 'IMPORT', 'root',
(SELECT id FROM system.sql_instances WHERE session_id IS NOT NULL ORDER BY id LIMIT 1),
(SELECT session_id FROM system.sql_instances WHERE session_id IS NOT NULL ORDER BY id LIMIT 1))`,

// Job 3002 is a PAUSED job.
fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) VALUES (3002, 'legacy_progress', '\\x%s')", hex.EncodeToString(progressBytes)),
fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) VALUES (3002, 'legacy_payload', '\\x%s')", hex.EncodeToString(payloadBytes)),
- `INSERT INTO system.jobs(id, status, created, last_run, num_runs, job_type, claim_instance_id, claim_session_id) VALUES (3002, 'paused', now(), now(), 200, 'IMPORT',
+ `INSERT INTO system.jobs(id, status, created, last_run, num_runs, job_type, owner, claim_instance_id, claim_session_id) VALUES (3002, 'paused', now(), now(), 200, 'IMPORT', 'root',
(SELECT id FROM system.sql_instances WHERE session_id IS NOT NULL ORDER BY id LIMIT 1),
(SELECT session_id FROM system.sql_instances WHERE session_id IS NOT NULL ORDER BY id LIMIT 1))`,
`ANALYZE system.jobs`,
2 changes: 1 addition & 1 deletion pkg/server/admin.go
@@ -3404,7 +3404,7 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e
case *string:
s, ok := tree.AsDString(src)
if !ok {
- return errors.Errorf("source type assertion failed")
+ return errors.Errorf("source type assertion failed %d %T", index, src)
}
*d = string(s)

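
The one-line change above widens the scanner's type-assertion error to include the failing column index and the dynamic type of the source value. The following self-contained sketch is not CockroachDB's resultScanner; it just illustrates the same pattern and why the extra %d/%T context makes such failures diagnosable.

package main

import "fmt"

// scanIndex copies row[index] into dst. On a type mismatch it reports both the
// offending column index and the dynamic type of the source value; without the
// %d and %T verbs, "source type assertion failed" gives no clue which column
// or value was at fault.
func scanIndex(row []interface{}, index int, dst interface{}) error {
	src := row[index]
	switch d := dst.(type) {
	case *string:
		s, ok := src.(string)
		if !ok {
			return fmt.Errorf("source type assertion failed %d %T", index, src)
		}
		*d = s
	case *int64:
		n, ok := src.(int64)
		if !ok {
			return fmt.Errorf("source type assertion failed %d %T", index, src)
		}
		*d = n
	default:
		return fmt.Errorf("unsupported destination type %T", dst)
	}
	return nil
}

func main() {
	row := []interface{}{int64(42), "running"}
	var s string
	// Deliberately scan column 0 (an int64) into a *string: the error now says
	// exactly which index and which concrete type tripped the assertion.
	if err := scanIndex(row, 0, &s); err != nil {
		fmt.Println(err) // source type assertion failed 0 int64
	}
}
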
8 changes: 4 additions & 4 deletions pkg/server/application_api/jobs_test.go
@@ -172,8 +172,8 @@ func TestAdminAPIJobs(t *testing.T) {
t.Fatal(err)
}
sqlDB.Exec(t,
- `INSERT INTO system.jobs (id, status, num_runs, last_run, job_type) VALUES ($1, $2, $3, $4, $5)`,
- job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(),
+ `INSERT INTO system.jobs (id, status, num_runs, last_run, job_type, owner) VALUES ($1, $2, $3, $4, $5, $6)`,
+ job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(), payload.UsernameProto.Decode().Normalized(),
)
sqlDB.Exec(t,
`INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`,
@@ -362,8 +362,8 @@ func TestAdminAPIJobsDetails(t *testing.T) {
t.Fatal(err)
}
sqlDB.Exec(t,
- `INSERT INTO system.jobs (id, status, num_runs, last_run) VALUES ($1, $2, $3, $4)`,
- job.id, job.status, job.numRuns, job.lastRun,
+ `INSERT INTO system.jobs (id, status, num_runs, last_run, job_type) VALUES ($1, $2, $3, $4, $5)`,
+ job.id, job.status, job.numRuns, job.lastRun, payload.Type().String(),
)
sqlDB.Exec(t,
`INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`,
220 changes: 209 additions & 11 deletions pkg/sql/crdb_internal.go
@@ -1139,14 +1139,14 @@ func wrapPayloadUnMarshalError(err error, jobID tree.Datum) error {
}

const (
- jobsQuery = `SELECT id, status, created::timestamptz, payload, progress, claim_session_id, claim_instance_id FROM crdb_internal.system_jobs`
+ jobsQuery = `SELECT id, status, created::timestamptz, payload, progress, claim_session_id, claim_instance_id FROM crdb_internal.system_jobs j`
// Note that we are querying crdb_internal.system_jobs instead of system.jobs directly.
// The former has access control built in and will filter out jobs that the
// user is not allowed to see.
jobsQFrom = ` `
- jobIDFilter = ` WHERE id = $1`
- jobsStatusFilter = ` WHERE status = $1`
- jobsTypeFilter = ` WHERE job_type = $1`
+ jobIDFilter = ` WHERE j.id = $1`
+ jobsStatusFilter = ` WHERE j.status = $1`
+ jobsTypeFilter = ` WHERE j.job_type = $1`
)

// TODO(tbg): prefix with kv_.
@@ -1179,32 +1179,55 @@ CREATE TABLE crdb_internal.jobs (
comment: `decoded job metadata from crdb_internal.system_jobs (KV scan)`,
indexes: []virtualIndex{{
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
- q := jobsQuery + jobIDFilter
targetID := tree.MustBeDInt(unwrappedConstraint)
- return makeJobsTableRows(ctx, p, addRow, q, targetID)
+ return makeJobsTableRows(ctx, p, addRow, jobIDFilter, targetID)
},
}, {
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
- q := jobsQuery + jobsStatusFilter
targetStatus := tree.MustBeDString(unwrappedConstraint)
- return makeJobsTableRows(ctx, p, addRow, q, targetStatus)
+ return makeJobsTableRows(ctx, p, addRow, jobsStatusFilter, targetStatus)
},
}, {
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
- q := jobsQuery + jobsTypeFilter
targetType := tree.MustBeDString(unwrappedConstraint)
- return makeJobsTableRows(ctx, p, addRow, q, targetType)
+ return makeJobsTableRows(ctx, p, addRow, jobsTypeFilter, targetType)
},
}},
populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
- _, err := makeJobsTableRows(ctx, p, addRow, jobsQuery)
+ _, err := makeJobsTableRows(ctx, p, addRow, "")
return err
},
}

var useOldJobsVTable = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.jobs.legacy_vtable.enabled",
"cause the crdb_internal.jobs vtable to be produced from the legacy payload info records",
false, // TODO(dt): flip this once we add permissive auth checks.
)

// makeJobsTableRows calls addRow for each job. It returns true if addRow was called
// successfully at least once.
func makeJobsTableRows(
ctx context.Context,
p *planner,
addRow func(...tree.Datum) error,
queryFilterSuffix string,
params ...interface{},
) (matched bool, err error) {

v, err := p.InternalSQLTxn().GetSystemSchemaVersion(ctx)
if err != nil {
return false, err
}
if !v.AtLeast(clusterversion.V25_1.Version()) || useOldJobsVTable.Get(&p.EvalContext().Settings.SV) {
query := jobsQuery + queryFilterSuffix
return makeLegacyJobsTableRows(ctx, p, addRow, query, params...)
}
return makeJobBasedJobsTableRows(ctx, p, addRow, queryFilterSuffix, params...)
}

func makeLegacyJobsTableRows(
ctx context.Context,
p *planner,
addRow func(...tree.Datum) error,
@@ -1393,6 +1416,181 @@ func makeJobsTableRows(
}
}

var enablePerJobDetailedAuthLookups = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.jobs.legacy_per_job_access_via_details.enabled",
"enables granting additional access to jobs beyond owners and roles based on the specific details (tables being watched/backed up/etc) of the individual jobs (may make SHOW JOBS less performant)",
true,
)

var errLegacyPerJobAuthDisabledSentinel = pgerror.Newf(pgcode.InsufficientPrivilege, "legacy job access based on details is disabled")

func makeJobBasedJobsTableRows(
ctx context.Context,
p *planner,
addRow func(...tree.Datum) error,
whereClause string,
params ...interface{},
) (emitted bool, retErr error) {
globalPrivileges, err := jobsauth.GetGlobalJobPrivileges(ctx, p)
if err != nil {
return false, err
}

query := `SELECT
j.id, j.job_type, coalesce(j.description, ''), coalesce(j.owner, ''), j.status as state,
s.status, j.created::timestamptz, j.finished, greatest(j.created, j.finished, p.written, s.written)::timestamptz AS last_modified,
p.fraction,
p.resolved,
j.error_msg,
j.claim_instance_id
FROM system.public.jobs AS j
LEFT OUTER JOIN system.public.job_progress AS p ON j.id = p.job_id
LEFT OUTER JOIN system.public.job_status AS s ON j.id = s.job_id
` + whereClause

it, err := p.InternalSQLTxn().QueryIteratorEx(
ctx, "system-jobs-join", p.txn, sessiondata.NodeUserSessionDataOverride, query, params...)
if err != nil {
return emitted, err
}
defer func() {
if err := it.Close(); err != nil {
retErr = errors.CombineErrors(retErr, err)
}
}()

sessionJobs := make([]*jobs.Record, 0, p.extendedEvalCtx.jobs.numToCreate())
uniqueJobs := make(map[*jobs.Record]struct{})
if err := p.extendedEvalCtx.jobs.forEachToCreate(func(job *jobs.Record) error {
if _, ok := uniqueJobs[job]; ok {
return nil
}
sessionJobs = append(sessionJobs, job)
uniqueJobs[job] = struct{}{}
return nil
}); err != nil {
return emitted, err
}

// Loop over the query results; some rows are skipped by the auth check below,
// and once the query is exhausted we synthesize rows for jobs pending creation
// in this session.
for {
ok, err := it.Next(ctx)
if err != nil {
return emitted, err
}
// We will read the columns from the query on joined jobs tables into a wide
// row, and then copy the values from read rows into named variables to then
// use when emitting our output row. If we need to synthesize rows for jobs
// pending creation in the session, we'll do so in those same named vars to
// keep things organized.
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
var id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID tree.Datum

if ok {
r := it.Cur()
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID =
r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8], r[9], r[10], r[11], r[12]

owner := username.MakeSQLUsernameFromPreNormalizedString(string(tree.MustBeDString(ownerStr)))
jobID := jobspb.JobID(tree.MustBeDInt(id))
typ, err := jobspb.TypeFromString(string(tree.MustBeDString(typStr)))
if err != nil {
return emitted, err
}

getLegacyPayloadForAuth := func(ctx context.Context) (*jobspb.Payload, error) {
if !enablePerJobDetailedAuthLookups.Get(&p.EvalContext().Settings.SV) {
return nil, errLegacyPerJobAuthDisabledSentinel
}
if p.EvalContext().Settings.Version.IsActive(ctx, clusterversion.V25_1) {
log.Warningf(ctx, "extended job access control based on job-specific details is deprecated and can make SHOW JOBS less performant; consider disabling %s",
enablePerJobDetailedAuthLookups.Name())
p.BufferClientNotice(ctx,
pgnotice.Newf("extended job access control based on job-specific details has been deprecated and can make SHOW JOBS less performant; consider disabling %s",
enablePerJobDetailedAuthLookups.Name()))
}
payload := &jobspb.Payload{}
infoStorage := jobs.InfoStorageForJob(p.InternalSQLTxn(), jobID)
payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx, "getLegacyPayload-for-custom-auth")
if err != nil {
return nil, err
}
if !exists {
return nil, errors.New("job payload not found in system.job_info")
}
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
return nil, err
}
return payload, nil
}
if errorMsg == tree.DNull {
errorMsg = emptyString
}

if err := jobsauth.Authorize(
ctx, p, jobID, getLegacyPayloadForAuth, owner, typ, jobsauth.ViewAccess, globalPrivileges,
); err != nil {
// Filter out jobs which the user is not allowed to see.
if IsInsufficientPrivilegeError(err) {
continue
}
return emitted, err
}
} else if !ok {
if len(sessionJobs) == 0 {
return emitted, nil
}
job := sessionJobs[len(sessionJobs)-1]
sessionJobs = sessionJobs[:len(sessionJobs)-1]
payloadType, err := jobspb.DetailsType(jobspb.WrapPayloadDetails(job.Details))
if err != nil {
return emitted, err
}
// synthesize the fields we'd read from the jobs table if this job were in it.
id, typStr, desc, ownerStr, state, status, created, finished, modified, fraction, resolved, errorMsg, instanceID =
tree.NewDInt(tree.DInt(job.JobID)),
tree.NewDString(payloadType.String()),
tree.NewDString(job.Description),
tree.NewDString(job.Username.Normalized()),
tree.NewDString(string(jobs.StatusPending)),
tree.DNull,
tree.MustMakeDTimestampTZ(p.txn.ReadTimestamp().GoTime(), time.Microsecond),
tree.DNull,
tree.MustMakeDTimestampTZ(p.txn.ReadTimestamp().GoTime(), time.Microsecond),
tree.NewDFloat(tree.DFloat(0)),
tree.DZeroDecimal,
tree.DNull,
tree.NewDInt(tree.DInt(p.extendedEvalCtx.ExecCfg.JobRegistry.ID()))
}

if err = addRow(
id,
typStr,
desc,
desc,
ownerStr,
tree.DNull, // deprecated "descriptor_ids"
state,
status,
created,
created, // deprecated "started" field.
finished,
modified,
fraction,
resolved,
errorMsg,
instanceID,
tree.DNull, // deprecated "trace_id" field.
tree.DNull, // deprecated "executionErrors" field.
tree.DNull, // deprecated "executionEvents" field.
); err != nil {
return emitted, err
}
emitted = true
}
}

const crdbInternalKVProtectedTSTableQuery = `
SELECT id, ts, meta_type, meta, num_spans, spans, verified, target,
crdb_internal.pb_to_json(
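
From an operator's point of view, the two cluster settings registered in this file gate the new behavior: sql.jobs.legacy_vtable.enabled switches crdb_internal.jobs back to the legacy payload-based population, and sql.jobs.legacy_per_job_access_via_details.enabled controls the per-job, detail-based access fallback that can slow SHOW JOBS down. The sketch below is illustrative only; the connection string, the lib/pq driver, and running as an admin user are assumptions, and this particular combination of settings is an example rather than a recommendation.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // illustrative driver choice, not mandated by the commit
)

func main() {
	// Connection string is an assumption for this sketch (insecure local node, admin user).
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Force the legacy, payload-based crdb_internal.jobs population if needed;
	// per the diff, the new column-based path is used only when this setting is
	// false and the cluster has been finalized to 25.1 or later.
	if _, err := db.Exec(`SET CLUSTER SETTING sql.jobs.legacy_vtable.enabled = true`); err != nil {
		log.Fatal(err)
	}

	// Turn off the per-job, detail-based access fallback so SHOW JOBS stays on
	// the column-only path (owners, roles, and global privileges still apply).
	if _, err := db.Exec(`SET CLUSTER SETTING sql.jobs.legacy_per_job_access_via_details.enabled = false`); err != nil {
		log.Fatal(err)
	}

	var n int
	if err := db.QueryRow(`SELECT count(*) FROM [SHOW JOBS]`).Scan(&n); err != nil {
		log.Fatal(err)
	}
	fmt.Println("jobs visible:", n)
}
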
42 changes: 1 addition & 41 deletions pkg/sql/crdb_internal_test.go
@@ -1357,46 +1357,6 @@ func TestInternalSystemJobsTableMirrorsSystemJobsTable(t *testing.T) {
// TODO(adityamaru): add checks for payload and progress
}

- // TestCorruptPayloadError asserts that we get an error
- // with the correct hint when we fail to decode a payload.
- func TestCorruptPayloadError(t *testing.T) {
- defer leaktest.AfterTest(t)()
- defer log.Scope(t).Close(t)
-
- s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
- Knobs: base.TestingKnobs{
- // Because this test modifies system.jobs and asserts its contents,
- // we should disable jobs from being adopted and disable automatic jobs
- // from being created.
- JobsTestingKnobs: &jobs.TestingKnobs{
- DisableAdoptions: true,
- },
- // DisableAdoptions needs this.
- UpgradeManager: &upgradebase.TestingKnobs{
- DontUseJobs: true,
- },
- SpanConfig: &spanconfig.TestingKnobs{
- ManagerDisableJobCreation: true,
- },
- },
- })
- ctx := context.Background()
- defer s.Stopper().Stop(ctx)
- tdb := sqlutils.MakeSQLRunner(db)
-
- tdb.Exec(t,
- "INSERT INTO system.jobs (id, status, created) values ($1, $2, $3)",
- 1, jobs.StatusRunning, timeutil.Now(),
- )
- tdb.Exec(t,
- "INSERT INTO system.job_info (job_id, info_key, value) values ($1, $2, $3)",
- 1, jobs.GetLegacyPayloadKey(), []byte("invalid payload"),
- )
-
- tdb.ExpectErrWithHint(t, "proto", "could not decode the payload for job 1. consider deleting this job from system.jobs", "SELECT * FROM crdb_internal.system_jobs")
- tdb.ExpectErrWithHint(t, "proto", "could not decode the payload for job 1. consider deleting this job from system.jobs", "SELECT * FROM crdb_internal.jobs")
- }

// TestInternalSystemJobsAccess asserts which entries a user can query
// based on their grants and role options.
func TestInternalSystemJobsAccess(t *testing.T) {
@@ -1514,7 +1474,7 @@ func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) {
return nil
}
opName, ok := sql.GetInternalOpName(ctx)
- if !ok || opName != "system-jobs-scan" {
+ if !ok || !(opName == "system-jobs-scan" || opName == "system-jobs-join") {
return nil
}
numCallbacksAdded.Add(1)
7 changes: 2 additions & 5 deletions pkg/sql/schema_changer_test.go
@@ -6302,12 +6302,9 @@ func TestRollbackForeignKeyAddition(t *testing.T) {

var jobID jobspb.JobID

- // We filter by descriptor_ids because there's a bug where we create an extra
+ // We filter by running because there's a bug where we create an extra
// no-op job for the referenced table (#57624).
- require.NoError(t, sqlDB.QueryRow(`
- SELECT job_id FROM crdb_internal.jobs WHERE description LIKE '%ALTER TABLE%'
- AND descriptor_ids[1] = 'db.t2'::regclass::int`,
- ).Scan(&jobID))
+ require.NoError(t, sqlDB.QueryRow(`SELECT job_id FROM crdb_internal.jobs WHERE description LIKE '%ALTER TABLE%' AND status = 'running'`).Scan(&jobID))
tdb.Exec(t, "CANCEL JOB $1", jobID)

close(continueNotification)
