diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tasks.go b/common/persistence/nosql/nosqlplugin/cassandra/tasks.go index 2c81a31af84..9f5e4401ad0 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tasks.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tasks.go @@ -253,6 +253,7 @@ func (db *cdb) InsertTasks( taskListType, rowTypeTask, task.TaskID, + task.RunID, domainID, task.WorkflowID, task.RunID, @@ -269,6 +270,7 @@ func (db *cdb) InsertTasks( taskListType, rowTypeTask, task.TaskID, + task.RunID, domainID, task.WorkflowID, task.RunID, diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go b/common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go index a91654b5834..6eb52d459b7 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go @@ -41,12 +41,12 @@ const ( `}` templateCreateTaskQuery = `INSERT INTO tasks (` + - `domain_id, task_list_name, task_list_type, type, task_id, task) ` + - `VALUES(?, ?, ?, ?, ?, ` + templateTaskType + `)` + `domain_id, task_list_name, task_list_type, type, task_id, run_id, task) ` + + `VALUES(?, ?, ?, ?, ?, ?, ` + templateTaskType + `)` templateCreateTaskWithTTLQuery = `INSERT INTO tasks (` + - `domain_id, task_list_name, task_list_type, type, task_id, task) ` + - `VALUES(?, ?, ?, ?, ?, ` + templateTaskType + `) USING TTL ?` + `domain_id, task_list_name, task_list_type, type, task_id, run_id, task) ` + + `VALUES(?, ?, ?, ?, ?, ?, ` + templateTaskType + `) USING TTL ?` templateGetTasksQuery = `SELECT task_id, task ` + `FROM tasks ` + diff --git a/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go b/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go index 5a824afcf2f..486996d77ca 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go @@ -671,8 +671,8 @@ func TestInsertTasks(t *testing.T) { }, mapExecuteBatchCASApplied: true, wantQueries: []string{ - `INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, task) VALUES(domain1, tasklist1, 1, 0, 3, {domain_id: domain1, workflow_id: wid1, run_id: rid1, schedule_id: 42,created_time: 2024-04-01T22:08:41Z, partition_config: map[] })`, - `INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, task) VALUES(domain1, tasklist1, 1, 0, 4, {domain_id: domain1, workflow_id: wid1, run_id: rid1, schedule_id: 43,created_time: 2024-04-01T22:08:42Z, partition_config: map[] }) USING TTL 157680000`, + `INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, run_id, task) VALUES(domain1, tasklist1, 1, 0, 3, rid1, {domain_id: domain1, workflow_id: wid1, run_id: rid1, schedule_id: 42,created_time: 2024-04-01T22:08:41Z, partition_config: map[] })`, + `INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, run_id, task) VALUES(domain1, tasklist1, 1, 0, 4, rid1, {domain_id: domain1, workflow_id: wid1, run_id: rid1, schedule_id: 43,created_time: 2024-04-01T22:08:42Z, partition_config: map[] }) USING TTL 157680000`, `UPDATE tasks SET range_id = 25 WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`, }, }, diff --git a/schema/cassandra/cadence/schema.cql b/schema/cassandra/cadence/schema.cql index 768e71d153f..ad386037af7 100644 --- a/schema/cassandra/cadence/schema.cql +++ b/schema/cassandra/cadence/schema.cql @@ -400,6 +400,7 @@ CREATE TABLE tasks ( task_list_type int, -- enum TaskListType {ActivityTask, DecisionTask} type int, -- enum rowType {Task, TaskList} task_id bigint, -- unique identifier for tasks, monotonically increasing + run_id uuid, range_id bigint, -- Used to ensure that only one process can write to the table task frozen, task_list frozen, diff --git a/schema/cassandra/cadence/versioned/v0.38/add_run_id_to_tasks.cql b/schema/cassandra/cadence/versioned/v0.38/add_run_id_to_tasks.cql new file mode 100644 index 00000000000..12c90c83a79 --- /dev/null +++ b/schema/cassandra/cadence/versioned/v0.38/add_run_id_to_tasks.cql @@ -0,0 +1 @@ +ALTER TABLE tasks ADD run_id uuid; \ No newline at end of file diff --git a/schema/cassandra/cadence/versioned/v0.38/manifest.json b/schema/cassandra/cadence/versioned/v0.38/manifest.json new file mode 100644 index 00000000000..67c461e4050 --- /dev/null +++ b/schema/cassandra/cadence/versioned/v0.38/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.38", + "MinCompatibleVersion": "0.38", + "Description": "Adding run_id field to the tasks table", + "SchemaUpdateCqlFiles": [ + "add_run_id_to_tasks.cql" + ] +} diff --git a/schema/cassandra/version.go b/schema/cassandra/version.go index 7fd89de7826..1c6bbc66b04 100644 --- a/schema/cassandra/version.go +++ b/schema/cassandra/version.go @@ -23,7 +23,7 @@ package cassandra // NOTE: whenever there is a new data base schema update, plz update the following versions // Version is the Cassandra database release version -const Version = "0.37" +const Version = "0.38" // VisibilityVersion is the Cassandra visibility database release version const VisibilityVersion = "0.9" diff --git a/tools/common/schema/updatetask_test.go b/tools/common/schema/updatetask_test.go index 602fc249093..dba2c61e7a0 100644 --- a/tools/common/schema/updatetask_test.go +++ b/tools/common/schema/updatetask_test.go @@ -108,7 +108,7 @@ func (s *UpdateTaskTestSuite) TestReadSchemaDirFromEmbeddings() { s.NoError(err) ans, err := readSchemaDir(fsys, "0.30", "") s.NoError(err) - s.Equal([]string{"v0.31", "v0.32", "v0.33", "v0.34", "v0.35", "v0.36", "v0.37"}, ans) + s.Equal([]string{"v0.31", "v0.32", "v0.33", "v0.34", "v0.35", "v0.36", "v0.37", "v0.38"}, ans) fsys, err = fs.Sub(cassandra.SchemaFS, "visibility/versioned") s.NoError(err)