Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Cassandra queries for tables #6614

Merged
4 changes: 3 additions & 1 deletion common/persistence/nosql/nosqlplugin/cassandra/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ const (
// return types.DomainAlreadyExistsError error if failed or already exists
// Must return ConditionFailure error if other condition doesn't match
func (db *cdb) InsertDomain(ctx context.Context, row *nosqlplugin.DomainRow) error {
query := db.session.Query(templateCreateDomainQuery, row.Info.ID, row.Info.Name).WithContext(ctx)
timeStamp := db.timeSrc.Now()
query := db.session.Query(templateCreateDomainQuery, row.Info.ID, row.Info.Name, timeStamp).WithContext(ctx)
applied, err := query.MapScanCAS(make(map[string]interface{}))
if err != nil {
return err
Expand Down Expand Up @@ -98,6 +99,7 @@ func (db *cdb) InsertDomain(ctx context.Context, row *nosqlplugin.DomainRow) err
failoverEndTime,
row.LastUpdatedTime.UnixNano(),
metadataNotificationVersion,
timeStamp,
)
db.updateMetadataBatch(batch, metadataNotificationVersion)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ const (
`}`

templateCreateDomainQuery = `INSERT INTO domains (` +
`id, domain) ` +
`VALUES(?, {name: ?}) IF NOT EXISTS`
`id, domain, created_time) ` +
`VALUES(?, {name: ?}, ?) IF NOT EXISTS`

templateGetDomainQuery = `SELECT domain.name ` +
`FROM domains ` +
Expand All @@ -65,8 +65,8 @@ const (
`WHERE id = ?`

templateCreateDomainByNameQueryWithinBatchV2 = `INSERT INTO domains_by_name_v2 (` +
`domains_partition, name, domain, config, replication_config, is_global_domain, config_version, failover_version, failover_notification_version, previous_failover_version, failover_end_time, last_updated_time, notification_version) ` +
`VALUES(?, ?, ` + templateDomainInfoType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS`
`domains_partition, name, domain, config, replication_config, is_global_domain, config_version, failover_version, failover_notification_version, previous_failover_version, failover_end_time, last_updated_time, notification_version, created_time) ` +
`VALUES(?, ?, ` + templateDomainInfoType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS`

templateGetDomainByNameQueryV2 = `SELECT domain.id, domain.name, domain.status, domain.description, ` +
`domain.owner_email, domain.data, config.retention, config.emit_metric, ` +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.uber.org/mock/gomock"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/testlogger"
Expand Down Expand Up @@ -191,7 +192,7 @@ func TestInsertDomain(t *testing.T) {
}).Times(1)
},
wantSessionQueries: []string{
`INSERT INTO domains (id, domain) VALUES(test-domain-id, {name: test-domain-name}) IF NOT EXISTS`,
`INSERT INTO domains (id, domain, created_time) VALUES(test-domain-id, {name: test-domain-name}, 2025-01-06T15:00:00Z) IF NOT EXISTS`,
`SELECT notification_version FROM domains_by_name_v2 WHERE domains_partition = 0 and name = cadence-domain-metadata `,
},
wantBatchQueries: []string{
Expand All @@ -208,7 +209,7 @@ func TestInsertDomain(t *testing.T) {
`previous_failover_version, ` +
`failover_end_time, ` +
`last_updated_time, ` +
`notification_version) ` +
`notification_version, created_time) ` +
`VALUES(` +
`0, ` +
`test-domain-name, ` +
Expand All @@ -222,7 +223,7 @@ func TestInsertDomain(t *testing.T) {
`-1, ` +
`1712167200000000000, ` +
`1712167200000000000, ` +
`7) ` +
`7, 2025-01-06T15:00:00Z) ` +
`IF NOT EXISTS`,
`UPDATE domains_by_name_v2 SET notification_version = 8 WHERE domains_partition = 0 and name = cadence-domain-metadata IF notification_version = 7 `,
},
Expand All @@ -249,6 +250,7 @@ func TestInsertDomain(t *testing.T) {
logger := testlogger.New(t)
dc := &persistence.DynamicConfiguration{}
db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))
db.timeSrc = clock.NewMockedTimeSourceAt(FixedTime)

err := db.InsertDomain(context.Background(), tc.row)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

// InsertIntoHistoryTreeAndNode inserts one or two rows: tree row and node row(at least one of them)
func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlplugin.HistoryTreeRow, nodeRow *nosqlplugin.HistoryNodeRow) error {
timeStamp := db.timeSrc.Now()
if treeRow == nil && nodeRow == nil {
return fmt.Errorf("require at least a tree row or a node row to insert")
}
Expand All @@ -53,19 +54,19 @@ func (db *cdb) InsertIntoHistoryTreeAndNode(ctx context.Context, treeRow *nosqlp
// Note: for perf, prefer using batch for inserting more than one records
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
batch.Query(v2templateInsertTree,
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info)
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, timeStamp)
batch.Query(v2templateUpsertData,
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding)
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, timeStamp)
err = db.session.ExecuteBatch(batch)
} else {
var query gocql.Query
if treeRow != nil {
query = db.session.Query(v2templateInsertTree,
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info).WithContext(ctx)
treeRow.TreeID, treeRow.BranchID, ancs, persistence.UnixNanoToDBTimestamp(treeRow.CreateTimestamp.UnixNano()), treeRow.Info, timeStamp).WithContext(ctx)
}
if nodeRow != nil {
query = db.session.Query(v2templateUpsertData,
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding).WithContext(ctx)
nodeRow.TreeID, nodeRow.BranchID, nodeRow.NodeID, nodeRow.TxnID, nodeRow.Data, nodeRow.DataEncoding, timeStamp).WithContext(ctx)
}
err = query.Exec()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ package cassandra
const (
// below are templates for history_node table
v2templateUpsertData = `INSERT INTO history_node (` +
`tree_id, branch_id, node_id, txn_id, data, data_encoding) ` +
`VALUES (?, ?, ?, ?, ?, ?) `
`tree_id, branch_id, node_id, txn_id, data, data_encoding, created_time) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?) `

v2templateReadData = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` +
`WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? `
Expand All @@ -34,8 +34,8 @@ const (

// below are templates for history_tree table
v2templateInsertTree = `INSERT INTO history_tree (` +
`tree_id, branch_id, ancestors, fork_time, info) ` +
`VALUES (?, ?, ?, ?, ?) `
`tree_id, branch_id, ancestors, fork_time, info, created_time) ` +
`VALUES (?, ?, ?, ?, ?, ?) `

v2templateReadAllBranches = `SELECT branch_id, ancestors, fork_time, info FROM history_tree WHERE tree_id = ? `

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -120,7 +121,7 @@ func TestInsertIntoHistoryTreeAndNode(t *testing.T) {
tt.setupMocks(ctrl, session)
}

db := &cdb{session: session}
db := &cdb{session: session, timeSrc: clock.NewMockedTimeSourceAt(FixedTime)}
err := db.InsertIntoHistoryTreeAndNode(context.Background(), tt.treeRow, tt.nodeRow)
if tt.expectError {
assert.Error(t, err, "Expected an error but got none")
Expand Down
8 changes: 6 additions & 2 deletions common/persistence/nosql/nosqlplugin/cassandra/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func (db *cdb) InsertIntoQueue(
ctx context.Context,
row *nosqlplugin.QueueMessageRow,
) error {
query := db.session.Query(templateEnqueueMessageQuery, row.QueueType, row.ID, row.Payload).WithContext(ctx)
timeStamp := db.timeSrc.Now()
query := db.session.Query(templateEnqueueMessageQuery, row.QueueType, row.ID, row.Payload, timeStamp).WithContext(ctx)
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
Expand Down Expand Up @@ -172,8 +173,9 @@ func (db *cdb) InsertQueueMetadata(
queueType persistence.QueueType,
version int64,
) error {
timeStamp := db.timeSrc.Now()
clusterAckLevels := map[string]int64{}
query := db.session.Query(templateInsertQueueMetadataQuery, queueType, clusterAckLevels, version).WithContext(ctx)
query := db.session.Query(templateInsertQueueMetadataQuery, queueType, clusterAckLevels, version, timeStamp).WithContext(ctx)

// NOTE: Must pass nils to be compatible with ScyllaDB's LWT behavior
// "Scylla always returns the old version of the row, regardless of whether the condition is true or not."
Expand All @@ -193,9 +195,11 @@ func (db *cdb) UpdateQueueMetadataCas(
ctx context.Context,
row nosqlplugin.QueueMetadataRow,
) error {
timeStamp := db.timeSrc.Now()
query := db.session.Query(templateUpdateQueueMetadataQuery,
row.ClusterAckLevels,
row.Version,
timeStamp,
row.QueueType,
row.Version-1,
).WithContext(ctx)
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/nosql/nosqlplugin/cassandra/queue_cql.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
package cassandra

const (
templateEnqueueMessageQuery = `INSERT INTO queue (queue_type, message_id, message_payload) VALUES(?, ?, ?) IF NOT EXISTS`
templateEnqueueMessageQuery = `INSERT INTO queue (queue_type, message_id, message_payload, created_time) VALUES(?, ?, ?, ?) IF NOT EXISTS`
templateGetLastMessageIDQuery = `SELECT message_id FROM queue WHERE queue_type=? ORDER BY message_id DESC LIMIT 1`
templateGetMessagesQuery = `SELECT message_id, message_payload FROM queue WHERE queue_type = ? and message_id > ? LIMIT ?`
templateGetMessagesFromDLQQuery = `SELECT message_id, message_payload FROM queue WHERE queue_type = ? and message_id > ? and message_id <= ?`
templateRangeDeleteMessagesBeforeQuery = `DELETE FROM queue WHERE queue_type = ? and message_id < ?`
templateRangeDeleteMessagesBetweenQuery = `DELETE FROM queue WHERE queue_type = ? and message_id > ? and message_id <= ?`
templateDeleteMessageQuery = `DELETE FROM queue WHERE queue_type = ? and message_id = ?`
templateGetQueueMetadataQuery = `SELECT cluster_ack_level, version FROM queue_metadata WHERE queue_type = ?`
templateInsertQueueMetadataQuery = `INSERT INTO queue_metadata (queue_type, cluster_ack_level, version) VALUES(?, ?, ?) IF NOT EXISTS`
templateUpdateQueueMetadataQuery = `UPDATE queue_metadata SET cluster_ack_level = ?, version = ? WHERE queue_type = ? IF version = ?`
templateInsertQueueMetadataQuery = `INSERT INTO queue_metadata (queue_type, cluster_ack_level, version, created_time) VALUES(?, ?, ?, ?) IF NOT EXISTS`
templateUpdateQueueMetadataQuery = `UPDATE queue_metadata SET cluster_ack_level = ?, version = ?, last_updated_time = ? WHERE queue_type = ? IF version = ?`
templateGetQueueSizeQuery = `SELECT COUNT(1) AS count FROM queue WHERE queue_type=?`
)
10 changes: 7 additions & 3 deletions common/persistence/nosql/nosqlplugin/cassandra/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/google/go-cmp/cmp"
"go.uber.org/mock/gomock"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/persistence"
Expand All @@ -54,7 +55,7 @@ func TestInsertIntoQueue(t *testing.T) {
}).Times(1)
},
wantQueries: []string{
`INSERT INTO queue (queue_type, message_id, message_payload) VALUES(1, 101, [116 101 115 116 45 112 97 121 108 111 97 100 45 49 48 49]) IF NOT EXISTS`,
`INSERT INTO queue (queue_type, message_id, message_payload, created_time) VALUES(1, 101, [116 101 115 116 45 112 97 121 108 111 97 100 45 49 48 49], 2025-01-06T15:00:00Z) IF NOT EXISTS`,
},
},
{
Expand Down Expand Up @@ -95,6 +96,7 @@ func TestInsertIntoQueue(t *testing.T) {
dc := &persistence.DynamicConfiguration{}

db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))
db.timeSrc = clock.NewMockedTimeSourceAt(FixedTime)

err := db.InsertIntoQueue(context.Background(), tc.row)

Expand Down Expand Up @@ -799,7 +801,7 @@ func TestInsertQueueMetadata(t *testing.T) {
query.EXPECT().ScanCAS(gomock.Any()).Return(false, nil).Times(1)
},
wantQueries: []string{
`INSERT INTO queue_metadata (queue_type, cluster_ack_level, version) VALUES(2, map[], 25) IF NOT EXISTS`,
`INSERT INTO queue_metadata (queue_type, cluster_ack_level, version, created_time) VALUES(2, map[], 25, 2025-01-06T15:00:00Z) IF NOT EXISTS`,
},
},
{
Expand Down Expand Up @@ -827,6 +829,7 @@ func TestInsertQueueMetadata(t *testing.T) {
dc := &persistence.DynamicConfiguration{}

db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))
db.timeSrc = clock.NewMockedTimeSourceAt(FixedTime)

err := db.InsertQueueMetadata(context.Background(), tc.queueType, tc.version)

Expand Down Expand Up @@ -865,7 +868,7 @@ func TestUpdateQueueMetadataCas(t *testing.T) {
query.EXPECT().ScanCAS(gomock.Any()).Return(true, nil).Times(1)
},
wantQueries: []string{
`UPDATE queue_metadata SET cluster_ack_level = map[cluster1:1000 cluster2:2000], version = 25 WHERE queue_type = 2 IF version = 24`,
`UPDATE queue_metadata SET cluster_ack_level = map[cluster1:1000 cluster2:2000], version = 25, last_updated_time = 2025-01-06T15:00:00Z WHERE queue_type = 2 IF version = 24`,
},
},
{
Expand Down Expand Up @@ -910,6 +913,7 @@ func TestUpdateQueueMetadataCas(t *testing.T) {
dc := &persistence.DynamicConfiguration{}

db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client))
db.timeSrc = clock.NewMockedTimeSourceAt(FixedTime)

err := db.UpdateQueueMetadataCas(context.Background(), tc.row)

Expand Down
16 changes: 14 additions & 2 deletions common/persistence/nosql/nosqlplugin/cassandra/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func fromTaskListPartitionConfig(config *persistence.TaskListPartitionConfig) ma
// InsertTaskList insert a single tasklist row
// Return TaskOperationConditionFailure if the condition doesn't meet
func (db *cdb) InsertTaskList(ctx context.Context, row *nosqlplugin.TaskListRow) error {
timeStamp := db.timeSrc.Now()
query := db.session.Query(templateInsertTaskListQuery,
row.DomainID,
row.TaskListName,
Expand All @@ -124,6 +125,7 @@ func (db *cdb) InsertTaskList(ctx context.Context, row *nosqlplugin.TaskListRow)
row.TaskListKind,
row.LastUpdatedTime,
fromTaskListPartitionConfig(row.AdaptivePartitionConfig),
timeStamp,
).WithContext(ctx)

previous := make(map[string]interface{})
Expand All @@ -142,6 +144,7 @@ func (db *cdb) UpdateTaskList(
row *nosqlplugin.TaskListRow,
previousRangeID int64,
) error {
timeStamp := db.timeSrc.Now()
query := db.session.Query(templateUpdateTaskListQuery,
row.RangeID,
row.DomainID,
Expand All @@ -151,6 +154,7 @@ func (db *cdb) UpdateTaskList(
row.TaskListKind,
row.LastUpdatedTime,
fromTaskListPartitionConfig(row.AdaptivePartitionConfig),
timeStamp,
row.DomainID,
row.TaskListName,
row.TaskListType,
Expand Down Expand Up @@ -194,6 +198,7 @@ func (db *cdb) UpdateTaskListWithTTL(
row *nosqlplugin.TaskListRow,
previousRangeID int64,
) error {
timeStamp := db.timeSrc.Now()
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
// part 1 is used to set TTL on primary key as UPDATE can't set TTL for primary key
batch.Query(templateUpdateTaskListQueryWithTTLPart1,
Expand All @@ -202,6 +207,7 @@ func (db *cdb) UpdateTaskListWithTTL(
row.TaskListType,
rowTypeTaskList,
taskListTaskID,
timeStamp,
ttlSeconds,
)
// part 2 is for CAS and setting TTL for the rest of the columns
Expand All @@ -213,8 +219,9 @@ func (db *cdb) UpdateTaskListWithTTL(
row.TaskListType,
row.AckLevel,
row.TaskListKind,
db.timeSrc.Now(),
timeStamp,
fromTaskListPartitionConfig(row.AdaptivePartitionConfig),
timeStamp,
row.DomainID,
row.TaskListName,
row.TaskListType,
Expand Down Expand Up @@ -275,6 +282,7 @@ func (db *cdb) InsertTasks(
domainID := tasklistCondition.DomainID
taskListName := tasklistCondition.TaskListName
taskListType := tasklistCondition.TaskListType
timeStamp := db.timeSrc.Now()

for _, task := range tasksToInsert {
scheduleID := task.ScheduledID
Expand All @@ -291,7 +299,9 @@ func (db *cdb) InsertTasks(
task.RunID,
scheduleID,
task.CreatedTime,
task.PartitionConfig)
task.PartitionConfig,
timeStamp,
)
} else {
if ttl > maxCassandraTTL {
ttl = maxCassandraTTL
Expand All @@ -308,13 +318,15 @@ func (db *cdb) InsertTasks(
scheduleID,
task.CreatedTime,
task.PartitionConfig,
timeStamp,
ttl)
}
}

// The following query is used to ensure that range_id didn't change
batch.Query(templateUpdateTaskListRangeIDQuery,
tasklistCondition.RangeID,
timeStamp,
domainID,
taskListName,
taskListType,
Expand Down
Loading
Loading