Skip to content

Commit

Permalink
task queue worker: set stop all job to background
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Apr 3, 2024
1 parent 22eccf9 commit 9536ab1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
52 changes: 26 additions & 26 deletions codebase/app/task_queue_worker/graphql_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,31 +185,33 @@ func (r *rootResolver) StopAllJob(ctx context.Context, input struct {
input.TaskName, strings.Join(r.engine.tasks, ", "))
}

r.engine.subscriber.broadcastWhenChangeAllJob(r.engine.ctx, input.TaskName, true, "Stopping...")
r.engine.opt.queue.Clear(ctx, input.TaskName)
go r.engine.stopAllJobInTask(input.TaskName)

incrQuery := map[string]int64{}
affectedStatus := []string{string(StatusQueueing), string(StatusRetrying)}
for _, status := range affectedStatus {
countMatchedFilter, countAffected, err := r.engine.opt.persistent.UpdateJob(ctx,
&Filter{
TaskName: input.TaskName, Status: &status,
},
map[string]interface{}{
"status": StatusStopped,
},
)
if err != nil {
continue
go func(ctx context.Context, taskName string) {
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, taskName, true, "Stopping...")
r.engine.opt.queue.Clear(ctx, taskName)
r.engine.stopAllJobInTask(taskName)

incrQuery := map[string]int64{}
affectedStatus := []string{string(StatusQueueing), string(StatusRetrying)}
for _, status := range affectedStatus {
countMatchedFilter, countAffected, err := r.engine.opt.persistent.UpdateJob(ctx,
&Filter{
TaskName: taskName, Status: &status,
},
map[string]interface{}{
"status": StatusStopped,
},
)
if err != nil {
continue
}
incrQuery[strings.ToLower(status)] -= countMatchedFilter
incrQuery[strings.ToLower(string(StatusStopped))] += countAffected
}
incrQuery[strings.ToLower(status)] -= countMatchedFilter
incrQuery[strings.ToLower(string(StatusStopped))] += countAffected
}

r.engine.subscriber.broadcastWhenChangeAllJob(r.engine.ctx, input.TaskName, false, "")
r.engine.opt.persistent.Summary().IncrementSummary(ctx, input.TaskName, incrQuery)
r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, taskName, false, "")
r.engine.opt.persistent.Summary().IncrementSummary(ctx, taskName, incrQuery)
r.engine.subscriber.broadcastAllToSubscribers(ctx)
}(r.engine.ctx, input.TaskName)

return "Success stop all job in task " + input.TaskName, nil
}
Expand All @@ -218,7 +220,6 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct {
Filter FilterMutateJobInputResolver
}) (string, error) {
go func(ctx context.Context, req *FilterMutateJobInputResolver) {

filter := req.ToFilter()
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, true, "Retrying...")

Expand Down Expand Up @@ -252,7 +253,7 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct {

r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, false, "")
r.engine.opt.persistent.Summary().IncrementSummary(ctx, filter.TaskName, incr)
r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)
r.engine.subscriber.broadcastAllToSubscribers(ctx)
r.engine.registerNextJob(false, filter.TaskName)

}(r.engine.ctx, &input.Filter)
Expand All @@ -264,7 +265,6 @@ func (r *rootResolver) CleanJob(ctx context.Context, input struct {
Filter FilterMutateJobInputResolver
}) (string, error) {
go func(ctx context.Context, req *FilterMutateJobInputResolver) {

filter := req.ToFilter()
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, true, "Cleaning...")

Expand Down
2 changes: 1 addition & 1 deletion init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package candi

const (
// Version of this library
Version = "v1.17.0"
Version = "v1.17.1"
)

0 comments on commit 9536ab1

Please sign in to comment.