Skip to content

Commit

Permalink
task queue worker: add handling locked task
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Oct 4, 2023
1 parent 9a7e9b9 commit 9f3f967
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 11 deletions.
2 changes: 1 addition & 1 deletion codebase/app/task_queue_worker/graphql_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (r *rootResolver) SetConfiguration(ctx context.Context, input struct {
func (r *rootResolver) RunQueuedJob(ctx context.Context, input struct {
TaskName string
}) (res string, err error) {

r.engine.checkForUnlockTask(input.TaskName)
r.engine.registerNextJob(true, input.TaskName)
return "Success", nil
}
Expand Down
5 changes: 4 additions & 1 deletion codebase/app/task_queue_worker/job_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
if n := engine.opt.queue.PushJob(ctx, &newJob); n <= 1 && len(engine.semaphore[workerIndex-1]) == 0 {
engine.registerJobToWorker(&newJob)
}
if engine.opt.locker.HasBeenLocked(engine.getLockKey(newJob.TaskName)) {
engine.checkForUnlockTask(newJob.TaskName)
}

return newJob.ID, nil
}
Expand Down Expand Up @@ -141,7 +144,7 @@ func AddJobViaHTTPRequest(ctx context.Context, workerHost string, req *AddJobReq
},
"query": `mutation addJob($param: AddJobInputResolver!) { add_job(param: $param) }`,
}
httpResp, err := httpReq.DoRequest(ctx, http.MethodPost, workerHost+"/graphql", candihelper.ToBytes(reqBody), header)
httpResp, err := httpReq.DoRequest(ctx, http.MethodPost, strings.Trim(workerHost, "/")+"/graphql", candihelper.ToBytes(reqBody), header)
if err != nil {
return jobID, err
}
Expand Down
1 change: 0 additions & 1 deletion codebase/app/task_queue_worker/task_queue_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ func (t *taskQueueWorker) Name() string {
}

func (t *taskQueueWorker) registerJobToWorker(job *Job) {

interval, err := time.ParseDuration(job.Interval)
if err != nil || interval <= 0 {
logger.LogRed("invalid interval " + job.Interval)
Expand Down
20 changes: 12 additions & 8 deletions codebase/app/task_queue_worker/trigger_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func (t *taskQueueWorker) triggerTask(workerIndex int) {
go func(workerIndex int, task *Task) {
var ctx context.Context
ctx, task.cancel = context.WithCancel(t.ctx)

defer func() {
if r := recover(); r != nil {
logger.LogRed(fmt.Sprintf("task_queue_worker > panic: %v", r))
Expand All @@ -57,12 +56,13 @@ func (t *taskQueueWorker) triggerTask(workerIndex int) {
}

// lock for multiple worker (if running on multiple runtime)
isLocked := t.opt.locker.IsLocked(t.getLockKey(runningTask.taskName))
if isLocked {
lockKey := t.getLockKey(runningTask.taskName)
if t.opt.locker.IsLocked(lockKey) {
logger.LogI("task_queue_worker > task " + runningTask.taskName + " is locked")
t.checkForUnlockTask(runningTask.taskName)
return
}
defer t.opt.locker.Unlock(t.getLockKey(runningTask.taskName))
defer t.opt.locker.Unlock(lockKey)

t.execJob(ctx, task)

Expand Down Expand Up @@ -276,11 +276,17 @@ func (t *taskQueueWorker) getLockKey(jobID string) string {
return fmt.Sprintf("%s:task-queue-worker-lock:%s", t.service.Name(), jobID)
}

func (t *taskQueueWorker) registerNextJob(withStream bool, taskName string) {
func (t *taskQueueWorker) checkForUnlockTask(taskName string) {
if count := t.opt.persistent.CountAllJob(t.ctx, &Filter{
TaskName: taskName, Status: candihelper.ToStringPtr(StatusRetrying.String()),
}); count == 0 {
t.opt.locker.Unlock(t.getLockKey(taskName))
}
}

func (t *taskQueueWorker) registerNextJob(withStream bool, taskName string) {
nextJobID := t.opt.queue.NextJob(t.ctx, taskName)
if nextJobID != "" {

if nextJob, err := t.opt.persistent.FindJobByID(t.ctx, nextJobID, nil); err == nil {
t.registerJobToWorker(&nextJob)
return
Expand All @@ -290,7 +296,6 @@ func (t *taskQueueWorker) registerNextJob(withStream bool, taskName string) {
t.registerNextJob(false, taskName)

} else if withStream {

StreamAllJob(t.ctx, &Filter{
TaskName: taskName,
Sort: "created_at",
Expand All @@ -300,5 +305,4 @@ func (t *taskQueueWorker) registerNextJob(withStream bool, taskName string) {
})
t.registerNextJob(false, taskName)
}

}

0 comments on commit 9f3f967

Please sign in to comment.