Skip to content

Commit

Permalink
db: improve transactions handling
Browse files Browse the repository at this point in the history
Since the transactions can be repeated (i.e. due to serializable errors
in postgres or locked errors in sqlite) we should ensure that the
objects are created inside the transaction or at the second execution
the transaction will fail since the object revision has been updated by
the previous one.

This patch adds a txID to every transaction that it's set on objects
fetched as an helper to ensure that the object is created/fetched inside
the same transaction.

To handle cases were the object is fetched/created outside the
transaction we MUST manually ensure that the object is still valid (the
tx logic must be self consistent) and only in such case restore the txID
to the current transactions and revision to the original value at the
start of the transaction.
  • Loading branch information
sgotti committed Dec 14, 2022
1 parent 0f7c541 commit 0d4c1ed
Show file tree
Hide file tree
Showing 42 changed files with 434 additions and 166 deletions.
5 changes: 3 additions & 2 deletions internal/generators/gen_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (d *DB) fetch{{ $oi.Name }}s(tx *sql.Tx, q sq.Sqlizer) ([]*types.{{ $oi.Nam
}
defer rows.Close()
return d.scan{{ $oi.Name }}s(rows)
return d.scan{{ $oi.Name }}s(rows, tx.ID())
}
func (d *DB) scan{{ $oi.Name }}(rows *stdsql.Rows, additionalFields []interface{}) (*types.{{ $oi.Name }}, string, error) {
Expand All @@ -85,7 +85,7 @@ func (d *DB) scan{{ $oi.Name }}(rows *stdsql.Rows, additionalFields []interface{
return &v, id, nil
}
func (d *DB) scan{{ $oi.Name }}s(rows *stdsql.Rows) ([]*types.{{ $oi.Name }}, []string, error) {
func (d *DB) scan{{ $oi.Name }}s(rows *stdsql.Rows, txID string) ([]*types.{{ $oi.Name }}, []string, error) {
cols, err := rows.Columns()
if err != nil {
return nil, nil, errors.WithStack(err)
Expand All @@ -112,6 +112,7 @@ func (d *DB) scan{{ $oi.Name }}s(rows *stdsql.Rows) ([]*types.{{ $oi.Name }}, []
rows.Close()
return nil, nil, errors.WithStack(err)
}
v.TxID = txID
vs = append(vs, v)
ids = append(ids, id)
}
Expand Down
8 changes: 8 additions & 0 deletions internal/generators/gen_insert_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (d *DB) Insert{{ $oi.Name }}(tx *sql.Tx, v *types.{{ $oi.Name }}) error {
return errors.Errorf("expected revision 0 got %d", v.Revision)
}
if v.TxID != tx.ID() {
return errors.Errorf("object was not created by this transaction")
}
data, err := d.insert{{ $oi.Name }}Data(tx, v)
if err != nil {
return errors.WithStack(err)
Expand Down Expand Up @@ -136,6 +140,10 @@ func (d *DB) update{{ $oi.Name }}Data(tx *sql.Tx, v *types.{{ $oi.Name }}) ([]by
return nil, errors.Errorf("expected revision > 0 got %d", v.Revision)
}
if v.TxID != tx.ID() {
return nil, errors.Errorf("object was not fetched by this transaction")
}
curRevision := v.Revision
v.Revision++
Expand Down
20 changes: 10 additions & 10 deletions internal/migration/configstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
oldUserj, _ := json.Marshal(oldUser)
log.Debug().Msgf("oldUser: %s", oldUserj)

user := types.NewUser()
user := types.NewUser(newTx)
user.ID = oldUser.ID
user.Name = oldUser.Name
user.Secret = oldUser.Secret
Expand All @@ -90,7 +90,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
}

for _, oldLA := range oldUser.LinkedAccounts {
la := types.NewLinkedAccount()
la := types.NewLinkedAccount(newTx)
// reuse old linked account id since it's referenced by project
la.ID = oldLA.ID
la.UserID = user.ID
Expand All @@ -110,7 +110,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
}

for oldTokenName, oldTokenValue := range oldUser.Tokens {
userToken := types.NewUserToken()
userToken := types.NewUserToken(newTx)
// reuse old linked account id since it's referenced by project
userToken.UserID = user.ID
userToken.Name = oldTokenName
Expand All @@ -131,7 +131,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
oldOrgj, _ := json.Marshal(oldOrg)
log.Debug().Msgf("oldOrg: %s", oldOrgj)

org := types.NewOrganization()
org := types.NewOrganization(newTx)
org.ID = oldOrg.ID
org.Name = oldOrg.Name
org.Visibility = types.Visibility(oldOrg.Visibility)
Expand All @@ -152,7 +152,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
oldOrgMemberj, _ := json.Marshal(oldOrgMember)
log.Debug().Msgf("oldOrgMember: %s", oldOrgMemberj)

orgMember := types.NewOrganizationMember()
orgMember := types.NewOrganizationMember(newTx)
orgMember.ID = oldOrgMember.ID
orgMember.OrganizationID = oldOrgMember.OrganizationID
orgMember.UserID = oldOrgMember.UserID
Expand All @@ -172,7 +172,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
oldProjectGroupj, _ := json.Marshal(oldProjectGroup)
log.Debug().Msgf("oldProjectGroup: %s", oldProjectGroupj)

projectGroup := types.NewProjectGroup()
projectGroup := types.NewProjectGroup(newTx)
projectGroup.ID = oldProjectGroup.ID
projectGroup.Name = oldProjectGroup.Name
projectGroup.Parent = types.Parent{
Expand All @@ -195,7 +195,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
oldProjectj, _ := json.Marshal(oldProject)
log.Debug().Msgf("oldProject: %s", oldProjectj)

project := types.NewProject()
project := types.NewProject(newTx)
project.ID = oldProject.ID
project.Name = oldProject.Name
project.Parent = types.Parent{
Expand Down Expand Up @@ -229,7 +229,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
oldRemoteSourcej, _ := json.Marshal(oldRemoteSource)
log.Debug().Msgf("oldRemoteSource: %s", oldRemoteSourcej)

remoteSource := types.NewRemoteSource()
remoteSource := types.NewRemoteSource(newTx)
remoteSource.ID = oldRemoteSource.ID
remoteSource.Name = oldRemoteSource.Name
remoteSource.APIURL = oldRemoteSource.APIURL
Expand Down Expand Up @@ -257,7 +257,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
oldSecretj, _ := json.Marshal(oldSecret)
log.Debug().Msgf("oldSecret: %s", oldSecretj)

secret := types.NewSecret()
secret := types.NewSecret(newTx)
secret.ID = oldSecret.ID
secret.Name = oldSecret.Name
secret.Parent = types.Parent{
Expand All @@ -283,7 +283,7 @@ func MigrateConfigStore(ctx context.Context, r io.Reader, w io.Writer) error {
oldVariablej, _ := json.Marshal(oldVariable)
log.Debug().Msgf("oldVariable: %s", oldVariablej)

variable := types.NewVariable()
variable := types.NewVariable(newTx)
variable.ID = oldVariable.ID
variable.Name = oldVariable.Name
variable.Parent = types.Parent{
Expand Down
8 changes: 4 additions & 4 deletions internal/migration/runservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func MigrateRunService(ctx context.Context, r io.Reader, w io.Writer) error {

curNewRunSequence++

run := types.NewRun()
run := types.NewRun(newTx)
run.Sequence = curNewRunSequence
run.Name = oldRun.Name
run.Counter = oldRun.Counter
Expand Down Expand Up @@ -133,7 +133,7 @@ func MigrateRunService(ctx context.Context, r io.Reader, w io.Writer) error {
oldRunConfigj, _ := json.Marshal(oldRunConfig)
log.Debug().Msgf("oldRunConfig: %s", oldRunConfigj)

runConfig := types.NewRunConfig()
runConfig := types.NewRunConfig(newTx)
runConfig.Name = oldRunConfig.Name
runConfig.Group = oldRunConfig.Group
runConfig.SetupErrors = oldRunConfig.SetupErrors
Expand Down Expand Up @@ -175,7 +175,7 @@ func MigrateRunService(ctx context.Context, r io.Reader, w io.Writer) error {

log.Debug().Msgf("oldRunCounter: %d", oldRunCounter)

runCounter := types.NewRunCounter(de.ID)
runCounter := types.NewRunCounter(newTx, de.ID)
runCounter.Value = oldRunCounter

if err := newd.InsertRunCounter(newTx, runCounter); err != nil {
Expand All @@ -188,7 +188,7 @@ func MigrateRunService(ctx context.Context, r io.Reader, w io.Writer) error {
}

// Generate run sequence
runSequence := types.NewSequence(types.SequenceTypeRun)
runSequence := types.NewSequence(newTx, types.SequenceTypeRun)
runSequence.Value = curNewRunSequence
if err := newd.InsertSequence(newTx, runSequence); err != nil {
return errors.WithStack(err)
Expand Down
12 changes: 6 additions & 6 deletions internal/services/configstore/action/org.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (h *ActionHandler) CreateOrg(ctx context.Context, req *CreateOrgRequest) (*
}
}

org = types.NewOrganization()
org = types.NewOrganization(tx)
org.Name = req.Name
org.Visibility = req.Visibility
org.CreatorUserID = req.CreatorUserID
Expand All @@ -114,7 +114,7 @@ func (h *ActionHandler) CreateOrg(ctx context.Context, req *CreateOrgRequest) (*

if org.CreatorUserID != "" {
// add the creator as org member with role owner
orgmember := types.NewOrganizationMember()
orgmember := types.NewOrganizationMember(tx)
orgmember.OrganizationID = org.ID
orgmember.UserID = org.CreatorUserID
orgmember.MemberRole = types.MemberRoleOwner
Expand All @@ -125,7 +125,7 @@ func (h *ActionHandler) CreateOrg(ctx context.Context, req *CreateOrgRequest) (*
}

// create root org project group
pg := types.NewProjectGroup()
pg := types.NewProjectGroup(tx)
// use same org visibility
pg.Visibility = org.Visibility
pg.Parent = types.Parent{
Expand Down Expand Up @@ -283,7 +283,7 @@ func (h *ActionHandler) AddOrgMember(ctx context.Context, orgRef, userRef string
}
orgmember.MemberRole = role
} else {
orgmember = types.NewOrganizationMember()
orgmember = types.NewOrganizationMember(tx)
orgmember.OrganizationID = org.ID
orgmember.UserID = user.ID
orgmember.MemberRole = role
Expand Down Expand Up @@ -461,7 +461,7 @@ func (h *ActionHandler) CreateOrgInvitation(ctx context.Context, req *CreateOrgI
return util.NewAPIError(util.ErrBadRequest, errors.Errorf("invitation already exists"))
}

orgInvitation = types.NewOrgInvitation()
orgInvitation = types.NewOrgInvitation(tx)
orgInvitation.UserID = user.ID
orgInvitation.OrganizationID = org.ID
orgInvitation.Role = req.Role
Expand Down Expand Up @@ -557,7 +557,7 @@ func (h *ActionHandler) OrgInvitationAction(ctx context.Context, req *OrgInvitat
}

if req.Action == csapitypes.Accept {
orgMember := types.NewOrganizationMember()
orgMember := types.NewOrganizationMember(tx)
orgMember.OrganizationID = orgInvitation.OrganizationID
orgMember.UserID = orgInvitation.UserID
orgMember.MemberRole = orgInvitation.Role
Expand Down
2 changes: 1 addition & 1 deletion internal/services/configstore/action/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (h *ActionHandler) CreateProject(ctx context.Context, req *CreateUpdateProj
}
}

project = types.NewProject()
project = types.NewProject(tx)
project.Name = req.Name
project.Parent = req.Parent
project.Visibility = req.Visibility
Expand Down
2 changes: 1 addition & 1 deletion internal/services/configstore/action/projectgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (h *ActionHandler) CreateProjectGroup(ctx context.Context, req *CreateUpdat
return util.NewAPIError(util.ErrBadRequest, errors.Errorf("project group with name %q, path %q already exists", req.Name, pp))
}

projectGroup = types.NewProjectGroup()
projectGroup = types.NewProjectGroup(tx)
projectGroup.Name = req.Name
projectGroup.Parent = req.Parent
projectGroup.Visibility = req.Visibility
Expand Down
2 changes: 1 addition & 1 deletion internal/services/configstore/action/remotesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (h *ActionHandler) CreateRemoteSource(ctx context.Context, req *CreateUpdat
return util.NewAPIError(util.ErrBadRequest, errors.Errorf("remotesource %q already exists", req.Name))
}

remoteSource = types.NewRemoteSource()
remoteSource = types.NewRemoteSource(tx)
remoteSource.Name = req.Name
remoteSource.APIURL = req.APIURL
remoteSource.SkipVerify = req.SkipVerify
Expand Down
2 changes: 1 addition & 1 deletion internal/services/configstore/action/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (h *ActionHandler) CreateSecret(ctx context.Context, req *CreateUpdateSecre
return util.NewAPIError(util.ErrBadRequest, errors.Errorf("secret with name %q for %s with id %q already exists", req.Name, req.Parent.Kind, req.Parent.ID))
}

secret = types.NewSecret()
secret = types.NewSecret(tx)
secret.Name = req.Name
secret.Parent = req.Parent
secret.Type = req.Type
Expand Down
11 changes: 5 additions & 6 deletions internal/services/configstore/action/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,12 @@ func (h *ActionHandler) CreateUser(ctx context.Context, req *CreateUserRequest)
}
}

user = types.NewUser()
user = types.NewUser(tx)
user.Name = req.UserName
user.Secret = util.EncodeSha1Hex(uuid.Must(uuid.NewV4()).String())

if req.CreateUserLARequest != nil {

la := types.NewLinkedAccount()
la := types.NewLinkedAccount(tx)
la.UserID = user.ID
la.RemoteSourceID = rs.ID
la.RemoteUserID = req.CreateUserLARequest.RemoteUserID
Expand All @@ -94,7 +93,7 @@ func (h *ActionHandler) CreateUser(ctx context.Context, req *CreateUserRequest)
}

// create root user project group
pg := types.NewProjectGroup()
pg := types.NewProjectGroup(tx)
// use public visibility
pg.Visibility = types.VisibilityPublic
pg.Parent = types.Parent{
Expand Down Expand Up @@ -276,7 +275,7 @@ func (h *ActionHandler) CreateUserLA(ctx context.Context, req *CreateUserLAReque
return util.NewAPIError(util.ErrBadRequest, errors.Errorf("linked account for remote user id %q for remote source %q already exists", req.RemoteUserID, req.RemoteSourceName))
}

la = types.NewLinkedAccount()
la = types.NewLinkedAccount(tx)
la.UserID = user.ID
la.RemoteSourceID = rs.ID
la.RemoteUserID = req.RemoteUserID
Expand Down Expand Up @@ -469,7 +468,7 @@ func (h *ActionHandler) CreateUserToken(ctx context.Context, userRef, tokenName
return util.NewAPIError(util.ErrBadRequest, errors.Errorf("token %q for user %q already exists", tokenName, userRef))
}

token = types.NewUserToken()
token = types.NewUserToken(tx)
token.UserID = user.ID
token.Name = tokenName
token.Value = util.EncodeSha1Hex(uuid.Must(uuid.NewV4()).String())
Expand Down
2 changes: 1 addition & 1 deletion internal/services/configstore/action/variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (h *ActionHandler) CreateVariable(ctx context.Context, req *CreateUpdateVar
return util.NewAPIError(util.ErrBadRequest, errors.Errorf("variable with name %q for %s with id %q already exists", req.Name, req.Parent.Kind, req.Parent.ID))
}

variable = types.NewVariable()
variable = types.NewVariable(tx)
variable.Name = req.Name
variable.Parent = req.Parent
variable.Values = req.Values
Expand Down
Loading

0 comments on commit 0d4c1ed

Please sign in to comment.