Skip to content

Commit

Permalink
task queue worker improvement
Browse files Browse the repository at this point in the history
- update dashboard: allow selecting a filter before retrying or cleaning all jobs
- add progress for job detail
  • Loading branch information
agungdwiprasetyo committed Jan 17, 2023
1 parent 5bd9737 commit 8e717e7
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 84 deletions.
65 changes: 42 additions & 23 deletions codebase/app/task_queue_worker/graphql_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ func (r *rootResolver) GetAllJob(ctx context.Context, input struct{ Filter *GetA
return
}

// GetCountJob resolver, returns the total number of jobs matching the given
// filter (an absent filter counts every job).
func (r *rootResolver) GetCountJob(ctx context.Context, input struct{ Filter *GetAllJobInputResolver }) (result int, err error) {
	jobFilter := input.Filter
	if jobFilter == nil {
		// no filter supplied: fall back to an empty filter (match all)
		jobFilter = &GetAllJobInputResolver{}
	}

	f := jobFilter.ToFilter()
	return r.engine.opt.persistent.CountAllJob(ctx, &f), nil
}

func (r *rootResolver) GetDetailJob(ctx context.Context, input struct {
JobID string
Filter *GetAllJobHistoryInputResolver
Expand Down Expand Up @@ -201,26 +212,29 @@ func (r *rootResolver) StopAllJob(ctx context.Context, input struct {
}

func (r *rootResolver) RetryAllJob(ctx context.Context, input struct {
TaskName string
Filter FilterMutateJobInputResolver
}) (string, error) {

go func(ctx context.Context) {
go func(ctx context.Context, req *FilterMutateJobInputResolver) {

r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, true, "Retrying...")
filter := req.ToFilter()
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, true, "Retrying...")

filter := &Filter{
Page: 1, Limit: 10, Sort: "created_at",
Statuses: []string{string(statusFailure), string(statusStopped)}, TaskName: input.TaskName,
filter.Sort = "created_at"
if len(filter.Statuses) == 0 {
filter.Statuses = []string{string(statusFailure), string(statusStopped)}
}
StreamAllJob(ctx, filter, func(job *Job) {

StreamAllJob(ctx, &filter, func(job *Job) {
r.engine.opt.queue.PushJob(ctx, job)
})

incr := map[string]int64{}
for _, status := range filter.Statuses {
countMatchedFilter, countAffected, err := r.engine.opt.persistent.UpdateJob(ctx,
&Filter{
TaskName: input.TaskName, Status: &status,
TaskName: filter.TaskName, Status: &status,
Search: filter.Search, StartDate: filter.StartDate, EndDate: filter.EndDate,
},
map[string]interface{}{
"status": statusQueueing,
Expand All @@ -234,42 +248,47 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct {
incr[strings.ToLower(string(statusQueueing))] += countAffected
}

r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, false, "")
r.engine.opt.persistent.Summary().IncrementSummary(ctx, input.TaskName, incr)
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, false, "")
r.engine.opt.persistent.Summary().IncrementSummary(ctx, filter.TaskName, incr)
r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)
r.engine.registerNextJob(false, input.TaskName)
r.engine.registerNextJob(false, filter.TaskName)

}(r.engine.ctx)
}(r.engine.ctx, &input.Filter)

return "Success retry all failure job in task " + input.TaskName, nil
return "Success retry all job in task " + input.Filter.TaskName, nil
}

func (r *rootResolver) CleanJob(ctx context.Context, input struct {
TaskName string
Filter FilterMutateJobInputResolver
}) (string, error) {

go func(ctx context.Context) {
go func(ctx context.Context, req *FilterMutateJobInputResolver) {

r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, true, "Cleaning...")
filter := req.ToFilter()
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, true, "Cleaning...")

filter.Sort = "created_at"
if len(filter.Statuses) == 0 {
filter.Statuses = []string{string(statusFailure), string(statusStopped)}
}
incrQuery := map[string]int64{}
affectedStatus := []string{string(statusSuccess), string(statusFailure), string(statusStopped)}
for _, status := range affectedStatus {
for _, status := range req.Statuses {
countAffected := r.engine.opt.persistent.CleanJob(ctx,
&Filter{
TaskName: input.TaskName, Status: &status,
TaskName: filter.TaskName, Status: &status,
Search: filter.Search, StartDate: filter.StartDate, EndDate: filter.EndDate,
},
)
incrQuery[strings.ToLower(status)] -= countAffected
}

r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, false, "")
r.engine.opt.persistent.Summary().IncrementSummary(ctx, input.TaskName, incrQuery)
r.engine.subscriber.broadcastWhenChangeAllJob(ctx, req.TaskName, false, "")
r.engine.opt.persistent.Summary().IncrementSummary(ctx, req.TaskName, incrQuery)
r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)

}(r.engine.ctx)
}(r.engine.ctx, &input.Filter)

return "Success clean all job in task " + input.TaskName, nil
return "Success clean all job in task " + input.Filter.TaskName, nil
}

func (r *rootResolver) RecalculateSummary(ctx context.Context) (string, error) {
Expand Down
16 changes: 14 additions & 2 deletions codebase/app/task_queue_worker/graphql_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Query {
get_detail_job(job_id: String!, filter: GetAllJobHistoryInputResolver): JobResolver!
get_all_active_subscriber(): [ClientSubscriber!]!
get_all_job(filter: GetAllJobInputResolver): JobListResolver!
get_count_job(filter: GetAllJobInputResolver): Int!
get_all_configuration(): [ConfigurationResolver!]!
get_detail_configuration(key: String!): ConfigurationResolver!
}
Expand All @@ -20,8 +21,8 @@ type Mutation {
stop_job(job_id: String!): String!
stop_all_job(task_name: String!): String!
retry_job(job_id: String!): String!
clean_job(task_name: String!): String!
retry_all_job(task_name: String!): String!
clean_job(filter: FilterMutateJobInputResolver!): String!
retry_all_job(filter: FilterMutateJobInputResolver!): String!
clear_all_client_subscriber(): String!
kill_client_subscriber(client_id: String!): String!
delete_job(job_id: String!): String!
Expand Down Expand Up @@ -138,6 +139,8 @@ type JobResolver {
created_at: String!
finished_at: String!
next_retry_at: String!
current_progress: Int!
max_progress: Int!
meta: JoDetailMetaResolver!
}
Expand Down Expand Up @@ -217,6 +220,15 @@ type RestoreSecondaryResolver {
total_data: Int!
message: String!
}
input FilterMutateJobInputResolver {
task_name: String!
search: String
job_id: String
statuses: [String!]!
start_date: String
end_date: String
}
`

// AddJobInputResolver model
Expand Down
67 changes: 52 additions & 15 deletions codebase/app/task_queue_worker/graphql_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,23 @@ type (

// JobResolver resolver
JobResolver struct {
ID string
TaskName string
Arguments string
Retries int
MaxRetry int
Interval string
CreatedAt string
FinishedAt string
Status string
Error string
ErrorStack string
TraceID string
RetryHistories []RetryHistory
NextRetryAt string
Meta struct {
ID string
TaskName string
Arguments string
Retries int
MaxRetry int
Interval string
CreatedAt string
FinishedAt string
Status string
Error string
ErrorStack string
TraceID string
RetryHistories []RetryHistory
NextRetryAt string
CurrentProgress int
MaxProgress int
Meta struct {
IsCloseSession bool
Page int
TotalHistory int
Expand Down Expand Up @@ -154,6 +156,16 @@ type (
Value string
IsActive bool
}

// FilterMutateJobInputResolver resolver
FilterMutateJobInputResolver struct {
TaskName string
Search *string
JobID *string
Statuses []string
StartDate *string
EndDate *string
}
)

// ToFilter method
Expand Down Expand Up @@ -207,6 +219,29 @@ func (i *GetAllJobHistoryInputResolver) ToFilter() (filter Filter) {
return
}

// ToFilter converts the mutation filter input into the internal Filter model.
// Pagination is fixed at page 1 / limit 10 because callers stream through all
// matching jobs rather than rendering a single page.
func (i *FilterMutateJobInputResolver) ToFilter() (filter Filter) {

	filter = Filter{
		Page: 1, Limit: 10,
		Search: i.Search, TaskName: i.TaskName,
		JobID:    i.JobID,
		Statuses: i.Statuses,
	}

	// optional date-range bounds (nil pointer means "unbounded")
	if i.StartDate != nil {
		filter.StartDate = *i.StartDate
	}
	if i.EndDate != nil {
		filter.EndDate = *i.EndDate
	}

	return
}

func (m *MetaTaskResolver) CalculatePage() {
m.TotalPages = int(math.Ceil(float64(m.TotalRecords) / float64(m.Limit)))
}
Expand Down Expand Up @@ -240,6 +275,8 @@ func (j *JobResolver) ParseFromJob(job *Job, maxArgsLength int) {
j.TraceID = job.TraceID
j.RetryHistories = job.RetryHistories
j.NextRetryAt = job.NextRetryAt
j.CurrentProgress = job.CurrentProgress
j.MaxProgress = job.MaxProgress
j.RetryHistories = job.RetryHistories
if job.Status == string(statusSuccess) {
j.Error = ""
Expand Down
28 changes: 28 additions & 0 deletions codebase/app/task_queue_worker/job_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,31 @@ func RecalculateSummary(ctx context.Context) {
})
}
}

// UpdateProgressJob api for update progress job.
//
// It validates the progress values, verifies the job exists, persists the
// current/max progress for the job identified by jobID, then broadcasts the
// updated job detail to dashboard subscribers.
// Returns errWorkerInactive when the worker engine has not been started.
func UpdateProgressJob(ctx context.Context, jobID string, numProcessed, maxProcess int) error {
	if engine == nil {
		return errWorkerInactive
	}

	// reject impossible progress values early
	if numProcessed < 0 || maxProcess < 0 {
		return errors.New("progress values cannot be negative")
	}
	if numProcessed > maxProcess {
		return errors.New("num processed cannot be greater than max process")
	}

	// ensure the job exists before writing progress
	job, err := engine.opt.persistent.FindJobByID(ctx, jobID, nil)
	if err != nil {
		return err
	}

	if _, _, err = engine.opt.persistent.UpdateJob(ctx, &Filter{
		JobID: &job.ID,
	}, map[string]interface{}{
		"current_progress": numProcessed, "max_progress": maxProcess,
	}); err != nil {
		return err
	}

	engine.subscriber.broadcastJobDetail(ctx)
	return nil
}
41 changes: 24 additions & 17 deletions codebase/app/task_queue_worker/persistent_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,24 @@ type (

// Job model
Job struct {
ID string `bson:"_id" json:"_id"`
TaskName string `bson:"task_name" json:"task_name"`
Arguments string `bson:"arguments" json:"arguments"`
Retries int `bson:"retries" json:"retries"`
MaxRetry int `bson:"max_retry" json:"max_retry"`
Interval string `bson:"interval" json:"interval"`
CreatedAt time.Time `bson:"created_at" json:"created_at"`
UpdatedAt time.Time `bson:"updated_at" json:"updated_at"`
FinishedAt time.Time `bson:"finished_at" json:"finished_at"`
Status string `bson:"status" json:"status"`
Error string `bson:"error" json:"error"`
ErrorStack string `bson:"-" json:"error_stack"`
TraceID string `bson:"trace_id" json:"trace_id"`
RetryHistories []RetryHistory `bson:"retry_histories" json:"retry_histories"`
NextRetryAt string `bson:"-" json:"-"`
direct bool `bson:"-" json:"-"`
ID string `bson:"_id" json:"_id"`
TaskName string `bson:"task_name" json:"task_name"`
Arguments string `bson:"arguments" json:"arguments"`
Retries int `bson:"retries" json:"retries"`
MaxRetry int `bson:"max_retry" json:"max_retry"`
Interval string `bson:"interval" json:"interval"`
CreatedAt time.Time `bson:"created_at" json:"created_at"`
UpdatedAt time.Time `bson:"updated_at" json:"updated_at"`
FinishedAt time.Time `bson:"finished_at" json:"finished_at"`
Status string `bson:"status" json:"status"`
Error string `bson:"error" json:"error"`
ErrorStack string `bson:"-" json:"error_stack"`
TraceID string `bson:"trace_id" json:"trace_id"`
CurrentProgress int `bson:"current_progress" json:"current_progress"`
MaxProgress int `bson:"max_progress" json:"max_progress"`
RetryHistories []RetryHistory `bson:"retry_histories" json:"retry_histories"`
NextRetryAt string `bson:"-" json:"-"`
direct bool `bson:"-" json:"-"`
}

// RetryHistory model
Expand Down Expand Up @@ -119,9 +121,14 @@ func (s *TaskSummary) ToSummaryDetail() (detail SummaryDetail) {

// ToTaskResolver method
func (s *TaskSummary) ToTaskResolver() (res TaskResolver) {
regTask, ok := engine.registeredTaskWorkerIndex[s.TaskName]
if !ok {
return
}

res = TaskResolver{
Name: s.TaskName,
ModuleName: engine.runningWorkerIndexTask[engine.registeredTaskWorkerIndex[s.TaskName]].moduleName,
ModuleName: engine.runningWorkerIndexTask[regTask].moduleName,
TotalJobs: s.CountTotalJob(),
}
res.Detail = s.ToSummaryDetail()
Expand Down
Loading

0 comments on commit 8e717e7

Please sign in to comment.