Skip to content

Commit

Permalink
task queue worker: fix remove client subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Jul 5, 2022
1 parent f918b9f commit bab20bf
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 35 deletions.
49 changes: 24 additions & 25 deletions codebase/app/task_queue_worker/graphql_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,20 @@ func (r *rootResolver) RecalculateSummary(ctx context.Context) (string, error) {

func (r *rootResolver) ClearAllClientSubscriber(ctx context.Context) (string, error) {

for range clientTaskSubscribers {
closeAllSubscribers <- struct{}{}
}
for range clientTaskJobListSubscribers {
closeAllSubscribers <- struct{}{}
}
for range clientJobDetailSubscribers {
closeAllSubscribers <- struct{}{}
}
go func() {
for k := range clientTaskSubscribers {
removeTaskListSubscriber(k)
closeAllSubscribers <- struct{}{}
}
for k := range clientTaskJobListSubscribers {
removeJobListSubscriber(k)
closeAllSubscribers <- struct{}{}
}
for k := range clientJobDetailSubscribers {
removeJobDetailSubscriber(k)
closeAllSubscribers <- struct{}{}
}
}()

return "Success clear all client subscriber", nil
}
Expand Down Expand Up @@ -342,11 +347,11 @@ func (r *rootResolver) ListenTaskDashboard(ctx context.Context, input struct {

autoRemoveClient := time.NewTicker(defaultOption.autoRemoveClientInterval)

go broadcastTaskList(r.worker.ctx)

go func() {
defer func() { broadcastTaskList(r.worker.ctx); close(output); autoRemoveClient.Stop() }()

broadcastTaskList(r.worker.ctx)

select {
case <-ctx.Done():
removeTaskListSubscriber(clientID)
Expand All @@ -358,11 +363,7 @@ func (r *rootResolver) ListenTaskDashboard(ctx context.Context, input struct {
return

case <-autoRemoveClient.C:
output <- TaskListResolver{
Meta: MetaTaskResolver{
IsCloseSession: true,
},
}
output <- TaskListResolver{Meta: MetaTaskResolver{IsCloseSession: true}}
removeTaskListSubscriber(clientID)
return
}
Expand Down Expand Up @@ -405,12 +406,12 @@ func (r *rootResolver) ListenTaskJobList(ctx context.Context, input struct {
return nil, err
}

go broadcastJobListToClient(ctx, clientID)

autoRemoveClient := time.NewTicker(defaultOption.autoRemoveClientInterval)
go func() {
defer func() { close(output); autoRemoveClient.Stop() }()

broadcastJobListToClient(ctx, clientID)

select {
case <-ctx.Done():
removeJobListSubscriber(clientID)
Expand All @@ -422,11 +423,7 @@ func (r *rootResolver) ListenTaskJobList(ctx context.Context, input struct {
return

case <-autoRemoveClient.C:
output <- JobListResolver{
Meta: MetaJobList{
IsCloseSession: true,
},
}
output <- JobListResolver{Meta: MetaJobList{IsCloseSession: true}}
removeJobListSubscriber(clientID)
return

Expand Down Expand Up @@ -458,22 +455,24 @@ func (r *rootResolver) ListenJobDetail(ctx context.Context, input struct {
return nil, err
}

go broadcastJobDetail(ctx)

autoRemoveClient := time.NewTicker(defaultOption.autoRemoveClientInterval)
go func() {
defer func() { close(output); autoRemoveClient.Stop() }()

broadcastJobDetail(ctx)

select {
case <-ctx.Done():
removeJobDetailSubscriber(clientID)
return

case <-closeAllSubscribers:
output <- Job{}
removeJobDetailSubscriber(clientID)
return

case <-autoRemoveClient.C:
output <- Job{}
removeJobDetailSubscriber(clientID)
return

Expand Down
5 changes: 3 additions & 2 deletions codebase/app/task_queue_worker/job_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
}()

trace.SetTag("task_name", req.TaskName)
trace.Log("message", req.Args)

if err = req.Validate(); err != nil {
return jobID, err
Expand All @@ -63,6 +62,8 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
return AddJobViaHTTPRequest(ctx, defaultOption.externalWorkerHost, req)
}

trace.Log("message", req.Args)

task, ok := registeredTask[req.TaskName]
if !ok {
return jobID, fmt.Errorf("task '%s' unregistered, task must one of [%s]", req.TaskName, strings.Join(tasks, ", "))
Expand All @@ -85,6 +86,7 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {

semaphoreAddJob <- struct{}{}
go func(ctx context.Context, job *Job, workerIndex int) {
defer func() { <-semaphoreAddJob }()

persistent.SaveJob(ctx, job)
persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{
Expand All @@ -96,7 +98,6 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
registerJobToWorker(&newJob, workerIndex)
refreshWorkerNotif <- struct{}{}
}
<-semaphoreAddJob

}(context.Background(), &newJob, task.workerIndex)

Expand Down
2 changes: 0 additions & 2 deletions codebase/app/task_queue_worker/subscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func broadcastAllToSubscribers(ctx context.Context) {
return
}

semaphoreBroadcast <- struct{}{}
go func(ctx context.Context) {
if len(clientTaskSubscribers) > 0 {
broadcastTaskList(ctx)
Expand All @@ -89,7 +88,6 @@ func broadcastAllToSubscribers(ctx context.Context) {
if len(clientTaskJobListSubscribers) > 0 {
broadcastJobList(ctx)
}
<-semaphoreBroadcast
}(ctx)
}

Expand Down
9 changes: 4 additions & 5 deletions codebase/app/task_queue_worker/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ var (
queue QueueStorage
persistent Persistent

refreshWorkerNotif, shutdown, closeAllSubscribers, semaphoreBroadcast, semaphoreAddJob chan struct{}
semaphore []chan struct{}
mutex sync.Mutex
tasks []string
refreshWorkerNotif, shutdown, closeAllSubscribers, semaphoreAddJob chan struct{}
semaphore []chan struct{}
mutex sync.Mutex
tasks []string

clientTaskSubscribers map[string]*clientTaskDashboardSubscriber
clientTaskJobListSubscribers map[string]*clientTaskJobListSubscriber
Expand Down Expand Up @@ -210,7 +210,6 @@ func makeAllGlobalVars(opts ...OptionFunc) {
persistent = defaultOption.persistent

refreshWorkerNotif, shutdown, closeAllSubscribers = make(chan struct{}), make(chan struct{}, 1), make(chan struct{})
semaphoreBroadcast = make(chan struct{}, env.BaseEnv().MaxGoroutines)
semaphoreAddJob = make(chan struct{}, env.BaseEnv().MaxGoroutines)
clientTaskSubscribers = make(map[string]*clientTaskDashboardSubscriber, defaultOption.maxClientSubscriber)
clientTaskJobListSubscribers = make(map[string]*clientTaskJobListSubscriber, defaultOption.maxClientSubscriber)
Expand Down
2 changes: 1 addition & 1 deletion init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package candi

const (
// Version of this library
Version = "v1.11.19"
Version = "v1.11.20"
)

0 comments on commit bab20bf

Please sign in to comment.