From c67e92a75d4eb2643e2f08e45ed38a5855bf5263 Mon Sep 17 00:00:00 2001 From: agungdwiprasetyo Date: Fri, 28 Jun 2024 13:39:33 +0700 Subject: [PATCH] task queue worker: add new status for hold/unhold job queue --- codebase/app/task_queue_worker/globals.go | 1 - .../app/task_queue_worker/graphql_resolver.go | 109 ++++++++- .../app/task_queue_worker/graphql_schema.go | 8 +- .../app/task_queue_worker/graphql_types.go | 61 ++--- .../app/task_queue_worker/internal_task.go | 4 - .../app/task_queue_worker/job_operation.go | 25 +- .../app/task_queue_worker/persistent_model.go | 217 +++++++++++------- .../app/task_queue_worker/persistent_mongo.go | 18 +- .../app/task_queue_worker/persistent_sql.go | 35 ++- .../task_queue_worker/persistent_sql_tools.go | 81 ++++--- codebase/app/task_queue_worker/subscribers.go | 17 +- .../task_queue_worker/task_queue_worker.go | 114 +++++---- .../app/task_queue_worker/trigger_task.go | 11 +- codebase/app/task_queue_worker/types.go | 2 + 14 files changed, 431 insertions(+), 272 deletions(-) diff --git a/codebase/app/task_queue_worker/globals.go b/codebase/app/task_queue_worker/globals.go index f53651c7..2f1977b1 100644 --- a/codebase/app/task_queue_worker/globals.go +++ b/codebase/app/task_queue_worker/globals.go @@ -66,7 +66,6 @@ func initEngine(service factory.ServiceFactory, opts ...OptionFunc) *taskQueueWo engine = &taskQueueWorker{ service: service, - ready: make(chan struct{}), shutdown: make(chan struct{}, 1), refreshWorkerNotif: make(chan struct{}, 1), opt: &opt, diff --git a/codebase/app/task_queue_worker/graphql_resolver.go b/codebase/app/task_queue_worker/graphql_resolver.go index 0116b8fd..ce9150e7 100644 --- a/codebase/app/task_queue_worker/graphql_resolver.go +++ b/codebase/app/task_queue_worker/graphql_resolver.go @@ -17,6 +17,7 @@ import ( dashboard "github.com/golangid/candi-plugin/task-queue-worker/dashboard" "github.com/golangid/candi/candihelper" "github.com/golangid/candi/candishared" + cronexpr "github.com/golangid/candi/candiutils/cronparser" graphqlserver "github.com/golangid/candi/codebase/app/graphql_server" "github.com/golangid/candi/config/env" "github.com/golangid/candi/logger" @@ -65,7 +66,7 @@ type rootResolver struct { engine *taskQueueWorker } -func (r *rootResolver) Dashboard(ctx context.Context, input struct{ GC *bool }) (res DashboardResolver) { +func (r *rootResolver) Dashboard(ctx context.Context) (res DashboardResolver) { res.Banner = r.engine.opt.dashboardBanner res.Tagline = "Task Queue Worker Dashboard" res.Version = candi.Version @@ -77,10 +78,6 @@ func (r *rootResolver) Dashboard(ctx context.Context, input struct{ GC *bool }) _, isType := r.engine.opt.persistent.(*noopPersistent) res.Config.WithPersistent = !isType - if input.GC != nil && *input.GC { - runtime.GC() - } - // dependency health if err := r.engine.opt.persistent.Ping(ctx); err != nil { res.DependencyHealth.Persistent = candihelper.ToStringPtr(err.Error()) @@ -220,6 +217,8 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct { Filter FilterMutateJobInputResolver }) (string, error) { go func(ctx context.Context, req *FilterMutateJobInputResolver) { + summary := r.engine.opt.persistent.Summary().FindDetailSummary(ctx, req.TaskName) + filter := req.ToFilter() r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, true, "Retrying...") @@ -228,8 +227,12 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct { filter.Statuses = []string{string(StatusFailure), string(StatusStopped)} } - StreamAllJob(ctx, &filter, func(job *Job) { + StreamAllJob(ctx, &filter, func(idx, total int, job *Job) { r.engine.opt.queue.PushJob(ctx, job) + r.engine.opt.persistent.Summary().UpdateSummary(r.engine.ctx, filter.TaskName, map[string]interface{}{ + "is_loading": true, "loading_message": fmt.Sprintf(`Requeueing %d of %d`, idx, total), + }) + r.engine.subscriber.broadcastTaskList(r.engine.ctx) }) incr := map[string]int64{} @@ -251,9 +254,9 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct { incr[strings.ToLower(string(StatusQueueing))] += countAffected } - r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, false, "") + r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, false, summary.LoadingMessage) r.engine.opt.persistent.Summary().IncrementSummary(ctx, filter.TaskName, incr) - r.engine.subscriber.broadcastAllToSubscribers(ctx) + r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx) r.engine.registerNextJob(false, filter.TaskName) }(r.engine.ctx, &input.Filter) @@ -293,7 +296,6 @@ func (r *rootResolver) CleanJob(ctx context.Context, input struct { } func (r *rootResolver) RecalculateSummary(ctx context.Context) (string, error) { - RecalculateSummary(ctx) r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx) return "Success recalculate summary", nil @@ -419,7 +421,7 @@ func (r *rootResolver) SetConfiguration(ctx context.Context, input struct { func (r *rootResolver) RunQueuedJob(ctx context.Context, input struct { TaskName string }) (res string, err error) { - r.engine.checkForUnlockTask(input.TaskName) + r.engine.unlockTask(input.TaskName) r.engine.registerNextJob(true, input.TaskName) return "Success", nil } @@ -430,7 +432,7 @@ func (r *rootResolver) RestoreFromSecondary(ctx context.Context) (res RestoreSec secondaryPersistent: true, Status: candihelper.ToStringPtr(string(StatusQueueing)), } - res.TotalData = StreamAllJob(ctx, filter, func(job *Job) { + res.TotalData = StreamAllJob(ctx, filter, func(_, _ int, job *Job) { if err := r.engine.opt.persistent.SaveJob(ctx, job); err != nil { logger.LogE(err.Error()) return @@ -599,3 +601,88 @@ func (r *rootResolver) ListenDetailJob(ctx context.Context, input struct { return output, nil } + +func (r *rootResolver) HoldJobTask(ctx context.Context, input struct { + TaskName string + IsAutoSwitch bool + SwitchInterval *string + FirstSwitch *string +}) (res string, err error) { + summaryTask := r.engine.opt.persistent.Summary().FindDetailSummary(ctx, input.TaskName) + if summaryTask.TaskName == "" { + return res, errors.New("Task not found") + } + + defer r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx) + + if !input.IsAutoSwitch { + r.engine.opt.persistent.Summary().UpdateSummary(ctx, input.TaskName, map[string]interface{}{ + "is_hold": !summaryTask.IsHold, "loading_message": "Hold Mode", + }) + if !summaryTask.IsHold { + return + } + + r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, true, "Unhold...") + r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx) + filter := &Filter{ + TaskName: input.TaskName, Status: candihelper.WrapPtr(StatusHold.String()), + Sort: "created_at", + } + StreamAllJob(ctx, filter, func(idx, total int, job *Job) { + r.engine.opt.queue.PushJob(ctx, job) + r.engine.opt.persistent.Summary().UpdateSummary(r.engine.ctx, filter.TaskName, map[string]interface{}{ + "is_loading": true, "loading_message": fmt.Sprintf(`Requeueing %d of %d`, idx, total), + }) + r.engine.subscriber.broadcastTaskList(r.engine.ctx) + }) + + matchedCount, affectedRow, _ := r.engine.opt.persistent.UpdateJob(ctx, + filter, + map[string]interface{}{ + "status": StatusQueueing, + "retries": 0, + }, + ) + r.engine.opt.persistent.Summary().IncrementSummary(ctx, filter.TaskName, map[string]int64{ + strings.ToLower(StatusQueueing.String()): matchedCount, + strings.ToLower(StatusHold.String()): -affectedRow, + }) + r.engine.registerNextJob(false, filter.TaskName) + r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, false, "") + return + } + + if input.SwitchInterval != nil { + schedule, err := cronexpr.Parse(*input.SwitchInterval) + if err != nil { + return "", err + } + + date := make([]string, 6) + now := time.Now() + for i := range date { + nextInterval := schedule.NextInterval(now) + now = now.Add(nextInterval) + date[i] = now.Format(candihelper.DateFormatYYYYMMDDHHmmss) + } + } + + return "Success", nil +} + +func (r *rootResolver) ParseCronExpression(ctx context.Context, input struct{ Expr string }) (date []string, err error) { + schedule, err := cronexpr.Parse(input.Expr) + if err != nil { + return date, err + } + + date = make([]string, 6) + now := time.Now() + for i := range date { + nextInterval := schedule.NextInterval(now) + now = now.Add(nextInterval) + date[i] = now.Format(candihelper.DateFormatYYYYMMDDHHmmss) + } + return +} diff --git a/codebase/app/task_queue_worker/graphql_schema.go b/codebase/app/task_queue_worker/graphql_schema.go index 9de2bdd8..34545b46 100644 --- a/codebase/app/task_queue_worker/graphql_schema.go +++ b/codebase/app/task_queue_worker/graphql_schema.go @@ -7,13 +7,14 @@ const schema = `schema { } type Query { - dashboard(gc: Boolean): DashboardType! + dashboard(): DashboardType! get_detail_job(job_id: String!, filter: GetAllJobHistoryInputResolver): JobResolver! get_all_active_subscriber(): [ClientSubscriber!]! get_all_job(filter: GetAllJobInputResolver): JobListResolver! get_count_job(filter: GetAllJobInputResolver): Int! get_all_configuration(): [ConfigurationResolver!]! get_detail_configuration(key: String!): ConfigurationResolver! + parse_cron_expression(expr: String!): [String!]! } type Mutation { @@ -30,6 +31,7 @@ type Mutation { set_configuration(config: SetConfigurationInputResolver!): String! run_queued_job(task_name: String!): String! restore_from_secondary(): RestoreSecondaryResolver! + hold_job_task(task_name: String!, is_auto_switch: Boolean!, switch_interval: String, first_switch: String): String! } type Subscription { @@ -75,9 +77,11 @@ type MetaType { limit: Int! total_pages: Int! total_records: Int! + message: String! is_close_session: Boolean! is_loading: Boolean! is_freeze_broadcast: Boolean! + is_hold: Boolean! detail: TaskDetailResolver! } @@ -104,6 +108,7 @@ type TaskResolver { total_jobs: Int! is_loading: Boolean! loading_message: String! + is_hold: Boolean! detail: TaskDetailResolver! } @@ -118,6 +123,7 @@ type TaskDetailResolver { success: Int! queueing: Int! stopped: Int! + hold: Int! } type JobListResolver { diff --git a/codebase/app/task_queue_worker/graphql_types.go b/codebase/app/task_queue_worker/graphql_types.go index 1cd59b61..87fc57a0 100644 --- a/codebase/app/task_queue_worker/graphql_types.go +++ b/codebase/app/task_queue_worker/graphql_types.go @@ -53,6 +53,7 @@ type ( TotalJobs int IsLoading bool LoadingMessage string + IsHold bool Detail SummaryDetail } // TaskListResolver resolver @@ -72,15 +73,17 @@ type ( Limit int TotalRecords int TotalPages int + Message string IsCloseSession bool IsLoading bool IsFreezeBroadcast bool + IsHold bool Detail SummaryDetail } // SummaryDetail type SummaryDetail struct { - Failure, Retrying, Success, Queueing, Stopped int + Failure, Retrying, Success, Queueing, Stopped, Hold int } // JobListResolver resolver @@ -287,7 +290,9 @@ func (j *JobResolver) ParseFromJob(job *Job, maxArgsLength int) { j.NextRetryAt = time.Now().Add(delay).In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339) } j.CreatedAt = job.CreatedAt.In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339) - j.FinishedAt = job.FinishedAt.In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339) + if !job.FinishedAt.IsZero() { + j.FinishedAt = job.FinishedAt.In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339) + } if job.Retries > job.MaxRetry { j.Retries = job.MaxRetry } @@ -299,35 +304,35 @@ func (j *JobResolver) ParseFromJob(job *Job, maxArgsLength int) { } func (j *JobListResolver) GetAllJob(ctx context.Context, filter *Filter) { - - var meta MetaJobList - var taskDetailSummary []TaskSummary - + var detailSummary TaskSummary if candihelper.PtrToString(filter.Search) != "" || candihelper.PtrToString(filter.JobID) != "" || (filter.StartDate != "" && filter.EndDate != "") { - taskDetailSummary = engine.opt.persistent.AggregateAllTaskJob(ctx, filter) + taskDetailSummary := engine.opt.persistent.AggregateAllTaskJob(ctx, filter) + if len(taskDetailSummary) > 0 { + detailSummary = taskDetailSummary[0] + } } else { - taskDetailSummary = engine.opt.persistent.Summary().FindAllSummary(ctx, filter) - } - - for _, detailSummary := range taskDetailSummary { - detail := detailSummary.ToSummaryDetail() - meta.Detail.Failure += detail.Failure - meta.Detail.Retrying += detail.Retrying - meta.Detail.Success += detail.Success - meta.Detail.Queueing += detail.Queueing - meta.Detail.Stopped += detail.Stopped - meta.TotalRecords += detailSummary.CountTotalJob() - } - meta.Page, meta.Limit = filter.Page, filter.Limit - meta.TotalPages = int(math.Ceil(float64(meta.TotalRecords) / float64(meta.Limit))) - - j.Meta = meta - - for _, job := range engine.opt.persistent.FindAllJob(ctx, filter) { - var jobResolver JobResolver - jobResolver.ParseFromJob(&job, 100) - j.Data = append(j.Data, jobResolver) + detailSummary = engine.opt.persistent.Summary().FindDetailSummary(ctx, filter.TaskName) + detailSummary.ApplyFilterStatus(filter.Statuses) + } + + detail := detailSummary.ToSummaryDetail() + j.Meta.Detail.Failure = detail.Failure + j.Meta.Detail.Retrying = detail.Retrying + j.Meta.Detail.Success = detail.Success + j.Meta.Detail.Queueing = detail.Queueing + j.Meta.Detail.Stopped = detail.Stopped + j.Meta.Detail.Hold = detail.Hold + j.Meta.TotalRecords = detailSummary.CountTotalJob() + j.Meta.IsHold = detailSummary.IsHold + j.Meta.Message = detailSummary.LoadingMessage + j.Meta.Page, j.Meta.Limit = filter.Page, filter.Limit + j.Meta.TotalPages = int(math.Ceil(float64(j.Meta.TotalRecords) / float64(j.Meta.Limit))) + + jobs := engine.opt.persistent.FindAllJob(ctx, filter) + j.Data = make([]JobResolver, len(jobs)) + for i, job := range jobs { + j.Data[i].ParseFromJob(&job, 100) } } diff --git a/codebase/app/task_queue_worker/internal_task.go b/codebase/app/task_queue_worker/internal_task.go index 3d6fe331..df8e0ea6 100644 --- a/codebase/app/task_queue_worker/internal_task.go +++ b/codebase/app/task_queue_worker/internal_task.go @@ -11,7 +11,6 @@ import ( ) func (t *taskQueueWorker) registerInternalTask() { - retentionBeat := reflect.SelectCase{Dir: reflect.SelectRecv} internalTaskRetention := &Task{ isInternalTask: true, @@ -32,16 +31,13 @@ func (t *taskQueueWorker) registerInternalTask() { END: t.runningWorkerIndexTask[internalTaskRetention.workerIndex] = internalTaskRetention t.workerChannels = append(t.workerChannels, retentionBeat) - } func (t *taskQueueWorker) execInternalTask(task *Task) { - logger.LogIf("running internal task: %s", task.internalTaskName) switch task.internalTaskName { case configurationRetentionAgeKey: - cfg, _ := t.opt.persistent.GetConfiguration(configurationRetentionAgeKey) if !cfg.IsActive { return diff --git a/codebase/app/task_queue_worker/job_operation.go b/codebase/app/task_queue_worker/job_operation.go index e0f164ca..cc196773 100644 --- a/codebase/app/task_queue_worker/job_operation.go +++ b/codebase/app/task_queue_worker/job_operation.go @@ -31,7 +31,6 @@ type ( // Validate method func (a *AddJobRequest) Validate() error { - switch { case a.TaskName == "": return errors.New("Task name cannot empty") @@ -87,6 +86,14 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) { newJob.CreatedAt = time.Now() newJob.direct = req.direct + summary := engine.opt.persistent.Summary().FindDetailSummary(ctx, req.TaskName) + if summary.IsHold { + newJob.Status = string(StatusHold) + newJob.RetryHistories = []RetryHistory{ + {Status: newJob.Status, StartAt: time.Now(), EndAt: time.Now()}, + } + } + if err := engine.opt.persistent.SaveJob(ctx, &newJob); err != nil { trace.SetError(err) logger.LogE(fmt.Sprintf("Cannot save job, error: %s", err.Error())) @@ -100,11 +107,14 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) { strings.ToLower(newJob.Status): 1, }) engine.subscriber.broadcastAllToSubscribers(context.Background()) + if summary.IsHold || summary.IsLoading { + return newJob.ID, nil + } if n := engine.opt.queue.PushJob(ctx, &newJob); n <= 1 && len(engine.semaphore[workerIndex-1]) == 0 { engine.registerJobToWorker(&newJob) } if engine.opt.locker.HasBeenLocked(engine.getLockKey(newJob.TaskName)) { - engine.checkForUnlockTask(newJob.TaskName) + engine.unlockTask(newJob.TaskName) } return newJob.ID, nil @@ -248,7 +258,7 @@ func StopJob(ctx context.Context, jobID string) error { statusBefore := job.Status if job.Status == string(StatusRetrying) { - go engine.stopAllJobInTask(job.TaskName) + engine.stopAllJobInTask(job.TaskName) } job.Status = string(StatusStopped) @@ -271,7 +281,7 @@ func StopJob(ctx context.Context, jobID string) error { } // StreamAllJob api func for stream fetch all job -func StreamAllJob(ctx context.Context, filter *Filter, streamFunc func(job *Job)) (count int) { +func StreamAllJob(ctx context.Context, filter *Filter, streamFunc func(idx, total int, job *Job)) (count int) { if engine == nil { return } @@ -295,8 +305,9 @@ func StreamAllJob(ctx context.Context, filter *Filter, streamFunc func(job *Job) totalPages := int(math.Ceil(float64(count) / float64(filter.Limit))) for filter.Page <= totalPages { - for _, job := range perst.FindAllJob(ctx, filter) { - streamFunc(&job) + for i, job := range perst.FindAllJob(ctx, filter) { + offset := (filter.Page - 1) * filter.Limit + streamFunc(offset+i, count, &job) } filter.Page++ } @@ -357,7 +368,7 @@ func UpdateProgressJob(ctx context.Context, jobID string, numProcessed, maxProce engine.globalSemaphore <- struct{}{} go func() { defer func() { <-engine.globalSemaphore }() - engine.subscriber.broadcastJobDetail(context.Background()) + engine.subscriber.broadcastJobDetail(context.WithoutCancel(ctx)) }() } return nil diff --git a/codebase/app/task_queue_worker/persistent_model.go b/codebase/app/task_queue_worker/persistent_model.go index 83875119..1aea06bf 100644 --- a/codebase/app/task_queue_worker/persistent_model.go +++ b/codebase/app/task_queue_worker/persistent_model.go @@ -3,6 +3,8 @@ package taskqueueworker import ( "strings" "time" + + "github.com/golangid/candi/candihelper" ) const ( @@ -12,86 +14,27 @@ const ( configurationModelName = "task_queue_worker_configurations" ) -type ( - // Filter type - Filter struct { - Page int `json:"page"` - Limit int `json:"limit"` - Sort string `json:"sort,omitempty"` - TaskName string `json:"taskName,omitempty"` - TaskNameList []string `json:"taskNameList,omitempty"` - ExcludeTaskNameList []string `json:"excludeTaskNameList,omitempty"` - Search *string `json:"search,omitempty"` - JobID *string `json:"jobID,omitempty"` - Status *string `json:"status,omitempty"` - Statuses []string `json:"statuses,omitempty"` - ExcludeStatus []string `json:"excludeStatus,omitempty"` - ShowAll bool `json:"showAll,omitempty"` - ShowHistories *bool `json:"showHistories,omitempty"` - StartDate string `json:"startDate,omitempty"` - EndDate string `json:"endDate,omitempty"` - BeforeCreatedAt *time.Time `json:"beforeCreatedAt,omitempty"` - Count int `json:"count,omitempty"` - secondaryPersistent bool `json:"-"` - } - - // TaskSummary model - TaskSummary struct { - ID string `bson:"_id"` - TaskName string `bson:"task_name"` - Success int `bson:"success"` - Queueing int `bson:"queueing"` - Retrying int `bson:"retrying"` - Failure int `bson:"failure"` - Stopped int `bson:"stopped"` - IsLoading bool `bson:"is_loading"` - LoadingMessage string `bson:"loading_message"` - } - - // Job model - Job struct { - ID string `bson:"_id" json:"_id"` - TaskName string `bson:"task_name" json:"task_name"` - Arguments string `bson:"arguments" json:"arguments"` - Retries int `bson:"retries" json:"retries"` - MaxRetry int `bson:"max_retry" json:"max_retry"` - Interval string `bson:"interval" json:"interval"` - CreatedAt time.Time `bson:"created_at" json:"created_at"` - UpdatedAt time.Time `bson:"updated_at" json:"updated_at"` - FinishedAt time.Time `bson:"finished_at" json:"finished_at"` - Status string `bson:"status" json:"status"` - Error string `bson:"error" json:"error"` - ErrorStack string `bson:"-" json:"error_stack"` - Result string `bson:"result" json:"result"` - TraceID string `bson:"trace_id" json:"trace_id"` - CurrentProgress int `bson:"current_progress" json:"current_progress"` - MaxProgress int `bson:"max_progress" json:"max_progress"` - RetryHistories []RetryHistory `bson:"retry_histories" json:"retry_histories"` - NextRetryAt string `bson:"-" json:"-"` - direct bool `bson:"-" json:"-"` - } - - // RetryHistory model - RetryHistory struct { - ErrorStack string `bson:"error_stack" json:"error_stack"` - Status string `bson:"status" json:"status"` - Error string `bson:"error" json:"error"` - Result string `bson:"result" json:"result"` - TraceID string `bson:"trace_id" json:"trace_id"` - StartAt time.Time `bson:"start_at" json:"start_at"` - EndAt time.Time `bson:"end_at" json:"end_at"` - } - - // Configuration model - Configuration struct { - Key string `bson:"key" json:"key"` - Name string `bson:"name" json:"name"` - Value string `bson:"value" json:"value"` - IsActive bool `bson:"is_active" json:"is_active"` - } - - Configurations []Configuration -) +// Filter type +type Filter struct { + Page int `json:"page"` + Limit int `json:"limit"` + Sort string `json:"sort,omitempty"` + TaskName string `json:"taskName,omitempty"` + TaskNameList []string `json:"taskNameList,omitempty"` + ExcludeTaskNameList []string `json:"excludeTaskNameList,omitempty"` + Search *string `json:"search,omitempty"` + JobID *string `json:"jobID,omitempty"` + Status *string `json:"status,omitempty"` + Statuses []string `json:"statuses,omitempty"` + ExcludeStatus []string `json:"excludeStatus,omitempty"` + ShowAll bool `json:"showAll,omitempty"` + ShowHistories *bool `json:"showHistories,omitempty"` + StartDate string `json:"startDate,omitempty"` + EndDate string `json:"endDate,omitempty"` + BeforeCreatedAt *time.Time `json:"beforeCreatedAt,omitempty"` + Count int `json:"count,omitempty"` + secondaryPersistent bool `json:"-"` +} // CalculateOffset method func (f *Filter) CalculateOffset() int { @@ -107,10 +50,31 @@ func (f *Filter) ParseStartEndDate() (startDate, endDate time.Time) { return } +// TaskSummary model +type TaskSummary struct { + ID string `bson:"_id"` + TaskName string `bson:"task_name"` + Success int `bson:"success"` + Queueing int `bson:"queueing"` + Retrying int `bson:"retrying"` + Failure int `bson:"failure"` + Stopped int `bson:"stopped"` + Hold int `bson:"hold"` + IsLoading bool `bson:"is_loading"` + IsHold bool `bson:"is_hold"` + LoadingMessage string `bson:"loading_message"` + Config TaskConfig `bson:"config"` +} + +type TaskConfig struct { + HoldUnholdSwitchInterval string `bson:"hold_unhold_switch_interval" json:"hold_unhold_switch_interval"` + NextHoldUnhold string `bson:"next_hold_unhold" json:"next_hold_unhold"` +} + // CountTotalJob method func (s *TaskSummary) CountTotalJob() int { return normalizeCount(s.Success) + normalizeCount(s.Queueing) + normalizeCount(s.Retrying) + - normalizeCount(s.Failure) + normalizeCount(s.Stopped) + normalizeCount(s.Failure) + normalizeCount(s.Stopped) + normalizeCount(s.Hold) } // ToSummaryDetail method @@ -120,6 +84,7 @@ func (s *TaskSummary) ToSummaryDetail() (detail SummaryDetail) { detail.Success = normalizeCount(s.Success) detail.Queueing = normalizeCount(s.Queueing) detail.Stopped = normalizeCount(s.Stopped) + detail.Hold = normalizeCount(s.Hold) return } @@ -131,16 +96,14 @@ func (s *TaskSummary) ToTaskResolver() (res TaskResolver) { } res = TaskResolver{ - Name: s.TaskName, - ModuleName: engine.runningWorkerIndexTask[regTask].moduleName, - TotalJobs: s.CountTotalJob(), + Name: s.TaskName, + ModuleName: engine.runningWorkerIndexTask[regTask].moduleName, + TotalJobs: s.CountTotalJob(), + IsLoading: s.IsLoading, + IsHold: s.IsHold, + LoadingMessage: s.LoadingMessage, } res.Detail = s.ToSummaryDetail() - res.IsLoading = s.IsLoading - res.LoadingMessage = s.LoadingMessage - if res.LoadingMessage == "" { - res.LoadingMessage = "Processing..." - } return } @@ -152,6 +115,7 @@ func (s *TaskSummary) ToMapResult() map[string]int { strings.ToUpper(string(StatusSuccess)): s.Success, strings.ToUpper(string(StatusQueueing)): s.Queueing, strings.ToUpper(string(StatusStopped)): s.Stopped, + strings.ToUpper(string(StatusHold)): s.Hold, } } @@ -162,6 +126,71 @@ func (s *TaskSummary) SetValue(source map[string]int) { s.Success = source[strings.ToUpper(string(StatusSuccess))] s.Queueing = source[strings.ToUpper(string(StatusQueueing))] s.Stopped = source[strings.ToUpper(string(StatusStopped))] + s.Hold = source[strings.ToUpper(string(StatusHold))] +} + +// ApplyFilterStatus apply with filter status +func (s *TaskSummary) ApplyFilterStatus(statuses []string) { + if len(statuses) == 0 { + return + } + mapRes := s.ToMapResult() + newCount := map[string]int{} + for _, status := range statuses { + newCount[strings.ToUpper(status)] = mapRes[strings.ToUpper(status)] + } + s.SetValue(newCount) +} + +func (s TaskSummary) GetColumnName() []string { + return []string{"id", "success", "queueing", "retrying", "failure", "stopped", "is_loading", "is_hold", "hold", "loading_message"} +} + +func (s *TaskSummary) Scan(scanner interface{ Scan(...any) error }) error { + return scanner.Scan(&s.TaskName, &s.Success, &s.Queueing, &s.Retrying, + &s.Failure, &s.Stopped, &s.IsLoading, &s.IsHold, &s.Hold, &s.LoadingMessage) +} + +func (s *TaskSummary) ToArgs(val map[string]any) (args []any) { + return []any{ + s.TaskName, candihelper.ToInt(val["success"]), candihelper.ToInt(val["queueing"]), candihelper.ToInt(val["retrying"]), + candihelper.ToInt(val["failure"]), candihelper.ToInt(val["stopped"]), val["is_loading"], + val["is_hold"], val["hold"], val["loading_message"], + } +} + +// Job model +type Job struct { + ID string `bson:"_id" json:"_id"` + TaskName string `bson:"task_name" json:"task_name"` + Arguments string `bson:"arguments" json:"arguments"` + Retries int `bson:"retries" json:"retries"` + MaxRetry int `bson:"max_retry" json:"max_retry"` + Interval string `bson:"interval" json:"interval"` + CreatedAt time.Time `bson:"created_at" json:"created_at"` + UpdatedAt time.Time `bson:"updated_at" json:"updated_at"` + FinishedAt time.Time `bson:"finished_at" json:"finished_at"` + Status string `bson:"status" json:"status"` + Error string `bson:"error" json:"error"` + ErrorStack string `bson:"-" json:"error_stack"` + Result string `bson:"result" json:"result"` + TraceID string `bson:"trace_id" json:"trace_id"` + CurrentProgress int `bson:"current_progress" json:"current_progress"` + MaxProgress int `bson:"max_progress" json:"max_progress"` + RetryHistories []RetryHistory `bson:"retry_histories" json:"retry_histories"` + NextRetryAt string `bson:"-" json:"-"` + direct bool `bson:"-" json:"-"` +} + +// RetryHistory model +type RetryHistory struct { + ErrorStack string `bson:"error_stack" json:"error_stack"` + Status string `bson:"status" json:"status"` + Error string `bson:"error" json:"error"` + Result string `bson:"result" json:"result"` + TraceID string `bson:"trace_id" json:"trace_id"` + StartAt time.Time `bson:"start_at" json:"start_at"` + EndAt time.Time `bson:"end_at" json:"end_at"` } func (job *Job) toMap() map[string]interface{} { @@ -179,6 +208,16 @@ func (job *Job) toMap() map[string]interface{} { } } +// Configuration model +type Configuration struct { + Key string `bson:"key" json:"key"` + Name string `bson:"name" json:"name"` + Value string `bson:"value" json:"value"` + IsActive bool `bson:"is_active" json:"is_active"` +} + +type Configurations []Configuration + // ToMap method func (c Configurations) ToMap() map[string]string { mp := make(map[string]string, len(c)) diff --git a/codebase/app/task_queue_worker/persistent_mongo.go b/codebase/app/task_queue_worker/persistent_mongo.go index 3ae13a3b..2d9bd8e8 100644 --- a/codebase/app/task_queue_worker/persistent_mongo.go +++ b/codebase/app/task_queue_worker/persistent_mongo.go @@ -262,7 +262,6 @@ func (s *MongoPersistent) CountAllJob(ctx context.Context, filter *Filter) int { } func (s *MongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filter) (results []TaskSummary) { - pipeQuery := []bson.M{ { "$match": s.toBsonFilter(filter), @@ -275,6 +274,7 @@ func (s *MongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filte "retrying": bson.M{"$cond": bson.M{"if": bson.M{"$eq": []interface{}{"$status", StatusRetrying}}, "then": 1, "else": 0}}, "failure": bson.M{"$cond": bson.M{"if": bson.M{"$eq": []interface{}{"$status", StatusFailure}}, "then": 1, "else": 0}}, "stopped": bson.M{"$cond": bson.M{"if": bson.M{"$eq": []interface{}{"$status", StatusStopped}}, "then": 1, "else": 0}}, + "hold": bson.M{"$cond": bson.M{"if": bson.M{"$eq": []interface{}{"$status", StatusHold}}, "then": 1, "else": 0}}, }, }, { @@ -295,6 +295,9 @@ func (s *MongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filte "stopped": bson.M{ "$sum": "$stopped", }, + "hold": bson.M{ + "$sum": "$hold", + }, }, }, } @@ -474,7 +477,6 @@ func (s *MongoPersistent) toBsonFilter(f *Filter) bson.M { // summary func (s *MongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary) { - query := bson.M{} if filter.TaskName != "" { query["task_name"] = filter.TaskName @@ -502,12 +504,7 @@ func (s *MongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (r if len(filter.Statuses) > 0 { for i, res := range result { - mapRes := res.ToMapResult() - newCount := map[string]int{} - for _, status := range filter.Statuses { - newCount[strings.ToUpper(status)] = mapRes[strings.ToUpper(status)] - } - res.SetValue(newCount) + res.ApplyFilterStatus(filter.Statuses) result[i] = res } } @@ -597,8 +594,9 @@ func (s *MongoPersistent) Type() string { var commandResult struct { Version string `bson:"version"` } - err := s.db.RunCommand(s.ctx, bson.D{{Key: "buildInfo", Value: 1}}).Decode(&commandResult) - logger.LogIfError(err) + if err := s.db.RunCommand(s.ctx, bson.D{{Key: "buildInfo", Value: 1}}).Decode(&commandResult); err != nil { + logger.LogI(err.Error()) + } if commandResult.Version != "" { commandResult.Version = ", version: " + commandResult.Version } diff --git a/codebase/app/task_queue_worker/persistent_sql.go b/codebase/app/task_queue_worker/persistent_sql.go index 9f6f8bc3..6541c713 100644 --- a/codebase/app/task_queue_worker/persistent_sql.go +++ b/codebase/app/task_queue_worker/persistent_sql.go @@ -319,7 +319,7 @@ func (s *SQLPersistent) FindAllSummary(ctx context.Context, filter *Filter) (res } where = " WHERE id IN (" + s.parameterize(len(args)) + ")" } - query := `SELECT ` + s.formatColumnName("id", "success", "queueing", "retrying", "failure", "stopped", "is_loading") + + query := `SELECT ` + s.formatColumnName(TaskSummary{}.GetColumnName()...) + ` FROM ` + jobSummaryModelName + where + " ORDER BY id ASC" rows, err := s.db.Query(query, args...) if err != nil { @@ -328,8 +328,7 @@ func (s *SQLPersistent) FindAllSummary(ctx context.Context, filter *Filter) (res defer rows.Close() for rows.Next() { var detail TaskSummary - rows.Scan(&detail.TaskName, &detail.Success, &detail.Queueing, &detail.Retrying, - &detail.Failure, &detail.Stopped, &detail.IsLoading) + detail.Scan(rows) detail.ID = detail.TaskName result = append(result, detail) } @@ -349,10 +348,10 @@ func (s *SQLPersistent) FindAllSummary(ctx context.Context, filter *Filter) (res return } func (s *SQLPersistent) FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary) { - s.db.QueryRowContext(ctx, `SELECT `+s.formatColumnName("id", "success", "queueing", "retrying", "failure", "stopped", "is_loading")+ - ` FROM `+jobSummaryModelName+` WHERE id=`+s.parameterize(1), taskName). - Scan(&result.TaskName, &result.Success, &result.Queueing, &result.Retrying, - &result.Failure, &result.Stopped, &result.IsLoading) + row := s.db.QueryRowContext(ctx, `SELECT `+ + s.formatColumnName(result.GetColumnName()...)+ + ` FROM `+jobSummaryModelName+` WHERE id=`+s.parameterize(1), taskName) + result.Scan(row) result.ID = result.TaskName return } @@ -374,12 +373,11 @@ func (s *SQLPersistent) UpdateSummary(ctx context.Context, taskName string, upda } affected, _ := res.RowsAffected() if affected == 0 { - args := []interface{}{ - taskName, candihelper.ToInt(updated["success"]), candihelper.ToInt(updated["queueing"]), candihelper.ToInt(updated["retrying"]), - candihelper.ToInt(updated["failure"]), candihelper.ToInt(updated["stopped"]), updated["is_loading"], - } - query := `INSERT INTO ` + jobSummaryModelName + ` (` + - s.formatColumnName("id", "success", "queueing", "retrying", "failure", "stopped", "is_loading") + + var task TaskSummary + task.TaskName = taskName + args := task.ToArgs(updated) + + query := `INSERT INTO ` + jobSummaryModelName + ` (` + s.formatColumnName(task.GetColumnName()...) + `) VALUES (` + s.parameterize(len(args)) + `)` _, err := s.db.Exec(query, args...) if err != nil { @@ -394,6 +392,7 @@ func (s *SQLPersistent) IncrementSummary(ctx context.Context, taskName string, i } var setFields []string + updated := make(map[string]any, len(incr)) for field, value := range incr { if field == "" { continue @@ -404,6 +403,7 @@ func (s *SQLPersistent) IncrementSummary(ctx context.Context, taskName string, i val = "+" + candihelper.ToString(value) } setFields = append(setFields, field+"="+field+val) + updated[field] = value } query := `UPDATE ` + jobSummaryModelName + ` SET ` + strings.Join(setFields, ",") + ` WHERE id='` + taskName + `'` res, err := s.db.Exec(query) @@ -413,12 +413,11 @@ func (s *SQLPersistent) IncrementSummary(ctx context.Context, taskName string, i } affected, _ := res.RowsAffected() if affected == 0 { - args := []interface{}{ - taskName, candihelper.ToInt(incr["success"]), candihelper.ToInt(incr["queueing"]), candihelper.ToInt(incr["retrying"]), - candihelper.ToInt(incr["failure"]), candihelper.ToInt(incr["stopped"]), - } + var task TaskSummary + task.TaskName = taskName + args := task.ToArgs(updated) query := `INSERT INTO ` + jobSummaryModelName + ` (` + - s.formatColumnName("id", "success", "queueing", "retrying", "failure", "stopped") + + s.formatColumnName(task.GetColumnName()...) + `) VALUES (` + s.parameterize(len(args)) + `)` _, err := s.db.Exec(query, args...) if err != nil { diff --git a/codebase/app/task_queue_worker/persistent_sql_tools.go b/codebase/app/task_queue_worker/persistent_sql_tools.go index 409d8f3f..9e22aa45 100644 --- a/codebase/app/task_queue_worker/persistent_sql_tools.go +++ b/codebase/app/task_queue_worker/persistent_sql_tools.go @@ -99,56 +99,52 @@ func (s *SQLPersistent) initTable(db *sql.DB) { case "mysql": initTableQueries = map[string]string{ - jobModelName: fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n%s", jobModelName, - "(`id` VARCHAR(255) PRIMARY KEY NOT NULL,", - "`task_name` VARCHAR(255) NOT NULL,", - "`arguments` TEXT NOT NULL,", - "`retries` INTEGER NOT NULL,", - "`max_retry` INTEGER NOT NULL,", - "`interval` VARCHAR(255) NOT NULL,", - "`created_at` DATETIME(3) NOT NULL,", - "`updated_at` DATETIME(3) NOT NULL,", - "`finished_at` DATETIME(3) NULL,", - "`status` VARCHAR(255) NOT NULL,", - "`error` TEXT NOT NULL,", - "`trace_id` VARCHAR(255) NOT NULL,", - "`current_progress` INTEGER NOT NULL,", - "`max_progress` INTEGER NOT NULL,", + jobModelName: "CREATE TABLE IF NOT EXISTS " + jobModelName + " " + + "(`id` VARCHAR(255) PRIMARY KEY NOT NULL," + + "`task_name` VARCHAR(255) NOT NULL," + + "`arguments` TEXT NOT NULL," + + "`retries` INTEGER NOT NULL," + + "`max_retry` INTEGER NOT NULL," + + "`interval` VARCHAR(255) NOT NULL," + + "`created_at` DATETIME(3) NOT NULL," + + "`updated_at` DATETIME(3) NOT NULL," + + "`finished_at` DATETIME(3) NULL," + + "`status` VARCHAR(255) NOT NULL," + + "`error` TEXT NOT NULL," + + "`trace_id` VARCHAR(255) NOT NULL," + + "`current_progress` INTEGER NOT NULL," + + "`max_progress` INTEGER NOT NULL," + `INDEX (created_at), INDEX (arguments(255), error(255)), INDEX (task_name, status, created_at), INDEX (task_name), INDEX (status), INDEX (task_name, status)) ENGINE=InnoDB DEFAULT CHARSET=utf8 DEFAULT COLLATE utf8_unicode_ci;`, - ), - jobSummaryModelName: fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s %s %s %s %s %s %s %s\n%s", jobSummaryModelName+ - "(`id` VARCHAR(255) PRIMARY KEY NOT NULL,", - "`success` INTEGER NOT NULL,", - "`queueing` INTEGER NOT NULL,", - "`retrying` INTEGER NOT NULL,", - "`failure` INTEGER NOT NULL,", - "`stopped` INTEGER NOT NULL,", - "`is_loading` BOOLEAN DEFAULT false,", - "`loading_message` VARCHAR(255) NOT NULL DEFAULT '',", + jobSummaryModelName: "CREATE TABLE IF NOT EXISTS " + jobSummaryModelName + " " + + "(`id` VARCHAR(255) PRIMARY KEY NOT NULL," + + "`success` INTEGER NOT NULL," + + "`queueing` INTEGER NOT NULL," + + "`retrying` INTEGER NOT NULL," + + "`failure` INTEGER NOT NULL," + + "`stopped` INTEGER NOT NULL," + + "`is_loading` BOOLEAN DEFAULT false," + + "`loading_message` VARCHAR(255) NOT NULL DEFAULT ''," + `INDEX (id)) ENGINE=InnoDB DEFAULT CHARSET=utf8 DEFAULT COLLATE utf8_unicode_ci;`, - ), - jobHistoryModel: fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s %s %s %s %s %s %s %s\n%s", jobHistoryModel, - "(`job_id` VARCHAR(255) NOT NULL,", - "`error_stack` VARCHAR(255) NOT NULL,", - "`status` VARCHAR(255) NOT NULL,", - "`error` TEXT NOT NULL,", - "`trace_id` VARCHAR(255) NOT NULL,", - "`start_at` DATETIME(3) NULL,", - "`end_at` DATETIME(3) NULL,", + jobHistoryModel: "CREATE TABLE IF NOT EXISTS " + jobHistoryModel + "" + + "(`job_id` VARCHAR(255) NOT NULL," + + "`error_stack` VARCHAR(255) NOT NULL," + + "`status` VARCHAR(255) NOT NULL," + + "`error` TEXT NOT NULL," + + "`trace_id` VARCHAR(255) NOT NULL," + + "`start_at` DATETIME(3) NULL," + + "`end_at` DATETIME(3) NULL," + `INDEX (job_id), INDEX (start_at)) ENGINE=InnoDB DEFAULT CHARSET=utf8 DEFAULT COLLATE utf8_unicode_ci;`, - ), - configurationModelName: fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s %s %s %s %s", configurationModelName, - "(`key` VARCHAR(255) PRIMARY KEY NOT NULL,", - "`name` VARCHAR(255) NOT NULL,", - "`value` VARCHAR(255) NOT NULL,", + configurationModelName: "CREATE TABLE IF NOT EXISTS " + configurationModelName + " " + + "(`key` VARCHAR(255) PRIMARY KEY NOT NULL," + + "`name` VARCHAR(255) NOT NULL," + + "`value` VARCHAR(255) NOT NULL," + "`is_active` BOOLEAN DEFAULT false) ENGINE=InnoDB DEFAULT CHARSET=utf8 DEFAULT COLLATE utf8_unicode_ci;", - ), } } @@ -164,11 +160,14 @@ func (s *SQLPersistent) initTable(db *sql.DB) { extraQueries := [][]string{ generateAddColumnQuery(s.driverName, jobModelName, "result", "TEXT"), generateAddColumnQuery(s.driverName, jobHistoryModel, "result", "TEXT"), + generateAddColumnQuery(s.driverName, jobSummaryModelName, "is_hold", "BOOLEAN"), + generateAddColumnQuery(s.driverName, jobSummaryModelName, "hold", "INTEGER"), } for _, queries := range extraQueries { if queries[0] != "" { var columnName string - if err := db.QueryRow(queries[0]).Scan(&columnName); err == nil { + err := db.QueryRow(queries[0]).Scan(&columnName) + if err == nil { continue } } diff --git a/codebase/app/task_queue_worker/subscribers.go b/codebase/app/task_queue_worker/subscribers.go index eec39e37..d9cdeb87 100644 --- a/codebase/app/task_queue_worker/subscribers.go +++ b/codebase/app/task_queue_worker/subscribers.go @@ -148,7 +148,6 @@ func (s *subscriber) broadcastAllToSubscribers(ctx context.Context) { } func (s *subscriber) broadcastTaskList(ctx context.Context) { - var taskRes TaskListResolver taskRes.Data = make([]TaskResolver, 0) for _, summary := range s.opt.persistent.Summary().FindAllSummary(ctx, &Filter{}) { @@ -178,7 +177,6 @@ func (s *subscriber) broadcastJobList(ctx context.Context) { } func (s *subscriber) broadcastJobListToClient(ctx context.Context, clientID string) { - subscriber, ok := s.clientTaskJobListSubscribers[clientID] if !ok { return @@ -189,7 +187,7 @@ func (s *subscriber) broadcastJobListToClient(ctx context.Context, clientID stri if summary.IsLoading { subscriber.skipBroadcast = summary.IsLoading subscriber.writeDataToChannel(JobListResolver{ - Meta: MetaJobList{IsLoading: summary.IsLoading}, + Meta: MetaJobList{IsLoading: summary.IsLoading, Message: summary.LoadingMessage}, }) return } @@ -210,7 +208,6 @@ func (s *subscriber) broadcastJobListToClient(ctx context.Context, clientID stri } func (s *subscriber) broadcastJobDetail(ctx context.Context) { - for clientID, subscriber := range s.clientJobDetailSubscribers { detail, err := s.opt.persistent.FindJobByID(ctx, candihelper.PtrToString(subscriber.filter.JobID), subscriber.filter) if err != nil { @@ -226,15 +223,17 @@ func (s *subscriber) broadcastJobDetail(ctx context.Context) { } func (s *subscriber) broadcastWhenChangeAllJob(ctx context.Context, taskName string, isLoading bool, loadingMessage string) { - s.opt.persistent.Summary().UpdateSummary(ctx, taskName, map[string]interface{}{ "is_loading": isLoading, "loading_message": loadingMessage, }) + summaries := s.opt.persistent.Summary().FindAllSummary(ctx, &Filter{}) + mapMessage := make(map[string]string, len(summaries)) var taskRes TaskListResolver - taskRes.Data = make([]TaskResolver, 0) - for _, summary := range s.opt.persistent.Summary().FindAllSummary(ctx, &Filter{}) { - taskRes.Data = append(taskRes.Data, summary.ToTaskResolver()) + taskRes.Data = make([]TaskResolver, len(summaries)) + for i, summary := range summaries { + taskRes.Data[i] = summary.ToTaskResolver() + mapMessage[summary.TaskName] = summary.LoadingMessage } sort.Slice(taskRes.Data, func(i, j int) bool { @@ -249,7 +248,7 @@ func (s *subscriber) broadcastWhenChangeAllJob(ctx context.Context, taskName str for _, subscriber := range s.clientTaskJobListSubscribers { subscriber.skipBroadcast = isLoading subscriber.writeDataToChannel(JobListResolver{ - Meta: MetaJobList{IsLoading: isLoading}, + Meta: MetaJobList{IsLoading: isLoading, Message: mapMessage[subscriber.filter.TaskName]}, }) } } diff --git a/codebase/app/task_queue_worker/task_queue_worker.go b/codebase/app/task_queue_worker/task_queue_worker.go index 5b3abd88..a4fff9df 100644 --- a/codebase/app/task_queue_worker/task_queue_worker.go +++ b/codebase/app/task_queue_worker/task_queue_worker.go @@ -16,12 +16,12 @@ import ( ) type taskQueueWorker struct { - ctx context.Context - ctxCancelFunc func() - isShutdown bool - ready, shutdown, refreshWorkerNotif chan struct{} - semaphore []chan struct{} - mutex sync.Mutex + ctx context.Context + ctxCancelFunc func() + isShutdown bool + shutdown, refreshWorkerNotif chan struct{} + semaphore []chan struct{} + mutex sync.Mutex service factory.ServiceFactory wg sync.WaitGroup @@ -82,7 +82,6 @@ func NewTaskQueueWorker(service factory.ServiceFactory, opts ...OptionFunc) fact func (t *taskQueueWorker) prepare() { if len(t.tasks) == 0 { logger.LogYellow("Task Queue Worker: warning, no task provided") - t.ready <- struct{}{} return } @@ -90,65 +89,78 @@ func (t *taskQueueWorker) prepare() { t.opt.persistent.Summary().DeleteAllSummary(t.ctx, &Filter{ExcludeTaskNameList: t.tasks}) t.opt.persistent.CleanJob(t.ctx, &Filter{ExcludeTaskNameList: t.tasks}) - // get current pending jobs - filter := &Filter{ - Page: 1, Limit: 50, - TaskNameList: t.tasks, - Sort: "created_at", - Statuses: []string{string(StatusRetrying), string(StatusQueueing)}, - } for _, taskName := range t.tasks { t.opt.queue.Clear(t.ctx, taskName) updated := map[string]interface{}{ "is_loading": true, "loading_message": "Requeueing...", } + summary := t.opt.persistent.Summary().FindDetailSummary(t.ctx, taskName) + + isRequeueing := false for _, status := range []string{ StatusRetrying.String(), StatusFailure.String(), StatusSuccess.String(), - StatusQueueing.String(), StatusStopped.String(), + StatusQueueing.String(), StatusStopped.String(), StatusHold.String(), } { - updated[strings.ToLower(status)] = t.opt.persistent.CountAllJob(t.ctx, &Filter{ + totalJob := t.opt.persistent.CountAllJob(t.ctx, &Filter{ TaskName: taskName, Status: &status, }) + updated[strings.ToLower(status)] = totalJob + if (status == string(StatusRetrying) || status == string(StatusQueueing)) && totalJob > 0 { + isRequeueing = true + } } t.opt.persistent.Summary().UpdateSummary(t.ctx, taskName, updated) - } - t.subscriber.broadcastTaskList(t.ctx) - StreamAllJob(t.ctx, filter, func(job *Job) { - if t.opt.locker.HasBeenLocked(t.getLockKey(job.ID)) { - return - } - - // update to queueing - if job.Status != string(StatusQueueing) { - statusBefore := job.Status - job.Status = string(StatusQueueing) - matchedCount, affectedCount, err := t.opt.persistent.UpdateJob(t.ctx, &Filter{ - JobID: &job.ID, - }, map[string]interface{}{ - "status": job.Status, + if isRequeueing { + // get current pending jobs + go func(filter *Filter) { + StreamAllJob(t.ctx, filter, func(idx, total int, job *Job) { + if t.opt.locker.HasBeenLocked(t.getLockKey(job.ID)) { + return + } + // update to queueing + if job.Status != string(StatusQueueing) { + statusBefore := job.Status + job.Status = string(StatusQueueing) + matchedCount, affectedCount, err := t.opt.persistent.UpdateJob(t.ctx, &Filter{ + JobID: &job.ID, + }, map[string]interface{}{ + "status": job.Status, + }) + if err != nil { + logger.LogE(err.Error()) + return + } + t.opt.persistent.Summary().IncrementSummary(t.ctx, job.TaskName, map[string]int64{ + string(job.Status): affectedCount, + statusBefore: -matchedCount, + }) + } + t.opt.queue.PushJob(t.ctx, job) + t.opt.persistent.Summary().UpdateSummary(t.ctx, filter.TaskName, map[string]interface{}{ + "is_loading": true, "loading_message": fmt.Sprintf(`Requeueing %d of %d`, idx, total), + }) + t.subscriber.broadcastTaskList(t.ctx) + }) + t.opt.persistent.Summary().UpdateSummary(t.ctx, filter.TaskName, map[string]interface{}{ + "is_loading": false, "loading_message": summary.LoadingMessage, + }) + t.registerNextJob(false, filter.TaskName) + t.subscriber.broadcastTaskList(t.ctx) + }(&Filter{ + Page: 1, Limit: 100, + TaskName: taskName, + Sort: "created_at", + Statuses: []string{string(StatusRetrying), string(StatusQueueing)}, }) - if err != nil { - logger.LogE(err.Error()) - return - } - t.opt.persistent.Summary().IncrementSummary(t.ctx, job.TaskName, map[string]int64{ - string(job.Status): affectedCount, - statusBefore: -matchedCount, + } else { + t.opt.persistent.Summary().UpdateSummary(t.ctx, taskName, map[string]interface{}{ + "is_loading": false, "loading_message": summary.LoadingMessage, }) } - t.opt.queue.PushJob(t.ctx, job) - }) + } t.registerInternalTask() - t.ready <- struct{}{} - - for _, taskName := range t.tasks { - t.opt.persistent.Summary().UpdateSummary(t.ctx, taskName, map[string]interface{}{ - "is_loading": false, "loading_message": "", - }) - t.registerNextJob(false, taskName) - } t.subscriber.broadcastTaskList(t.ctx) } @@ -157,8 +169,6 @@ func (t *taskQueueWorker) Serve() { // serve graphql api for communication to dashboard go t.serveGraphQLAPI() - <-t.ready - // run worker for { select { @@ -219,6 +229,10 @@ func (t *taskQueueWorker) Name() string { } func (t *taskQueueWorker) registerJobToWorker(job *Job) { + if job.Status != string(StatusQueueing) { + return + } + interval, err := time.ParseDuration(job.Interval) if err != nil || interval <= 0 { logger.LogRed("invalid interval " + job.Interval) diff --git a/codebase/app/task_queue_worker/trigger_task.go b/codebase/app/task_queue_worker/trigger_task.go index 34de7656..6babfa28 100644 --- a/codebase/app/task_queue_worker/trigger_task.go +++ b/codebase/app/task_queue_worker/trigger_task.go @@ -28,6 +28,11 @@ func (t *taskQueueWorker) triggerTask(workerIndex int) { return } + taskDetail := t.opt.persistent.Summary().FindDetailSummary(t.ctx, runningTask.taskName) + if taskDetail.IsHold { + return + } + t.semaphore[workerIndex-1] <- struct{}{} if t.isShutdown { logger.LogRed("worker has been shutdown") @@ -59,7 +64,7 @@ func (t *taskQueueWorker) triggerTask(workerIndex int) { lockKey := t.getLockKey(runningTask.taskName) if t.opt.locker.IsLocked(lockKey) { logger.LogI("task_queue_worker > task " + runningTask.taskName + " is locked") - t.checkForUnlockTask(runningTask.taskName) + t.unlockTask(runningTask.taskName) return } defer t.opt.locker.Unlock(lockKey) @@ -273,7 +278,7 @@ func (t *taskQueueWorker) getLockKey(jobID string) string { return fmt.Sprintf("%s:task-queue-worker-lock:%s", t.service.Name(), jobID) } -func (t *taskQueueWorker) checkForUnlockTask(taskName string) { +func (t *taskQueueWorker) unlockTask(taskName string) { if count := t.opt.persistent.CountAllJob(t.ctx, &Filter{ TaskName: taskName, Status: candihelper.ToStringPtr(StatusRetrying.String()), }); count == 0 { @@ -297,7 +302,7 @@ func (t *taskQueueWorker) registerNextJob(withStream bool, taskName string) { TaskName: taskName, Sort: "created_at", Status: candihelper.ToStringPtr(string(StatusQueueing)), - }, func(job *Job) { + }, func(_, _ int, job *Job) { t.opt.queue.PushJob(t.ctx, job) }) t.registerNextJob(false, taskName) diff --git a/codebase/app/task_queue_worker/types.go b/codebase/app/task_queue_worker/types.go index 8e017836..a91b2b77 100644 --- a/codebase/app/task_queue_worker/types.go +++ b/codebase/app/task_queue_worker/types.go @@ -45,6 +45,8 @@ const ( StatusQueueing JobStatusEnum = "QUEUEING" // StatusStopped const StatusStopped JobStatusEnum = "STOPPED" + // StatusHold const + StatusHold JobStatusEnum = "HOLD" // HeaderRetries const HeaderRetries = "retries"