Skip to content

Commit

Permalink
Add run_id field to tasks table in cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
fimanishi committed Jul 18, 2024
1 parent 8dde72e commit a36f51c
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 8 deletions.
2 changes: 2 additions & 0 deletions common/persistence/nosql/nosqlplugin/cassandra/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func (db *cdb) InsertTasks(
taskListType,
rowTypeTask,
task.TaskID,
task.RunID,
domainID,
task.WorkflowID,
task.RunID,
Expand All @@ -269,6 +270,7 @@ func (db *cdb) InsertTasks(
taskListType,
rowTypeTask,
task.TaskID,
task.RunID,
domainID,
task.WorkflowID,
task.RunID,
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ const (
`}`

templateCreateTaskQuery = `INSERT INTO tasks (` +
`domain_id, task_list_name, task_list_type, type, task_id, task) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateTaskType + `)`
`domain_id, task_list_name, task_list_type, type, task_id, run_id, task) ` +
`VALUES(?, ?, ?, ?, ?, ?, ` + templateTaskType + `)`

templateCreateTaskWithTTLQuery = `INSERT INTO tasks (` +
`domain_id, task_list_name, task_list_type, type, task_id, task) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateTaskType + `) USING TTL ?`
`domain_id, task_list_name, task_list_type, type, task_id, run_id, task) ` +
`VALUES(?, ?, ?, ?, ?, ?, ` + templateTaskType + `) USING TTL ?`

templateGetTasksQuery = `SELECT task_id, task ` +
`FROM tasks ` +
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,8 @@ func TestInsertTasks(t *testing.T) {
},
mapExecuteBatchCASApplied: true,
wantQueries: []string{
`INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, task) VALUES(domain1, tasklist1, 1, 0, 3, {domain_id: domain1, workflow_id: wid1, run_id: rid1, schedule_id: 42,created_time: 2024-04-01T22:08:41Z, partition_config: map[] })`,
`INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, task) VALUES(domain1, tasklist1, 1, 0, 4, {domain_id: domain1, workflow_id: wid1, run_id: rid1, schedule_id: 43,created_time: 2024-04-01T22:08:42Z, partition_config: map[] }) USING TTL 157680000`,
`INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, run_id, task) VALUES(domain1, tasklist1, 1, 0, 3, rid1, {domain_id: domain1, workflow_id: wid1, run_id: rid1, schedule_id: 42,created_time: 2024-04-01T22:08:41Z, partition_config: map[] })`,
`INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, run_id, task) VALUES(domain1, tasklist1, 1, 0, 4, rid1, {domain_id: domain1, workflow_id: wid1, run_id: rid1, schedule_id: 43,created_time: 2024-04-01T22:08:42Z, partition_config: map[] }) USING TTL 157680000`,
`UPDATE tasks SET range_id = 25 WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`,
},
},
Expand Down
1 change: 1 addition & 0 deletions schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ CREATE TABLE tasks (
task_list_type int, -- enum TaskListType {ActivityTask, DecisionTask}
type int, -- enum rowType {Task, TaskList}
task_id bigint, -- unique identifier for tasks, monotonically increasing
run_id uuid,
range_id bigint, -- Used to ensure that only one process can write to the table
task frozen<task>,
task_list frozen<task_list>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE tasks ADD run_id uuid;
8 changes: 8 additions & 0 deletions schema/cassandra/cadence/versioned/v0.38/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.38",
"MinCompatibleVersion": "0.38",
"Description": "Adding run_id field to the tasks table",
"SchemaUpdateCqlFiles": [
"add_run_id_to_tasks.cql"
]
}
2 changes: 1 addition & 1 deletion schema/cassandra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package cassandra
// NOTE: whenever there is a new data base schema update, plz update the following versions

// Version is the Cassandra database release version
const Version = "0.37"
const Version = "0.38"

// VisibilityVersion is the Cassandra visibility database release version
const VisibilityVersion = "0.9"
2 changes: 1 addition & 1 deletion tools/common/schema/updatetask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *UpdateTaskTestSuite) TestReadSchemaDirFromEmbeddings() {
s.NoError(err)
ans, err := readSchemaDir(fsys, "0.30", "")
s.NoError(err)
s.Equal([]string{"v0.31", "v0.32", "v0.33", "v0.34", "v0.35", "v0.36", "v0.37"}, ans)
s.Equal([]string{"v0.31", "v0.32", "v0.33", "v0.34", "v0.35", "v0.36", "v0.37", "v0.38"}, ans)

fsys, err = fs.Sub(cassandra.SchemaFS, "visibility/versioned")
s.NoError(err)
Expand Down

0 comments on commit a36f51c

Please sign in to comment.