Skip to content

Commit

Permalink
KS-205: add workflow name to spec (#13265)
Browse files Browse the repository at this point in the history
* KS-205: add workflow name to spec

* fix test

* fix sql and test

* fix tests

* remove empty wf owner,name check

* fix bad merge of main

* rename migration

---------

Co-authored-by: Bolek <[email protected]>
  • Loading branch information
krehermann and bolekk authored May 23, 2024
1 parent 1437410 commit 5db47b6
Show file tree
Hide file tree
Showing 12 changed files with 288 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .changeset/wild-berries-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#db_update Add name to workflow spec. Add unique constraint to (owner,name) for workflow spec
3 changes: 2 additions & 1 deletion core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ func Test_Service_ProposeJob(t *testing.T) {
// variables for workflow spec
wfID = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef"
wfOwner = "00000000000000000000000000000000000000aa"
wfName = "my-workflow"
specYaml = `
triggers:
- id: "a-trigger"
Expand All @@ -666,7 +667,7 @@ targets:
inputs:
consensus_output: $(a-consensus.outputs)
`
wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, specYaml).Toml()
wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, wfName, specYaml).Toml()
proposalIDWF = int64(11)
remoteUUIDWF = uuid.New()
argsWF = &feeds.ProposeJobArgs{
Expand Down
183 changes: 183 additions & 0 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/pelletier/go-toml/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"

Expand Down Expand Up @@ -1803,6 +1805,187 @@ func Test_CountPipelineRunsByJobID(t *testing.T) {
})
}

func Test_ORM_FindJobByWorkflow(t *testing.T) {
type fields struct {
ds sqlutil.DataSource
}
type args struct {
spec job.WorkflowSpec
before func(t *testing.T, o job.ORM, s job.WorkflowSpec) int32
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
name: "wf not job found",
fields: fields{
ds: pgtest.NewSqlxDB(t),
},
args: args{
// before is nil, so no job is inserted
spec: job.WorkflowSpec{
ID: 1,
WorkflowID: "workflow 1",
Workflow: "abcd",
WorkflowOwner: "me",
WorkflowName: "myworkflow",
},
},
wantErr: true,
},
{
name: "wf job found",
fields: fields{
ds: pgtest.NewSqlxDB(t),
},
args: args{
spec: job.WorkflowSpec{
ID: 1,
WorkflowID: "workflow 2",
Workflow: "anything",
WorkflowOwner: "me",
WorkflowName: "myworkflow",
},
before: mustInsertWFJob,
},
wantErr: false,
},

{
name: "wf wrong name",
fields: fields{
ds: pgtest.NewSqlxDB(t),
},
args: args{
spec: job.WorkflowSpec{
ID: 1,
WorkflowID: "workflow 3",
Workflow: "anything",
WorkflowOwner: "me",
WorkflowName: "wf3",
},
before: func(t *testing.T, o job.ORM, s job.WorkflowSpec) int32 {
s.WorkflowName = "notmyworkflow"
return mustInsertWFJob(t, o, s)
},
},
wantErr: true,
},
{
name: "wf wrong owner",
fields: fields{
ds: pgtest.NewSqlxDB(t),
},
args: args{
spec: job.WorkflowSpec{
ID: 1,
WorkflowID: "workflow 4",
Workflow: "anything",
WorkflowOwner: "me",
WorkflowName: "wf4",
},
before: func(t *testing.T, o job.ORM, s job.WorkflowSpec) int32 {
s.WorkflowOwner = "not me"
return mustInsertWFJob(t, o, s)
},
},
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ks := cltest.NewKeyStore(t, tt.fields.ds)
pipelineORM := pipeline.NewORM(tt.fields.ds, logger.TestLogger(t), configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(tt.fields.ds)
o := NewTestORM(t, tt.fields.ds, pipelineORM, bridgesORM, ks)
var wantJobID int32
if tt.args.before != nil {
wantJobID = tt.args.before(t, o, tt.args.spec)
}
ctx := testutils.Context(t)
gotJ, err := o.FindJobIDByWorkflow(ctx, tt.args.spec)
if (err != nil) != tt.wantErr {
t.Errorf("orm.FindJobByWorkflow() error = %v, wantErr %v", err, tt.wantErr)
return
}
if err == nil {
assert.Equal(t, wantJobID, gotJ, "mismatch job id")
}
})
}

t.Run("multiple jobs", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
o := NewTestORM(t,
db,
pipeline.NewORM(db,
logger.TestLogger(t),
configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns()),
bridges.NewORM(db),
cltest.NewKeyStore(t, db))
ctx := testutils.Context(t)
s1 := job.WorkflowSpec{
WorkflowID: "workflowid",
Workflow: "anything",
WorkflowOwner: "me",
WorkflowName: "a_common_name",
}
wantJobID1 := mustInsertWFJob(t, o, s1)

s2 := job.WorkflowSpec{
WorkflowID: "another workflowid",
Workflow: "anything",
WorkflowOwner: "me",
WorkflowName: "another workflow name",
}
wantJobID2 := mustInsertWFJob(t, o, s2)

s3 := job.WorkflowSpec{
WorkflowID: "xworkflowid",
Workflow: "anything",
WorkflowOwner: "someone else",
WorkflowName: "a_common_name",
}
wantJobID3 := mustInsertWFJob(t, o, s3)

expectedIDs := []int32{wantJobID1, wantJobID2, wantJobID3}
for i, s := range []job.WorkflowSpec{s1, s2, s3} {
gotJ, err := o.FindJobIDByWorkflow(ctx, s)
require.NoError(t, err)
assert.Equal(t, expectedIDs[i], gotJ, "mismatch job id case %d, spec %v", i, s)
j, err := o.FindJob(ctx, expectedIDs[i])
require.NoError(t, err)
assert.NotNil(t, j)
t.Logf("found job %v", j)
assert.EqualValues(t, j.WorkflowSpec.Workflow, s.Workflow)
assert.EqualValues(t, j.WorkflowSpec.WorkflowID, s.WorkflowID)
assert.EqualValues(t, j.WorkflowSpec.WorkflowOwner, s.WorkflowOwner)
assert.EqualValues(t, j.WorkflowSpec.WorkflowName, s.WorkflowName)
}
})
}

func mustInsertWFJob(t *testing.T, orm job.ORM, s job.WorkflowSpec) int32 {
t.Helper()
ctx := testutils.Context(t)
_, err := toml.Marshal(s.Workflow)
require.NoError(t, err)
j := job.Job{
Type: job.Workflow,
WorkflowSpec: &s,
ExternalJobID: uuid.New(),
Name: null.StringFrom(s.WorkflowOwner + "_" + s.WorkflowName),
SchemaVersion: 1,
}
err = orm.CreateJob(ctx, &j)
require.NoError(t, err)
return j.ID
}

func mustInsertPipelineRun(t *testing.T, orm pipeline.ORM, j job.Job) pipeline.Run {
t.Helper()
ctx := testutils.Context(t)
Expand Down
28 changes: 28 additions & 0 deletions core/services/job/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,7 @@ type WorkflowSpec struct {
WorkflowID string `toml:"workflowId"`
Workflow string `toml:"workflow"`
WorkflowOwner string `toml:"workflowOwner"`
WorkflowName string `toml:"workflowName"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
}
Expand All @@ -863,5 +864,9 @@ func (w *WorkflowSpec) Validate() error {
return fmt.Errorf("incorrect length for owner %s: expected %d, got %d", w.WorkflowOwner, workflowOwnerLen, len(w.WorkflowOwner))
}

if w.WorkflowName == "" {
return fmt.Errorf("workflow name is required")
}

return nil
}
23 changes: 21 additions & 2 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type ORM interface {

DataSource() sqlutil.DataSource
WithDataSource(source sqlutil.DataSource) ORM

FindJobIDByWorkflow(ctx context.Context, spec WorkflowSpec) (int32, error)
}

type ORMConfig interface {
Expand Down Expand Up @@ -395,8 +397,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
case Stream:
// 'stream' type has no associated spec, nothing to do here
case Workflow:
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, created_at, updated_at)
VALUES (:workflow, :workflow_id, :workflow_owner, NOW(), NOW())
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW())
RETURNING id;`
specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec)
if err != nil {
Expand Down Expand Up @@ -1043,6 +1045,23 @@ func (o *orm) FindJobIDsWithBridge(ctx context.Context, name string) (jids []int
return
}

func (o *orm) FindJobIDByWorkflow(ctx context.Context, spec WorkflowSpec) (jobID int32, err error) {
stmt := `
SELECT jobs.id FROM jobs
INNER JOIN workflow_specs ws on jobs.workflow_spec_id = ws.id AND ws.workflow_owner = $1 AND ws.workflow_name = $2
`
err = o.ds.GetContext(ctx, &jobID, stmt, spec.WorkflowOwner, spec.WorkflowName)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
err = fmt.Errorf("error searching for job by workflow (owner,name) ('%s','%s'): %w", spec.WorkflowOwner, spec.WorkflowName, err)
}
err = fmt.Errorf("FindJobIDByWorkflow failed: %w", err)
return
}

return
}

// PipelineRunsByJobsIDs returns pipeline runs for multiple jobs, not preloading data
func (o *orm) PipelineRunsByJobsIDs(ctx context.Context, ids []int32) (runs []pipeline.Run, err error) {
err = o.transact(ctx, false, func(tx *orm) error {
Expand Down
11 changes: 11 additions & 0 deletions core/services/workflows/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type = "workflow"
schemaVersion = 1
workflowId = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef"
workflowOwner = "00000000000000000000000000000000000000aa"
workflowName = "test"
`,
true,
},
Expand All @@ -38,6 +39,16 @@ invalid syntax{{{{
`
type = "work flows"
schemaVersion = 1
`,
false,
},
{
"missing name",
`
type = "workflow"
schemaVersion = 1
workflowId = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef"
workflowOwner = "00000000000000000000000000000000000000aa"
`,
false,
},
Expand Down
22 changes: 22 additions & 0 deletions core/store/migrate/migrations/0238_workflow_spec_name.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE workflow_specs ADD COLUMN workflow_name varchar(255);

-- ensure that we can forward migrate to non-null name
UPDATE workflow_specs
SET
workflow_name = workflow_id
WHERE
workflow_name IS NULL;

ALTER TABLE workflow_specs ALTER COLUMN workflow_name SET NOT NULL;

-- unique constraint on workflow_owner and workflow_name
ALTER TABLE workflow_specs ADD CONSTRAINT unique_workflow_owner_name unique (workflow_owner, workflow_name);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE workflow_specs DROP CONSTRAINT unique_workflow_owner_name;
ALTER TABLE workflow_specs DROP COLUMN workflow_name;
-- +goose StatementEnd
5 changes: 3 additions & 2 deletions core/testdata/testspecs/v2_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,17 +872,18 @@ func (w WorkflowSpec) Toml() string {
return w.toml
}

func GenerateWorkflowSpec(id, owner, spec string) WorkflowSpec {
func GenerateWorkflowSpec(id, owner, name, spec string) WorkflowSpec {
template := `
type = "workflow"
schemaVersion = 1
name = "test-spec"
workflowId = "%s"
workflowOwner = "%s"
workflowName = "%s"
workflow = """
%s
"""
`
toml := fmt.Sprintf(template, id, owner, spec)
toml := fmt.Sprintf(template, id, owner, name, spec)
return WorkflowSpec{toml: toml}
}
Loading

0 comments on commit 5db47b6

Please sign in to comment.