Skip to content

Commit

Permalink
task queue worker: add new status for hold/unhold job queue
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Aug 22, 2024
1 parent 6e3b2a1 commit c67e92a
Show file tree
Hide file tree
Showing 14 changed files with 431 additions and 272 deletions.
1 change: 0 additions & 1 deletion codebase/app/task_queue_worker/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func initEngine(service factory.ServiceFactory, opts ...OptionFunc) *taskQueueWo

engine = &taskQueueWorker{
service: service,
ready: make(chan struct{}),
shutdown: make(chan struct{}, 1),
refreshWorkerNotif: make(chan struct{}, 1),
opt: &opt,
Expand Down
109 changes: 98 additions & 11 deletions codebase/app/task_queue_worker/graphql_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
dashboard "github.com/golangid/candi-plugin/task-queue-worker/dashboard"
"github.com/golangid/candi/candihelper"
"github.com/golangid/candi/candishared"
cronexpr "github.com/golangid/candi/candiutils/cronparser"
graphqlserver "github.com/golangid/candi/codebase/app/graphql_server"
"github.com/golangid/candi/config/env"
"github.com/golangid/candi/logger"
Expand Down Expand Up @@ -65,7 +66,7 @@ type rootResolver struct {
engine *taskQueueWorker
}

func (r *rootResolver) Dashboard(ctx context.Context, input struct{ GC *bool }) (res DashboardResolver) {
func (r *rootResolver) Dashboard(ctx context.Context) (res DashboardResolver) {
res.Banner = r.engine.opt.dashboardBanner
res.Tagline = "Task Queue Worker Dashboard"
res.Version = candi.Version
Expand All @@ -77,10 +78,6 @@ func (r *rootResolver) Dashboard(ctx context.Context, input struct{ GC *bool })
_, isType := r.engine.opt.persistent.(*noopPersistent)
res.Config.WithPersistent = !isType

if input.GC != nil && *input.GC {
runtime.GC()
}

// dependency health
if err := r.engine.opt.persistent.Ping(ctx); err != nil {
res.DependencyHealth.Persistent = candihelper.ToStringPtr(err.Error())
Expand Down Expand Up @@ -220,6 +217,8 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct {
Filter FilterMutateJobInputResolver
}) (string, error) {
go func(ctx context.Context, req *FilterMutateJobInputResolver) {
summary := r.engine.opt.persistent.Summary().FindDetailSummary(ctx, req.TaskName)

filter := req.ToFilter()
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, true, "Retrying...")

Expand All @@ -228,8 +227,12 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct {
filter.Statuses = []string{string(StatusFailure), string(StatusStopped)}
}

StreamAllJob(ctx, &filter, func(job *Job) {
StreamAllJob(ctx, &filter, func(idx, total int, job *Job) {
r.engine.opt.queue.PushJob(ctx, job)
r.engine.opt.persistent.Summary().UpdateSummary(r.engine.ctx, filter.TaskName, map[string]interface{}{
"is_loading": true, "loading_message": fmt.Sprintf(`Requeueing %d of %d`, idx, total),
})
r.engine.subscriber.broadcastTaskList(r.engine.ctx)
})

incr := map[string]int64{}
Expand All @@ -251,9 +254,9 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct {
incr[strings.ToLower(string(StatusQueueing))] += countAffected
}

r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, false, "")
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, false, summary.LoadingMessage)
r.engine.opt.persistent.Summary().IncrementSummary(ctx, filter.TaskName, incr)
r.engine.subscriber.broadcastAllToSubscribers(ctx)
r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)
r.engine.registerNextJob(false, filter.TaskName)

}(r.engine.ctx, &input.Filter)
Expand Down Expand Up @@ -293,7 +296,6 @@ func (r *rootResolver) CleanJob(ctx context.Context, input struct {
}

func (r *rootResolver) RecalculateSummary(ctx context.Context) (string, error) {

RecalculateSummary(ctx)
r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)
return "Success recalculate summary", nil
Expand Down Expand Up @@ -419,7 +421,7 @@ func (r *rootResolver) SetConfiguration(ctx context.Context, input struct {
func (r *rootResolver) RunQueuedJob(ctx context.Context, input struct {
TaskName string
}) (res string, err error) {
r.engine.checkForUnlockTask(input.TaskName)
r.engine.unlockTask(input.TaskName)
r.engine.registerNextJob(true, input.TaskName)
return "Success", nil
}
Expand All @@ -430,7 +432,7 @@ func (r *rootResolver) RestoreFromSecondary(ctx context.Context) (res RestoreSec
secondaryPersistent: true,
Status: candihelper.ToStringPtr(string(StatusQueueing)),
}
res.TotalData = StreamAllJob(ctx, filter, func(job *Job) {
res.TotalData = StreamAllJob(ctx, filter, func(_, _ int, job *Job) {
if err := r.engine.opt.persistent.SaveJob(ctx, job); err != nil {
logger.LogE(err.Error())
return
Expand Down Expand Up @@ -599,3 +601,88 @@ func (r *rootResolver) ListenDetailJob(ctx context.Context, input struct {

return output, nil
}

func (r *rootResolver) HoldJobTask(ctx context.Context, input struct {
TaskName string
IsAutoSwitch bool
SwitchInterval *string
FirstSwitch *string
}) (res string, err error) {
summaryTask := r.engine.opt.persistent.Summary().FindDetailSummary(ctx, input.TaskName)
if summaryTask.TaskName == "" {
return res, errors.New("Task not found")
}

defer r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)

if !input.IsAutoSwitch {
r.engine.opt.persistent.Summary().UpdateSummary(ctx, input.TaskName, map[string]interface{}{
"is_hold": !summaryTask.IsHold, "loading_message": "Hold Mode",
})
if !summaryTask.IsHold {
return
}

r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, true, "Unhold...")
r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)
filter := &Filter{
TaskName: input.TaskName, Status: candihelper.WrapPtr(StatusHold.String()),
Sort: "created_at",
}
StreamAllJob(ctx, filter, func(idx, total int, job *Job) {
r.engine.opt.queue.PushJob(ctx, job)
r.engine.opt.persistent.Summary().UpdateSummary(r.engine.ctx, filter.TaskName, map[string]interface{}{
"is_loading": true, "loading_message": fmt.Sprintf(`Requeueing %d of %d`, idx, total),
})
r.engine.subscriber.broadcastTaskList(r.engine.ctx)
})

matchedCount, affectedRow, _ := r.engine.opt.persistent.UpdateJob(ctx,
filter,
map[string]interface{}{
"status": StatusQueueing,
"retries": 0,
},
)
r.engine.opt.persistent.Summary().IncrementSummary(ctx, filter.TaskName, map[string]int64{
strings.ToLower(StatusQueueing.String()): matchedCount,
strings.ToLower(StatusHold.String()): -affectedRow,
})
r.engine.registerNextJob(false, filter.TaskName)
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, false, "")
return
}

if input.SwitchInterval != nil {
schedule, err := cronexpr.Parse(*input.SwitchInterval)
if err != nil {
return "", err
}

date := make([]string, 6)
now := time.Now()
for i := range date {
nextInterval := schedule.NextInterval(now)
now = now.Add(nextInterval)
date[i] = now.Format(candihelper.DateFormatYYYYMMDDHHmmss)
}
}

return "Success", nil
}

func (r *rootResolver) ParseCronExpression(ctx context.Context, input struct{ Expr string }) (date []string, err error) {
schedule, err := cronexpr.Parse(input.Expr)
if err != nil {
return date, err
}

date = make([]string, 6)
now := time.Now()
for i := range date {
nextInterval := schedule.NextInterval(now)
now = now.Add(nextInterval)
date[i] = now.Format(candihelper.DateFormatYYYYMMDDHHmmss)
}
return
}
8 changes: 7 additions & 1 deletion codebase/app/task_queue_worker/graphql_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ const schema = `schema {
}
type Query {
dashboard(gc: Boolean): DashboardType!
dashboard(): DashboardType!
get_detail_job(job_id: String!, filter: GetAllJobHistoryInputResolver): JobResolver!
get_all_active_subscriber(): [ClientSubscriber!]!
get_all_job(filter: GetAllJobInputResolver): JobListResolver!
get_count_job(filter: GetAllJobInputResolver): Int!
get_all_configuration(): [ConfigurationResolver!]!
get_detail_configuration(key: String!): ConfigurationResolver!
parse_cron_expression(expr: String!): [String!]!
}
type Mutation {
Expand All @@ -30,6 +31,7 @@ type Mutation {
set_configuration(config: SetConfigurationInputResolver!): String!
run_queued_job(task_name: String!): String!
restore_from_secondary(): RestoreSecondaryResolver!
hold_job_task(task_name: String!, is_auto_switch: Boolean!, switch_interval: String, first_switch: String): String!
}
type Subscription {
Expand Down Expand Up @@ -75,9 +77,11 @@ type MetaType {
limit: Int!
total_pages: Int!
total_records: Int!
message: String!
is_close_session: Boolean!
is_loading: Boolean!
is_freeze_broadcast: Boolean!
is_hold: Boolean!
detail: TaskDetailResolver!
}
Expand All @@ -104,6 +108,7 @@ type TaskResolver {
total_jobs: Int!
is_loading: Boolean!
loading_message: String!
is_hold: Boolean!
detail: TaskDetailResolver!
}
Expand All @@ -118,6 +123,7 @@ type TaskDetailResolver {
success: Int!
queueing: Int!
stopped: Int!
hold: Int!
}
type JobListResolver {
Expand Down
61 changes: 33 additions & 28 deletions codebase/app/task_queue_worker/graphql_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type (
TotalJobs int
IsLoading bool
LoadingMessage string
IsHold bool
Detail SummaryDetail
}
// TaskListResolver resolver
Expand All @@ -72,15 +73,17 @@ type (
Limit int
TotalRecords int
TotalPages int
Message string
IsCloseSession bool
IsLoading bool
IsFreezeBroadcast bool
IsHold bool
Detail SummaryDetail
}

// SummaryDetail type
SummaryDetail struct {
Failure, Retrying, Success, Queueing, Stopped int
Failure, Retrying, Success, Queueing, Stopped, Hold int
}

// JobListResolver resolver
Expand Down Expand Up @@ -287,7 +290,9 @@ func (j *JobResolver) ParseFromJob(job *Job, maxArgsLength int) {
j.NextRetryAt = time.Now().Add(delay).In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339)
}
j.CreatedAt = job.CreatedAt.In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339)
j.FinishedAt = job.FinishedAt.In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339)
if !job.FinishedAt.IsZero() {
j.FinishedAt = job.FinishedAt.In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339)
}
if job.Retries > job.MaxRetry {
j.Retries = job.MaxRetry
}
Expand All @@ -299,35 +304,35 @@ func (j *JobResolver) ParseFromJob(job *Job, maxArgsLength int) {
}

func (j *JobListResolver) GetAllJob(ctx context.Context, filter *Filter) {

var meta MetaJobList
var taskDetailSummary []TaskSummary

var detailSummary TaskSummary
if candihelper.PtrToString(filter.Search) != "" ||
candihelper.PtrToString(filter.JobID) != "" ||
(filter.StartDate != "" && filter.EndDate != "") {
taskDetailSummary = engine.opt.persistent.AggregateAllTaskJob(ctx, filter)
taskDetailSummary := engine.opt.persistent.AggregateAllTaskJob(ctx, filter)
if len(taskDetailSummary) > 0 {
detailSummary = taskDetailSummary[0]
}
} else {
taskDetailSummary = engine.opt.persistent.Summary().FindAllSummary(ctx, filter)
}

for _, detailSummary := range taskDetailSummary {
detail := detailSummary.ToSummaryDetail()
meta.Detail.Failure += detail.Failure
meta.Detail.Retrying += detail.Retrying
meta.Detail.Success += detail.Success
meta.Detail.Queueing += detail.Queueing
meta.Detail.Stopped += detail.Stopped
meta.TotalRecords += detailSummary.CountTotalJob()
}
meta.Page, meta.Limit = filter.Page, filter.Limit
meta.TotalPages = int(math.Ceil(float64(meta.TotalRecords) / float64(meta.Limit)))

j.Meta = meta

for _, job := range engine.opt.persistent.FindAllJob(ctx, filter) {
var jobResolver JobResolver
jobResolver.ParseFromJob(&job, 100)
j.Data = append(j.Data, jobResolver)
detailSummary = engine.opt.persistent.Summary().FindDetailSummary(ctx, filter.TaskName)
detailSummary.ApplyFilterStatus(filter.Statuses)
}

detail := detailSummary.ToSummaryDetail()
j.Meta.Detail.Failure = detail.Failure
j.Meta.Detail.Retrying = detail.Retrying
j.Meta.Detail.Success = detail.Success
j.Meta.Detail.Queueing = detail.Queueing
j.Meta.Detail.Stopped = detail.Stopped
j.Meta.Detail.Hold = detail.Hold
j.Meta.TotalRecords = detailSummary.CountTotalJob()
j.Meta.IsHold = detailSummary.IsHold
j.Meta.Message = detailSummary.LoadingMessage
j.Meta.Page, j.Meta.Limit = filter.Page, filter.Limit
j.Meta.TotalPages = int(math.Ceil(float64(j.Meta.TotalRecords) / float64(j.Meta.Limit)))

jobs := engine.opt.persistent.FindAllJob(ctx, filter)
j.Data = make([]JobResolver, len(jobs))
for i, job := range jobs {
j.Data[i].ParseFromJob(&job, 100)
}
}
4 changes: 0 additions & 4 deletions codebase/app/task_queue_worker/internal_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

func (t *taskQueueWorker) registerInternalTask() {

retentionBeat := reflect.SelectCase{Dir: reflect.SelectRecv}
internalTaskRetention := &Task{
isInternalTask: true,
Expand All @@ -32,16 +31,13 @@ func (t *taskQueueWorker) registerInternalTask() {
END:
t.runningWorkerIndexTask[internalTaskRetention.workerIndex] = internalTaskRetention
t.workerChannels = append(t.workerChannels, retentionBeat)

}

func (t *taskQueueWorker) execInternalTask(task *Task) {

logger.LogIf("running internal task: %s", task.internalTaskName)

switch task.internalTaskName {
case configurationRetentionAgeKey:

cfg, _ := t.opt.persistent.GetConfiguration(configurationRetentionAgeKey)
if !cfg.IsActive {
return
Expand Down
Loading

0 comments on commit c67e92a

Please sign in to comment.