Skip to content

Commit

Permalink
Merge branch 'master' into starGazers
Browse files Browse the repository at this point in the history
  • Loading branch information
ibarrajo authored Mar 20, 2024
2 parents abb3088 + dbc5787 commit bd23893
Show file tree
Hide file tree
Showing 27 changed files with 516 additions and 67 deletions.
74 changes: 69 additions & 5 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible // indirect
github.com/uber/cadence-idl v0.0.0-20240212223805-34b4519b2709
github.com/uber/cadence-idl v0.0.0-20240318101217-afd574441210
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/urfave/cli v1.22.4
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240212223805-34b4519b2709 h1:1u+kMB6p8P9fjK6jk3QHAl8PxLyNjO9/TMXoPOVr1O8=
github.com/uber/cadence-idl v0.0.0-20240212223805-34b4519b2709/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240318101217-afd574441210 h1:iuPF2QAZM90Jpx0s+2LnxLN+ElGsxB2JnXKGNLc18+w=
github.com/uber/cadence-idl v0.0.0-20240318101217-afd574441210/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,8 @@ const (
// DefaultHistoryMaxAutoResetPoints is the default maximum number for auto reset points
DefaultHistoryMaxAutoResetPoints = 20
)

const (
// WorkflowIDRateLimitReason is the reason set in ServiceBusyError when workflow ID rate limit is exceeded
WorkflowIDRateLimitReason = "external-workflow-id-rate-limit"
)
52 changes: 24 additions & 28 deletions common/persistence/nosql/nosqlplugin/cassandra/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"time"

"github.com/uber/cadence/common"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -191,44 +191,44 @@ func (db *cdb) SelectWorkflowExecution(ctx context.Context, shardID int, domainI
state := &nosqlplugin.WorkflowExecution{}
info := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
state.ExecutionInfo = info
state.VersionHistories = p.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string)))
state.VersionHistories = persistence.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string)))
// TODO: remove this after all 2DC workflows complete
replicationState := parseReplicationState(result["replication_state"].(map[string]interface{}))
state.ReplicationState = replicationState

activityInfos := make(map[int64]*p.InternalActivityInfo)
activityInfos := make(map[int64]*persistence.InternalActivityInfo)
aMap := result["activity_map"].(map[int64]map[string]interface{})
for key, value := range aMap {
info := parseActivityInfo(domainID, value)
activityInfos[key] = info
}
state.ActivityInfos = activityInfos

timerInfos := make(map[string]*p.TimerInfo)
timerInfos := make(map[string]*persistence.TimerInfo)
tMap := result["timer_map"].(map[string]map[string]interface{})
for key, value := range tMap {
info := parseTimerInfo(value)
timerInfos[key] = info
}
state.TimerInfos = timerInfos

childExecutionInfos := make(map[int64]*p.InternalChildExecutionInfo)
childExecutionInfos := make(map[int64]*persistence.InternalChildExecutionInfo)
cMap := result["child_executions_map"].(map[int64]map[string]interface{})
for key, value := range cMap {
info := parseChildExecutionInfo(value)
childExecutionInfos[key] = info
}
state.ChildExecutionInfos = childExecutionInfos

requestCancelInfos := make(map[int64]*p.RequestCancelInfo)
requestCancelInfos := make(map[int64]*persistence.RequestCancelInfo)
rMap := result["request_cancel_map"].(map[int64]map[string]interface{})
for key, value := range rMap {
info := parseRequestCancelInfo(value)
requestCancelInfos[key] = info
}
state.RequestCancelInfos = requestCancelInfos

signalInfos := make(map[int64]*p.SignalInfo)
signalInfos := make(map[int64]*persistence.SignalInfo)
sMap := result["signal_map"].(map[int64]map[string]interface{})
for key, value := range sMap {
info := parseSignalInfo(value)
Expand All @@ -244,7 +244,7 @@ func (db *cdb) SelectWorkflowExecution(ctx context.Context, shardID int, domainI
state.SignalRequestedIDs = signalRequestedIDs

eList := result["buffered_events_list"].([]map[string]interface{})
bufferedEventsBlobs := make([]*p.DataBlob, 0, len(eList))
bufferedEventsBlobs := make([]*persistence.DataBlob, 0, len(eList))
for _, v := range eList {
blob := parseHistoryEventBatchBlob(v)
bufferedEventsBlobs = append(bufferedEventsBlobs, blob)
Expand Down Expand Up @@ -284,7 +284,7 @@ func (db *cdb) DeleteWorkflowExecution(ctx context.Context, shardID int, domainI
return db.executeWithConsistencyAll(query)
}

func (db *cdb) SelectAllCurrentWorkflows(ctx context.Context, shardID int, pageToken []byte, pageSize int) ([]*p.CurrentWorkflowExecution, []byte, error) {
func (db *cdb) SelectAllCurrentWorkflows(ctx context.Context, shardID int, pageToken []byte, pageSize int) ([]*persistence.CurrentWorkflowExecution, []byte, error) {
query := db.session.Query(
templateListCurrentExecutionsQuery,
shardID,
Expand All @@ -298,14 +298,14 @@ func (db *cdb) SelectAllCurrentWorkflows(ctx context.Context, shardID int, pageT
}
}
result := make(map[string]interface{})
var executions []*p.CurrentWorkflowExecution
var executions []*persistence.CurrentWorkflowExecution
for iter.MapScan(result) {
runID := result["run_id"].(gocql.UUID).String()
if runID != permanentRunID {
result = make(map[string]interface{})
continue
}
executions = append(executions, &p.CurrentWorkflowExecution{
executions = append(executions, &persistence.CurrentWorkflowExecution{
DomainID: result["domain_id"].(gocql.UUID).String(),
WorkflowID: result["workflow_id"].(string),
RunID: permanentRunID,
Expand All @@ -316,11 +316,10 @@ func (db *cdb) SelectAllCurrentWorkflows(ctx context.Context, shardID int, pageT
}
nextPageToken := getNextPageToken(iter)

err := iter.Close()
return executions, nextPageToken, err
return executions, nextPageToken, iter.Close()
}

func (db *cdb) SelectAllWorkflowExecutions(ctx context.Context, shardID int, pageToken []byte, pageSize int) ([]*p.InternalListConcreteExecutionsEntity, []byte, error) {
func (db *cdb) SelectAllWorkflowExecutions(ctx context.Context, shardID int, pageToken []byte, pageSize int) ([]*persistence.InternalListConcreteExecutionsEntity, []byte, error) {
query := db.session.Query(
templateListWorkflowExecutionQuery,
shardID,
Expand All @@ -335,25 +334,22 @@ func (db *cdb) SelectAllWorkflowExecutions(ctx context.Context, shardID int, pag
}

result := make(map[string]interface{})
var executions []*p.InternalListConcreteExecutionsEntity
var executions []*persistence.InternalListConcreteExecutionsEntity
for iter.MapScan(result) {
runID := result["run_id"].(gocql.UUID).String()
if runID == permanentRunID {
result = make(map[string]interface{})
continue
}
executions = append(executions, &p.InternalListConcreteExecutionsEntity{
executions = append(executions, &persistence.InternalListConcreteExecutionsEntity{
ExecutionInfo: parseWorkflowExecutionInfo(result["execution"].(map[string]interface{})),
VersionHistories: p.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string))),
VersionHistories: persistence.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string))),
})
result = make(map[string]interface{})
}
nextPageToken := getNextPageToken(iter)

if err := iter.Close(); err != nil {
return nil, nil, err
}
return executions, nextPageToken, nil
return executions, nextPageToken, iter.Close()
}

func (db *cdb) IsWorkflowExecutionExists(ctx context.Context, shardID int, domainID, workflowID, runID string) (bool, error) {
Expand Down Expand Up @@ -444,8 +440,8 @@ func (db *cdb) RangeDeleteTransferTasks(ctx context.Context, shardID int, exclus

func (db *cdb) SelectTimerTasksOrderByVisibilityTime(ctx context.Context, shardID, pageSize int, pageToken []byte, inclusiveMinTime, exclusiveMaxTime time.Time) ([]*nosqlplugin.TimerTask, []byte, error) {
// Reading timer tasks need to be quorum level consistent, otherwise we could loose task
minTimestamp := p.UnixNanoToDBTimestamp(inclusiveMinTime.UnixNano())
maxTimestamp := p.UnixNanoToDBTimestamp(exclusiveMaxTime.UnixNano())
minTimestamp := persistence.UnixNanoToDBTimestamp(inclusiveMinTime.UnixNano())
maxTimestamp := persistence.UnixNanoToDBTimestamp(exclusiveMaxTime.UnixNano())
query := db.session.Query(templateGetTimerTasksQuery,
shardID,
rowTypeTimerTask,
Expand Down Expand Up @@ -479,7 +475,7 @@ func (db *cdb) SelectTimerTasksOrderByVisibilityTime(ctx context.Context, shardI
}

func (db *cdb) DeleteTimerTask(ctx context.Context, shardID int, taskID int64, visibilityTimestamp time.Time) error {
ts := p.UnixNanoToDBTimestamp(visibilityTimestamp.UnixNano())
ts := persistence.UnixNanoToDBTimestamp(visibilityTimestamp.UnixNano())
query := db.session.Query(templateCompleteTimerTaskQuery,
shardID,
rowTypeTimerTask,
Expand All @@ -494,8 +490,8 @@ func (db *cdb) DeleteTimerTask(ctx context.Context, shardID int, taskID int64, v
}

func (db *cdb) RangeDeleteTimerTasks(ctx context.Context, shardID int, inclusiveMinTime, exclusiveMaxTime time.Time) error {
start := p.UnixNanoToDBTimestamp(inclusiveMinTime.UnixNano())
end := p.UnixNanoToDBTimestamp(exclusiveMaxTime.UnixNano())
start := persistence.UnixNanoToDBTimestamp(inclusiveMinTime.UnixNano())
end := persistence.UnixNanoToDBTimestamp(exclusiveMaxTime.UnixNano())
query := db.session.Query(templateRangeCompleteTimerTaskQuery,
shardID,
rowTypeTimerTask,
Expand Down Expand Up @@ -635,9 +631,9 @@ func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, source
task.NextEventID,
task.Version,
task.ScheduledID,
p.EventStoreVersion,
persistence.EventStoreVersion,
task.BranchToken,
p.EventStoreVersion,
persistence.EventStoreVersion,
task.NewRunBranchToken,
defaultVisibilityTimestamp,
defaultVisibilityTimestamp,
Expand Down
Loading

0 comments on commit bd23893

Please sign in to comment.