diff --git a/db/migration/202010221010-add-unique-index.go b/db/migration/202010221010-add-unique-index.go new file mode 100644 index 000000000..4938288ab --- /dev/null +++ b/db/migration/202010221010-add-unique-index.go @@ -0,0 +1,12 @@ +package migration + +import migrate "github.com/rubenv/sql-migrate" + +func Get202010221010() *migrate.Migration { + return &migrate.Migration{ + Id: "202010221010-add-unique-index", + Up: []string{` +CREATE UNIQUE INDEX IF NOT EXISTS uidx_workflow_worker_map ON workflow_worker_map (workflow_id, worker_id); +`}, + } +} diff --git a/db/migration/migration.go b/db/migration/migration.go index 2ba774ace..f8a9a35bb 100644 --- a/db/migration/migration.go +++ b/db/migration/migration.go @@ -6,6 +6,7 @@ func GetMigrations() *migrate.MemoryMigrationSource { return &migrate.MemoryMigrationSource{ Migrations: []*migrate.Migration{ Get202009171251(), + Get202010221010(), }, } } diff --git a/db/workflow.go b/db/workflow.go index a3010e82f..b56f39834 100644 --- a/db/workflow.go +++ b/db/workflow.go @@ -33,7 +33,7 @@ var ( // CreateWorkflow creates a new workflow func (d TinkDB) CreateWorkflow(ctx context.Context, wf Workflow, data string, id uuid.UUID) error { - tx, err := d.instance.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) + tx, err := d.instance.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) if err != nil { return errors.Wrap(err, "BEGIN transaction") } @@ -73,15 +73,13 @@ func insertInWorkflow(ctx context.Context, db *sql.DB, wf Workflow, tx *sql.Tx) } func insertIntoWfWorkerTable(ctx context.Context, db *sql.DB, wfID uuid.UUID, workerID uuid.UUID, tx *sql.Tx) error { - // TODO This command is not 100% reliable for concurrent write operations _, err := tx.Exec(` INSERT INTO workflow_worker_map (workflow_id, worker_id) - SELECT $1, $2 - WHERE - NOT EXISTS ( - SELECT workflow_id FROM workflow_worker_map WHERE workflow_id = $1 AND worker_id = $2 - ); + VALUES + ($1, $2) + ON CONFLICT (workflow_id, worker_id) + DO NOTHING; `, wfID, workerID) if err != nil { return errors.Wrap(err, "INSERT in to workflow_worker_map")