Skip to content

Commit

Permalink
ENG-9317 Fix concurrent workflow create bug
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Li <[email protected]>
  • Loading branch information
ryli17 committed Oct 23, 2020
1 parent 9483ac6 commit 8274ad6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
12 changes: 12 additions & 0 deletions db/migration/202010221010-add-unique-index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package migration

import migrate "github.com/rubenv/sql-migrate"

func Get202010221010() *migrate.Migration {
return &migrate.Migration{
Id: "202010221010-add-unique-index",
Up: []string{`
CREATE UNIQUE INDEX IF NOT EXISTS uidx_workflow_worker_map ON workflow_worker_map (workflow_id, worker_id);
`},
}
}
1 change: 1 addition & 0 deletions db/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ func GetMigrations() *migrate.MemoryMigrationSource {
return &migrate.MemoryMigrationSource{
Migrations: []*migrate.Migration{
Get202009171251(),
Get202010221010(),
},
}
}
12 changes: 5 additions & 7 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (

// CreateWorkflow creates a new workflow
func (d TinkDB) CreateWorkflow(ctx context.Context, wf Workflow, data string, id uuid.UUID) error {
tx, err := d.instance.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
tx, err := d.instance.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "BEGIN transaction")
}
Expand Down Expand Up @@ -73,15 +73,13 @@ func insertInWorkflow(ctx context.Context, db *sql.DB, wf Workflow, tx *sql.Tx)
}

func insertIntoWfWorkerTable(ctx context.Context, db *sql.DB, wfID uuid.UUID, workerID uuid.UUID, tx *sql.Tx) error {
// TODO This command is not 100% reliable for concurrent write operations
_, err := tx.Exec(`
INSERT INTO
workflow_worker_map (workflow_id, worker_id)
SELECT $1, $2
WHERE
NOT EXISTS (
SELECT workflow_id FROM workflow_worker_map WHERE workflow_id = $1 AND worker_id = $2
);
VALUES
($1, $2)
ON CONFLICT (workflow_id, worker_id)
DO NOTHING;
`, wfID, workerID)
if err != nil {
return errors.Wrap(err, "INSERT in to workflow_worker_map")
Expand Down

0 comments on commit 8274ad6

Please sign in to comment.