Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optimistic lock to ActionRun table #26563

Merged
merged 10 commits into from
Aug 21, 2023
15 changes: 13 additions & 2 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type ActionRun struct {
EventPayload string `xorm:"LONGTEXT"`
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Expand Down Expand Up @@ -332,12 +333,22 @@ func GetRunByIndex(ctx context.Context, repoID, index int64) (*ActionRun, error)
return run, nil
}

// UpdateRun updates a run.
// It requires the inputted run has Version set.
// It will return error if the version is not matched (it means the run has been changed after loaded).
func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
sess := db.GetEngine(ctx).ID(run.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
_, err := sess.Update(run)
affected, err := sess.Update(run)
if err != nil {
return err
}
if affected == 0 {
return fmt.Errorf("run has changed")
// It's impossible that the run is not found, since Gitea never deletes runs.
}

if run.Status != 0 || util.SliceContains(cols, "status") {
if run.RepoID == 0 {
Expand All @@ -358,7 +369,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
}
}

return err
return nil
}

type ActionRunIndex db.ResourceIndex
39 changes: 24 additions & 15 deletions models/actions/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,32 +114,41 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() {
// if the status of job changes to waiting again, increase tasks version.
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
return affected, err
return 0, err
}
}

if job.RunID == 0 {
var err error
if job, err = GetRunJobByID(ctx, job.ID); err != nil {
return affected, err
return 0, err
}
}

jobs, err := GetRunJobsByRunID(ctx, job.RunID)
if err != nil {
return affected, err
{
// Other goroutines may aggregate the status of the run and update it too.
// So we need load the run and its jobs before updating the run.
run, err := GetRunByID(ctx, job.RunID)
if err != nil {
return 0, err
}
jobs, err := GetRunJobsByRunID(ctx, job.RunID)
if err != nil {
return 0, err
}
run.Status = aggregateJobStatus(jobs)
if run.Started.IsZero() && run.Status.IsRunning() {
run.Started = timeutil.TimeStampNow()
}
if run.Stopped.IsZero() && run.Status.IsDone() {
run.Stopped = timeutil.TimeStampNow()
}
if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil {
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
}
}

runStatus := aggregateJobStatus(jobs)

run := &ActionRun{
ID: job.RunID,
Status: runStatus,
}
if runStatus.IsDone() {
run.Stopped = timeutil.TimeStampNow()
}
return affected, UpdateRun(ctx, run)
return affected, nil
}

func aggregateJobStatus(jobs []*ActionRunJob) Status {
Expand Down
8 changes: 0 additions & 8 deletions models/actions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
return nil, false, nil
}

if job.Run.Status.IsWaiting() {
job.Run.Status = StatusRunning
job.Run.Started = now
if err := UpdateRun(ctx, job.Run, "status", "started"); err != nil {
return nil, false, err
}
}

Comment on lines -320 to -327
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unnecessary, the previous UpdateRunJob will update the run.

task.Job = job

if err := commiter.Commit(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions models/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,8 @@ var migrations = []Migration{
NewMigration("Fix PackageProperty typo", v1_21.FixPackagePropertyTypo),
// v271 -> v272
NewMigration("Allow archiving labels", v1_21.AddArchivedUnixColumInLabelTable),
// v272 -> v273
NewMigration("Add Version to ActionRun table", v1_21.AddVersionToActionRunTable),
}

// GetCurrentDBVersion returns the current db version
Expand Down
14 changes: 14 additions & 0 deletions models/migrations/v1_21/v272.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package v1_21 //nolint
import (
"xorm.io/xorm"
)

func AddVersionToActionRunTable(x *xorm.Engine) error {
type ActionRun struct {
Version int `xorm:"version default 0"`
}
return x.Sync(new(ActionRun))
}