diff --git a/codebase/app/task_queue_worker/graphql_resolver.go b/codebase/app/task_queue_worker/graphql_resolver.go index 5ad61831..75de0bd5 100644 --- a/codebase/app/task_queue_worker/graphql_resolver.go +++ b/codebase/app/task_queue_worker/graphql_resolver.go @@ -274,15 +274,20 @@ func (r *rootResolver) RecalculateSummary(ctx context.Context) (string, error) { func (r *rootResolver) ClearAllClientSubscriber(ctx context.Context) (string, error) { - for range clientTaskSubscribers { - closeAllSubscribers <- struct{}{} - } - for range clientTaskJobListSubscribers { - closeAllSubscribers <- struct{}{} - } - for range clientJobDetailSubscribers { - closeAllSubscribers <- struct{}{} - } + go func() { + for k := range clientTaskSubscribers { + removeTaskListSubscriber(k) + closeAllSubscribers <- struct{}{} + } + for k := range clientTaskJobListSubscribers { + removeJobListSubscriber(k) + closeAllSubscribers <- struct{}{} + } + for k := range clientJobDetailSubscribers { + removeJobDetailSubscriber(k) + closeAllSubscribers <- struct{}{} + } + }() return "Success clear all client subscriber", nil } @@ -342,11 +347,11 @@ func (r *rootResolver) ListenTaskDashboard(ctx context.Context, input struct { autoRemoveClient := time.NewTicker(defaultOption.autoRemoveClientInterval) + go broadcastTaskList(r.worker.ctx) + go func() { defer func() { broadcastTaskList(r.worker.ctx); close(output); autoRemoveClient.Stop() }() - broadcastTaskList(r.worker.ctx) - select { case <-ctx.Done(): removeTaskListSubscriber(clientID) @@ -358,11 +363,7 @@ func (r *rootResolver) ListenTaskDashboard(ctx context.Context, input struct { return case <-autoRemoveClient.C: - output <- TaskListResolver{ - Meta: MetaTaskResolver{ - IsCloseSession: true, - }, - } + output <- TaskListResolver{Meta: MetaTaskResolver{IsCloseSession: true}} removeTaskListSubscriber(clientID) return } @@ -405,12 +406,12 @@ func (r *rootResolver) ListenTaskJobList(ctx context.Context, input struct { return nil, err } + go broadcastJobListToClient(ctx, clientID) + autoRemoveClient := time.NewTicker(defaultOption.autoRemoveClientInterval) go func() { defer func() { close(output); autoRemoveClient.Stop() }() - broadcastJobListToClient(ctx, clientID) - select { case <-ctx.Done(): removeJobListSubscriber(clientID) @@ -422,11 +423,7 @@ func (r *rootResolver) ListenTaskJobList(ctx context.Context, input struct { return case <-autoRemoveClient.C: - output <- JobListResolver{ - Meta: MetaJobList{ - IsCloseSession: true, - }, - } + output <- JobListResolver{Meta: MetaJobList{IsCloseSession: true}} removeJobListSubscriber(clientID) return @@ -458,22 +455,24 @@ func (r *rootResolver) ListenJobDetail(ctx context.Context, input struct { return nil, err } + go broadcastJobDetail(ctx) + autoRemoveClient := time.NewTicker(defaultOption.autoRemoveClientInterval) go func() { defer func() { close(output); autoRemoveClient.Stop() }() - broadcastJobDetail(ctx) - select { case <-ctx.Done(): removeJobDetailSubscriber(clientID) return case <-closeAllSubscribers: + output <- Job{} removeJobDetailSubscriber(clientID) return case <-autoRemoveClient.C: + output <- Job{} removeJobDetailSubscriber(clientID) return diff --git a/codebase/app/task_queue_worker/job_operation.go b/codebase/app/task_queue_worker/job_operation.go index 51f8d646..921d21a5 100644 --- a/codebase/app/task_queue_worker/job_operation.go +++ b/codebase/app/task_queue_worker/job_operation.go @@ -53,7 +53,6 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) { }() trace.SetTag("task_name", req.TaskName) - trace.Log("message", req.Args) if err = req.Validate(); err != nil { return jobID, err @@ -63,6 +62,8 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) { return AddJobViaHTTPRequest(ctx, defaultOption.externalWorkerHost, req) } + trace.Log("message", req.Args) + task, ok := registeredTask[req.TaskName] if !ok { return jobID, fmt.Errorf("task '%s' unregistered, task must one of [%s]", req.TaskName, strings.Join(tasks, ", ")) @@ -85,6 +86,7 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) { semaphoreAddJob <- struct{}{} go func(ctx context.Context, job *Job, workerIndex int) { + defer func() { <-semaphoreAddJob }() persistent.SaveJob(ctx, job) persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{ @@ -96,7 +98,6 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) { registerJobToWorker(&newJob, workerIndex) refreshWorkerNotif <- struct{}{} } - <-semaphoreAddJob }(context.Background(), &newJob, task.workerIndex) diff --git a/codebase/app/task_queue_worker/subscribers.go b/codebase/app/task_queue_worker/subscribers.go index c2a52994..f1eccad2 100644 --- a/codebase/app/task_queue_worker/subscribers.go +++ b/codebase/app/task_queue_worker/subscribers.go @@ -78,7 +78,6 @@ func broadcastAllToSubscribers(ctx context.Context) { return } - semaphoreBroadcast <- struct{}{} go func(ctx context.Context) { if len(clientTaskSubscribers) > 0 { broadcastTaskList(ctx) @@ -89,7 +88,6 @@ func broadcastAllToSubscribers(ctx context.Context) { if len(clientTaskJobListSubscribers) > 0 { broadcastJobList(ctx) } - <-semaphoreBroadcast }(ctx) } diff --git a/codebase/app/task_queue_worker/types.go b/codebase/app/task_queue_worker/types.go index fc7b8fa8..92069efb 100644 --- a/codebase/app/task_queue_worker/types.go +++ b/codebase/app/task_queue_worker/types.go @@ -169,10 +169,10 @@ var ( queue QueueStorage persistent Persistent - refreshWorkerNotif, shutdown, closeAllSubscribers, semaphoreBroadcast, semaphoreAddJob chan struct{} - semaphore []chan struct{} - mutex sync.Mutex - tasks []string + refreshWorkerNotif, shutdown, closeAllSubscribers, semaphoreAddJob chan struct{} + semaphore []chan struct{} + mutex sync.Mutex + tasks []string clientTaskSubscribers map[string]*clientTaskDashboardSubscriber clientTaskJobListSubscribers map[string]*clientTaskJobListSubscriber @@ -210,7 +210,6 @@ func makeAllGlobalVars(opts ...OptionFunc) { persistent = defaultOption.persistent refreshWorkerNotif, shutdown, closeAllSubscribers = make(chan struct{}), make(chan struct{}, 1), make(chan struct{}) - semaphoreBroadcast = make(chan struct{}, env.BaseEnv().MaxGoroutines) semaphoreAddJob = make(chan struct{}, env.BaseEnv().MaxGoroutines) clientTaskSubscribers = make(map[string]*clientTaskDashboardSubscriber, defaultOption.maxClientSubscriber) clientTaskJobListSubscribers = make(map[string]*clientTaskJobListSubscriber, defaultOption.maxClientSubscriber) diff --git a/init.go b/init.go index 41a2408e..ab32ec1e 100644 --- a/init.go +++ b/init.go @@ -2,5 +2,5 @@ package candi const ( // Version of this library - Version = "v1.11.19" + Version = "v1.11.20" )