diff --git a/.changeset/wild-berries-cry.md b/.changeset/wild-berries-cry.md new file mode 100644 index 00000000000..196de1a124e --- /dev/null +++ b/.changeset/wild-berries-cry.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#db_update Add name to workflow spec. Add unique constraint to (owner,name) for workflow spec diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 43d75f712a0..b8cd590a402 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -643,6 +643,7 @@ func Test_Service_ProposeJob(t *testing.T) { // variables for workflow spec wfID = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef" wfOwner = "00000000000000000000000000000000000000aa" + wfName = "my-workflow" specYaml = ` triggers: - id: "a-trigger" @@ -666,7 +667,7 @@ targets: inputs: consensus_output: $(a-consensus.outputs) ` - wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, specYaml).Toml() + wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, wfName, specYaml).Toml() proposalIDWF = int64(11) remoteUUIDWF = uuid.New() argsWF = &feeds.ProposeJobArgs{ diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index 3c7d5a7afa5..c13cf9da4b1 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -10,10 +10,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/lib/pq" + "github.com/pelletier/go-toml/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v4" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable" @@ -1803,6 +1805,187 @@ func Test_CountPipelineRunsByJobID(t *testing.T) { }) } +func Test_ORM_FindJobByWorkflow(t *testing.T) { + type fields struct { + ds sqlutil.DataSource + } + type args struct { + spec job.WorkflowSpec + before func(t *testing.T, o job.ORM, s job.WorkflowSpec) int32 + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "wf not job found", + fields: fields{ + ds: pgtest.NewSqlxDB(t), + }, + args: args{ + // before is nil, so no job is inserted + spec: job.WorkflowSpec{ + ID: 1, + WorkflowID: "workflow 1", + Workflow: "abcd", + WorkflowOwner: "me", + WorkflowName: "myworkflow", + }, + }, + wantErr: true, + }, + { + name: "wf job found", + fields: fields{ + ds: pgtest.NewSqlxDB(t), + }, + args: args{ + spec: job.WorkflowSpec{ + ID: 1, + WorkflowID: "workflow 2", + Workflow: "anything", + WorkflowOwner: "me", + WorkflowName: "myworkflow", + }, + before: mustInsertWFJob, + }, + wantErr: false, + }, + + { + name: "wf wrong name", + fields: fields{ + ds: pgtest.NewSqlxDB(t), + }, + args: args{ + spec: job.WorkflowSpec{ + ID: 1, + WorkflowID: "workflow 3", + Workflow: "anything", + WorkflowOwner: "me", + WorkflowName: "wf3", + }, + before: func(t *testing.T, o job.ORM, s job.WorkflowSpec) int32 { + s.WorkflowName = "notmyworkflow" + return mustInsertWFJob(t, o, s) + }, + }, + wantErr: true, + }, + { + name: "wf wrong owner", + fields: fields{ + ds: pgtest.NewSqlxDB(t), + }, + args: args{ + spec: job.WorkflowSpec{ + ID: 1, + WorkflowID: "workflow 4", + Workflow: "anything", + WorkflowOwner: "me", + WorkflowName: "wf4", + }, + before: func(t *testing.T, o job.ORM, s job.WorkflowSpec) int32 { + s.WorkflowOwner = "not me" + return mustInsertWFJob(t, o, s) + }, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ks := cltest.NewKeyStore(t, tt.fields.ds) + pipelineORM := pipeline.NewORM(tt.fields.ds, logger.TestLogger(t), configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns()) + bridgesORM := bridges.NewORM(tt.fields.ds) + o := NewTestORM(t, tt.fields.ds, pipelineORM, bridgesORM, ks) + var wantJobID int32 + if tt.args.before != nil { + wantJobID = tt.args.before(t, o, tt.args.spec) + } + ctx := testutils.Context(t) + gotJ, err := o.FindJobIDByWorkflow(ctx, tt.args.spec) + if (err != nil) != tt.wantErr { + t.Errorf("orm.FindJobByWorkflow() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err == nil { + assert.Equal(t, wantJobID, gotJ, "mismatch job id") + } + }) + } + + t.Run("multiple jobs", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + o := NewTestORM(t, + db, + pipeline.NewORM(db, + logger.TestLogger(t), + configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns()), + bridges.NewORM(db), + cltest.NewKeyStore(t, db)) + ctx := testutils.Context(t) + s1 := job.WorkflowSpec{ + WorkflowID: "workflowid", + Workflow: "anything", + WorkflowOwner: "me", + WorkflowName: "a_common_name", + } + wantJobID1 := mustInsertWFJob(t, o, s1) + + s2 := job.WorkflowSpec{ + WorkflowID: "another workflowid", + Workflow: "anything", + WorkflowOwner: "me", + WorkflowName: "another workflow name", + } + wantJobID2 := mustInsertWFJob(t, o, s2) + + s3 := job.WorkflowSpec{ + WorkflowID: "xworkflowid", + Workflow: "anything", + WorkflowOwner: "someone else", + WorkflowName: "a_common_name", + } + wantJobID3 := mustInsertWFJob(t, o, s3) + + expectedIDs := []int32{wantJobID1, wantJobID2, wantJobID3} + for i, s := range []job.WorkflowSpec{s1, s2, s3} { + gotJ, err := o.FindJobIDByWorkflow(ctx, s) + require.NoError(t, err) + assert.Equal(t, expectedIDs[i], gotJ, "mismatch job id case %d, spec %v", i, s) + j, err := o.FindJob(ctx, expectedIDs[i]) + require.NoError(t, err) + assert.NotNil(t, j) + t.Logf("found job %v", j) + assert.EqualValues(t, j.WorkflowSpec.Workflow, s.Workflow) + assert.EqualValues(t, j.WorkflowSpec.WorkflowID, s.WorkflowID) + assert.EqualValues(t, j.WorkflowSpec.WorkflowOwner, s.WorkflowOwner) + assert.EqualValues(t, j.WorkflowSpec.WorkflowName, s.WorkflowName) + } + }) +} + +func mustInsertWFJob(t *testing.T, orm job.ORM, s job.WorkflowSpec) int32 { + t.Helper() + ctx := testutils.Context(t) + _, err := toml.Marshal(s.Workflow) + require.NoError(t, err) + j := job.Job{ + Type: job.Workflow, + WorkflowSpec: &s, + ExternalJobID: uuid.New(), + Name: null.StringFrom(s.WorkflowOwner + "_" + s.WorkflowName), + SchemaVersion: 1, + } + err = orm.CreateJob(ctx, &j) + require.NoError(t, err) + return j.ID +} + func mustInsertPipelineRun(t *testing.T, orm pipeline.ORM, j job.Job) pipeline.Run { t.Helper() ctx := testutils.Context(t) diff --git a/core/services/job/mocks/orm.go b/core/services/job/mocks/orm.go index ec60137de93..e8911b25af3 100644 --- a/core/services/job/mocks/orm.go +++ b/core/services/job/mocks/orm.go @@ -248,6 +248,34 @@ func (_m *ORM) FindJobIDByAddress(ctx context.Context, address types.EIP55Addres return r0, r1 } +// FindJobIDByWorkflow provides a mock function with given fields: ctx, spec +func (_m *ORM) FindJobIDByWorkflow(ctx context.Context, spec job.WorkflowSpec) (int32, error) { + ret := _m.Called(ctx, spec) + + if len(ret) == 0 { + panic("no return value specified for FindJobIDByWorkflow") + } + + var r0 int32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, job.WorkflowSpec) (int32, error)); ok { + return rf(ctx, spec) + } + if rf, ok := ret.Get(0).(func(context.Context, job.WorkflowSpec) int32); ok { + r0 = rf(ctx, spec) + } else { + r0 = ret.Get(0).(int32) + } + + if rf, ok := ret.Get(1).(func(context.Context, job.WorkflowSpec) error); ok { + r1 = rf(ctx, spec) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindJobIDsWithBridge provides a mock function with given fields: ctx, name func (_m *ORM) FindJobIDsWithBridge(ctx context.Context, name string) ([]int32, error) { ret := _m.Called(ctx, name) diff --git a/core/services/job/models.go b/core/services/job/models.go index 578e9e079b8..9601df2e02d 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -845,6 +845,7 @@ type WorkflowSpec struct { WorkflowID string `toml:"workflowId"` Workflow string `toml:"workflow"` WorkflowOwner string `toml:"workflowOwner"` + WorkflowName string `toml:"workflowName"` CreatedAt time.Time `toml:"-"` UpdatedAt time.Time `toml:"-"` } @@ -863,5 +864,9 @@ func (w *WorkflowSpec) Validate() error { return fmt.Errorf("incorrect length for owner %s: expected %d, got %d", w.WorkflowOwner, workflowOwnerLen, len(w.WorkflowOwner)) } + if w.WorkflowName == "" { + return fmt.Errorf("workflow name is required") + } + return nil } diff --git a/core/services/job/orm.go b/core/services/job/orm.go index d54d6fba522..71a4ebebb1e 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -78,6 +78,8 @@ type ORM interface { DataSource() sqlutil.DataSource WithDataSource(source sqlutil.DataSource) ORM + + FindJobIDByWorkflow(ctx context.Context, spec WorkflowSpec) (int32, error) } type ORMConfig interface { @@ -395,8 +397,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error { case Stream: // 'stream' type has no associated spec, nothing to do here case Workflow: - sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, created_at, updated_at) - VALUES (:workflow, :workflow_id, :workflow_owner, NOW(), NOW()) + sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at) + VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW()) RETURNING id;` specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec) if err != nil { @@ -1043,6 +1045,23 @@ func (o *orm) FindJobIDsWithBridge(ctx context.Context, name string) (jids []int return } +func (o *orm) FindJobIDByWorkflow(ctx context.Context, spec WorkflowSpec) (jobID int32, err error) { + stmt := ` +SELECT jobs.id FROM jobs +INNER JOIN workflow_specs ws on jobs.workflow_spec_id = ws.id AND ws.workflow_owner = $1 AND ws.workflow_name = $2 +` + err = o.ds.GetContext(ctx, &jobID, stmt, spec.WorkflowOwner, spec.WorkflowName) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + err = fmt.Errorf("error searching for job by workflow (owner,name) ('%s','%s'): %w", spec.WorkflowOwner, spec.WorkflowName, err) + } + err = fmt.Errorf("FindJobIDByWorkflow failed: %w", err) + return + } + + return +} + // PipelineRunsByJobsIDs returns pipeline runs for multiple jobs, not preloading data func (o *orm) PipelineRunsByJobsIDs(ctx context.Context, ids []int32) (runs []pipeline.Run, err error) { err = o.transact(ctx, false, func(tx *orm) error { diff --git a/core/services/workflows/delegate_test.go b/core/services/workflows/delegate_test.go index 68abfa2f7a1..d87e6d68466 100644 --- a/core/services/workflows/delegate_test.go +++ b/core/services/workflows/delegate_test.go @@ -23,6 +23,7 @@ type = "workflow" schemaVersion = 1 workflowId = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef" workflowOwner = "00000000000000000000000000000000000000aa" +workflowName = "test" `, true, }, @@ -38,6 +39,16 @@ invalid syntax{{{{ ` type = "work flows" schemaVersion = 1 +`, + false, + }, + { + "missing name", + ` +type = "workflow" +schemaVersion = 1 +workflowId = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef" +workflowOwner = "00000000000000000000000000000000000000aa" `, false, }, diff --git a/core/store/migrate/migrations/0238_workflow_spec_name.sql b/core/store/migrate/migrations/0238_workflow_spec_name.sql new file mode 100644 index 00000000000..8b9986b4da9 --- /dev/null +++ b/core/store/migrate/migrations/0238_workflow_spec_name.sql @@ -0,0 +1,22 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE workflow_specs ADD COLUMN workflow_name varchar(255); + +-- ensure that we can forward migrate to non-null name +UPDATE workflow_specs +SET + workflow_name = workflow_id +WHERE + workflow_name IS NULL; + +ALTER TABLE workflow_specs ALTER COLUMN workflow_name SET NOT NULL; + +-- unique constraint on workflow_owner and workflow_name +ALTER TABLE workflow_specs ADD CONSTRAINT unique_workflow_owner_name unique (workflow_owner, workflow_name); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE workflow_specs DROP CONSTRAINT unique_workflow_owner_name; +ALTER TABLE workflow_specs DROP COLUMN workflow_name; +-- +goose StatementEnd \ No newline at end of file diff --git a/core/testdata/testspecs/v2_specs.go b/core/testdata/testspecs/v2_specs.go index a0d8ea863e2..fb0e019d931 100644 --- a/core/testdata/testspecs/v2_specs.go +++ b/core/testdata/testspecs/v2_specs.go @@ -872,17 +872,18 @@ func (w WorkflowSpec) Toml() string { return w.toml } -func GenerateWorkflowSpec(id, owner, spec string) WorkflowSpec { +func GenerateWorkflowSpec(id, owner, name, spec string) WorkflowSpec { template := ` type = "workflow" schemaVersion = 1 name = "test-spec" workflowId = "%s" workflowOwner = "%s" +workflowName = "%s" workflow = """ %s """ ` - toml := fmt.Sprintf(template, id, owner, spec) + toml := fmt.Sprintf(template, id, owner, name, spec) return WorkflowSpec{toml: toml} } diff --git a/core/web/jobs_controller_test.go b/core/web/jobs_controller_test.go index 8aaae0d5ba3..359f9ba8b1c 100644 --- a/core/web/jobs_controller_test.go +++ b/core/web/jobs_controller_test.go @@ -394,6 +394,7 @@ func TestJobController_Create_HappyPath(t *testing.T) { tomlTemplate: func(_ string) string { id := "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef" owner := "00000000000000000000000000000000000000aa" + name := "my-test-workflow" workflow := ` triggers: - id: "mercury-trigger" @@ -441,14 +442,14 @@ targets: params: ["$(report)"] abi: "receive(report bytes)" ` - return testspecs.GenerateWorkflowSpec(id, owner, workflow).Toml() + return testspecs.GenerateWorkflowSpec(id, owner, name, workflow).Toml() }, assertion: func(t *testing.T, nameAndExternalJobID string, r *http.Response) { require.Equal(t, http.StatusOK, r.StatusCode) resp := cltest.ParseResponseBody(t, r) resource := presenters.JobResource{} err := web.ParseJSONAPIResponse(resp, &resource) - require.NoError(t, err) + require.NoError(t, err, "failed to parse response body: %s", resp) jb, err := jorm.FindJob(testutils.Context(t), mustInt32FromString(t, resource.ID)) require.NoError(t, err) @@ -457,6 +458,7 @@ targets: assert.Equal(t, jb.WorkflowSpec.Workflow, resource.WorkflowSpec.Workflow) assert.Equal(t, jb.WorkflowSpec.WorkflowID, resource.WorkflowSpec.WorkflowID) assert.Equal(t, jb.WorkflowSpec.WorkflowOwner, resource.WorkflowSpec.WorkflowOwner) + assert.Equal(t, jb.WorkflowSpec.WorkflowName, resource.WorkflowSpec.WorkflowName) }, }, } diff --git a/core/web/presenters/job.go b/core/web/presenters/job.go index 12b958a346d..ff59bc9bd11 100644 --- a/core/web/presenters/job.go +++ b/core/web/presenters/job.go @@ -433,6 +433,7 @@ type WorkflowSpec struct { Workflow string `json:"workflow"` WorkflowID string `json:"workflowId"` WorkflowOwner string `json:"workflowOwner"` + WorkflowName string `json:"workflowName"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` } @@ -442,6 +443,7 @@ func NewWorkflowSpec(spec *job.WorkflowSpec) *WorkflowSpec { Workflow: spec.Workflow, WorkflowID: spec.WorkflowID, WorkflowOwner: spec.WorkflowOwner, + WorkflowName: spec.WorkflowName, CreatedAt: spec.CreatedAt, UpdatedAt: spec.UpdatedAt, } diff --git a/core/web/presenters/job_test.go b/core/web/presenters/job_test.go index 7d3c31465db..ba485d27789 100644 --- a/core/web/presenters/job_test.go +++ b/core/web/presenters/job_test.go @@ -861,6 +861,7 @@ func TestJob(t *testing.T) { WorkflowID: "", Workflow: ``, WorkflowOwner: "", + WorkflowName: "", }, PipelineSpec: &pipeline.Spec{ ID: 1, @@ -896,6 +897,7 @@ func TestJob(t *testing.T) { "workflow": "", "workflowId": "", "workflowOwner": "", + "workflowName": "", "createdAt":"0001-01-01T00:00:00Z", "updatedAt":"0001-01-01T00:00:00Z" },