Skip to content

Commit

Permalink
task queue worker: improve get job detail history
Browse files Browse the repository at this point in the history
- add freeze mode to dashboard
- update dashboard
  • Loading branch information
agungdwiprasetyo committed Jul 29, 2022
1 parent ffb6274 commit 954d212
Show file tree
Hide file tree
Showing 18 changed files with 201 additions and 106 deletions.
5 changes: 3 additions & 2 deletions cmd/candi/cli_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ stageSelectWorkerHandlers:

srvConfig.OutputDir = flagParam.outputFlag
if scope == AddHandler {
srvConfig.parseDefaultHeader()
scopeAddHandler(flagParam, srvConfig, serviceHandlers, workerHandlers)
}
return
Expand All @@ -204,8 +205,8 @@ stageSelectDependencies:
fmt.Printf(RedFormat, "Redis Subscriber need redis, try again")
goto stageSelectDependencies
}
if workerHandlers[TaskqueueHandler] && !(dependencies[RedisDeps] && dependencies[MongodbDeps]) {
fmt.Printf(RedFormat, "Task Queue Worker need redis (for queue) and mongo (for log storage), try again")
if workerHandlers[TaskqueueHandler] && !dependencies[RedisDeps] {
fmt.Printf(RedFormat, "Task Queue Worker need redis (for queue), try again")
goto stageSelectDependencies
}

Expand Down
File renamed without changes.
38 changes: 22 additions & 16 deletions cmd/candi/template_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,19 @@ import (
"context"
"time"
"{{.LibraryName}}/candishared"
"{{$.PackagePrefix}}/internal/modules/{{cleanPathModule .ModuleName}}/domain"
shareddomain "{{$.PackagePrefix}}/pkg/shared/domain"
"{{.LibraryName}}/candishared"
"{{.LibraryName}}/tracer"
)
func (uc *{{camel .ModuleName}}UsecaseImpl) GetAll{{upper (camel .ModuleName)}}(ctx context.Context, filter *domain.Filter{{upper (camel .ModuleName)}}) (results []domain.Response{{upper (camel .ModuleName)}}, meta candishared.Meta, err error) {
trace, ctx := tracer.StartTraceWithContext(ctx, "{{upper (camel .ModuleName)}}Usecase:GetAll{{upper (camel .ModuleName)}}")
defer trace.Finish()
{{if or .SQLDeps .MongoDeps .ArangoDeps}}data, err := uc.repo{{if .SQLDeps}}SQL{{else if .MongoDeps}}Mongo{{else if .ArangoDeps}}Arango{{end}}.{{upper (camel .ModuleName)}}Repo().FetchAll(ctx, filter)
var data []shareddomain.{{upper (camel .ModuleName)}}
{{if or .SQLDeps .MongoDeps .ArangoDeps}}data, err = uc.repo{{if .SQLDeps}}SQL{{else if .MongoDeps}}Mongo{{else if .ArangoDeps}}Arango{{end}}.{{upper (camel .ModuleName)}}Repo().FetchAll(ctx, filter)
if err != nil {
return results, meta, err
}
Expand Down Expand Up @@ -221,15 +224,18 @@ import (
"time"
"{{$.PackagePrefix}}/internal/modules/{{cleanPathModule .ModuleName}}/domain"
shareddomain "{{$.PackagePrefix}}/pkg/shared/domain"
"{{.LibraryName}}/tracer"
)
func (uc *{{camel .ModuleName}}UsecaseImpl) GetDetail{{upper (camel .ModuleName)}}(ctx context.Context, id string) (result domain.Response{{upper (camel .ModuleName)}}, err error) {
trace, ctx := tracer.StartTraceWithContext(ctx, "{{upper (camel .ModuleName)}}Usecase:GetDetail{{upper (camel .ModuleName)}}")
defer trace.Finish()
var data shareddomain.{{upper (camel .ModuleName)}}
{{if or .SQLDeps .MongoDeps .ArangoDeps}}repoFilter := domain.Filter{{upper (camel .ModuleName)}}{ID: id}
data, err := uc.repo{{if .SQLDeps}}SQL{{else if .MongoDeps}}Mongo{{else if .ArangoDeps}}Arango{{end}}.{{upper (camel .ModuleName)}}Repo().Find(ctx, &repoFilter){{end}}
data, err = uc.repo{{if .SQLDeps}}SQL{{else if .MongoDeps}}Mongo{{else if .ArangoDeps}}Arango{{end}}.{{upper (camel .ModuleName)}}Repo().Find(ctx, &repoFilter){{end}}
if err != nil {
return result, err
}
Expand Down Expand Up @@ -280,8 +286,8 @@ func Test_{{camel .ModuleName}}UsecaseImpl_GetDetail{{upper (camel .ModuleName)}
import (
"context"
"{{$.PackagePrefix}}/internal/modules/{{cleanPathModule .ModuleName}}/domain"
shareddomain "{{$.PackagePrefix}}/pkg/shared/domain"
"{{$.PackagePrefix}}/internal/modules/{{cleanPathModule .ModuleName}}/domain"{{if or .SQLDeps .MongoDeps .ArangoDeps}}
shareddomain "{{$.PackagePrefix}}/pkg/shared/domain"{{end}}
"{{.LibraryName}}/tracer"
)
Expand All @@ -290,10 +296,9 @@ func (uc *{{camel .ModuleName}}UsecaseImpl) Create{{upper (camel .ModuleName)}}(
trace, ctx := tracer.StartTraceWithContext(ctx, "{{upper (camel .ModuleName)}}Usecase:Create{{upper (camel .ModuleName)}}")
defer trace.Finish()
data := shareddomain.{{upper (camel .ModuleName)}}{
return{{if or .SQLDeps .MongoDeps .ArangoDeps}} uc.repo{{if .SQLDeps}}SQL{{else if .MongoDeps}}Mongo{{else if .ArangoDeps}}Arango{{end}}.{{upper (camel .ModuleName)}}Repo().Save(ctx, &shareddomain.{{upper (camel .ModuleName)}}{
Field: req.Field,
}
return{{if or .SQLDeps .MongoDeps .ArangoDeps}} uc.repo{{if .SQLDeps}}SQL{{else if .MongoDeps}}Mongo{{else if .ArangoDeps}}Arango{{end}}.{{upper (camel .ModuleName)}}Repo().Save(ctx, &data){{end}}
}){{end}}
}
`

Expand Down Expand Up @@ -409,19 +414,20 @@ func Test_{{camel .ModuleName}}UsecaseImpl_Update{{upper (camel .ModuleName)}}(t
templateUsecaseDelete = `package usecase
import (
"context"
"context"{{if or .SQLDeps .MongoDeps .ArangoDeps}}
shareddomain "{{$.PackagePrefix}}/pkg/shared/domain"{{end}}{{if and .MongoDeps (not .SQLDeps)}}
shareddomain "{{$.PackagePrefix}}/pkg/shared/domain"
"go.mongodb.org/mongo-driver/bson/primitive"{{end}}
{{if and .MongoDeps (not .SQLDeps)}}"go.mongodb.org/mongo-driver/bson/primitive"{{end}}
"{{.LibraryName}}/tracer"
)
func (uc *{{camel .ModuleName}}UsecaseImpl) Delete{{upper (camel .ModuleName)}}(ctx context.Context, id string) (err error) {
trace, ctx := tracer.StartTraceWithContext(ctx, "{{upper (camel .ModuleName)}}Usecase:Delete{{upper (camel .ModuleName)}}")
defer trace.Finish()
{{if and .MongoDeps (not .SQLDeps)}}
objID, _ := primitive.ObjectIDFromHex(id){{end}}
{{if and .MongoDeps (not .SQLDeps)}}objID, _ := primitive.ObjectIDFromHex(id){{end}}
return {{if or .SQLDeps .MongoDeps .ArangoDeps}}uc.repo{{if .SQLDeps}}SQL{{else if .MongoDeps}}Mongo{{else if .ArangoDeps}}Arango{{end}}.{{upper (camel .ModuleName)}}Repo().Delete(ctx, &shareddomain.{{upper (camel .ModuleName)}}{
ID: {{if and .MongoDeps (not .SQLDeps)}}objID{{else}}id{{end}},
}){{end}}
Expand Down Expand Up @@ -471,21 +477,21 @@ import (
mockinterfaces "{{.LibraryName}}/mocks/codebase/interfaces"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestNew{{upper (camel .ModuleName)}}Usecase(t *testing.T) {
{{if not (or .KafkaHandler .RabbitMQHandler)}}/*{{end}}
mockPublisher := &mockinterfaces.Publisher{}
mockBroker := &mockinterfaces.Broker{}
mockBroker.On("GetPublisher").Return(mockPublisher)
{{if not (or .KafkaHandler .RabbitMQHandler)}}*/{{end}}
mockCache := &mockinterfaces.Cache{}
mockRedisPool := &mockinterfaces.RedisPool{}
mockRedisPool.On("Cache").Return(mockCache)
mockDeps := &mockdeps.Dependency{}
mockDeps.On("GetRedisPool").Return(mockRedisPool)
{{if not (or .KafkaHandler .RabbitMQHandler)}}// {{end}}mockDeps.On("GetBroker", mock.Anything).Return(mockBroker)
mockDeps.On("GetBroker", mock.Anything).Return(mockBroker)
uc, setFunc := New{{upper (camel .ModuleName)}}Usecase(mockDeps)
setFunc(nil)
Expand Down
37 changes: 28 additions & 9 deletions codebase/app/task_queue_worker/graphql_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,22 @@ func (r *rootResolver) GetAllJob(ctx context.Context, input struct{ Filter *GetA
return
}

func (r *rootResolver) GetJobDetail(ctx context.Context, input struct{ JobID string }) (res JobResolver, err error) {
func (r *rootResolver) GetDetailJob(ctx context.Context, input struct {
JobID string
Filter *GetAllJobHistoryInputResolver
}) (res JobResolver, err error) {

job, err := persistent.FindJobByID(ctx, input.JobID)
if input.Filter == nil {
input.Filter = &GetAllJobHistoryInputResolver{}
}
filter := input.Filter.ToFilter()
job, err := persistent.FindJobByID(ctx, input.JobID, &filter)
if err != nil {
return res, err
}
res.ParseFromJob(&job)
res.Meta.Page = filter.Page
res.Meta.TotalHistory = filter.Count
return
}

Expand Down Expand Up @@ -322,7 +331,7 @@ func (r *rootResolver) GetAllActiveSubscriber(ctx context.Context) (cs []*Client
mapper[k] = &ClientSubscriber{}
}
mapper[k].ClientID = k
mapper[k].SubscribeList.JobDetailID = v.jobID
mapper[k].SubscribeList.JobDetailID = candihelper.PtrToString(v.filter.JobID)
}

for _, v := range mapper {
Expand Down Expand Up @@ -398,7 +407,7 @@ func (r *rootResolver) ListenAllJob(ctx context.Context, input struct{ Filter *G
filter.Limit = 10
}

if err := registerNewJobListSubscriber(filter.TaskName, clientID, &filter, output); err != nil {
if err := registerNewJobListSubscriber(clientID, &filter, output); err != nil {
return nil, err
}

Expand Down Expand Up @@ -430,7 +439,8 @@ func (r *rootResolver) ListenAllJob(ctx context.Context, input struct{ Filter *G
}

func (r *rootResolver) ListenDetailJob(ctx context.Context, input struct {
JobID string
JobID string
Filter *GetAllJobHistoryInputResolver
}) (<-chan JobResolver, error) {

output := make(chan JobResolver)
Expand All @@ -442,12 +452,17 @@ func (r *rootResolver) ListenDetailJob(ctx context.Context, input struct {
return output, errors.New("Job ID cannot empty")
}

_, err := persistent.FindJobByID(ctx, input.JobID, "retry_histories")
_, err := persistent.FindJobByID(ctx, input.JobID, nil)
if err != nil {
return output, errors.New("Job not found")
}

if err := registerNewJobDetailSubscriber(clientID, input.JobID, output); err != nil {
if input.Filter == nil {
input.Filter = &GetAllJobHistoryInputResolver{}
}
filter := input.Filter.ToFilter()
filter.JobID = &input.JobID
if err := registerNewJobDetailSubscriber(clientID, &filter, output); err != nil {
return nil, err
}

Expand All @@ -463,12 +478,16 @@ func (r *rootResolver) ListenDetailJob(ctx context.Context, input struct {
return

case <-closeAllSubscribers:
output <- JobResolver{IsCloseSession: true}
var js JobResolver
js.Meta.IsCloseSession = true
output <- js
removeJobDetailSubscriber(clientID)
return

case <-autoRemoveClient.C:
output <- JobResolver{IsCloseSession: true}
var js JobResolver
js.Meta.IsCloseSession = true
output <- js
removeJobDetailSubscriber(clientID)
return

Expand Down
20 changes: 17 additions & 3 deletions codebase/app/task_queue_worker/graphql_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const schema = `schema {
type Query {
dashboard(gc: Boolean): DashboardType!
get_job_detail(job_id: String!): JobResolver!
get_detail_job(job_id: String!, filter: GetAllJobHistoryInputResolver): JobResolver!
get_all_active_subscriber(): [ClientSubscriber!]!
get_all_job(filter: GetAllJobInputResolver): JobListResolver!
}
Expand All @@ -32,7 +32,7 @@ type Subscription {
search: String
): TaskListResolver!
listen_all_job(filter: GetAllJobInputResolver): JobListResolver!
listen_detail_job(job_id: String!): JobResolver!
listen_detail_job(job_id: String!, filter: GetAllJobHistoryInputResolver): JobResolver!
}
type DashboardType {
Expand Down Expand Up @@ -65,6 +65,7 @@ type MetaType {
total_records: Int!
is_close_session: Boolean!
is_loading: Boolean!
is_freeze_broadcast: Boolean!
detail: TaskDetailResolver!
}
Expand Down Expand Up @@ -111,7 +112,6 @@ type JobListResolver {
}
type JobResolver {
is_close_session: Boolean!
id: String!
task_name: String!
arguments: String!
Expand All @@ -125,6 +125,13 @@ type JobResolver {
created_at: String!
finished_at: String!
next_retry_at: String!
	meta: JobDetailMetaResolver!
}
type JobDetailMetaResolver {
is_close_session: Boolean!
page: Int!
total_history: Int!
}
type JobRetryHistory {
Expand Down Expand Up @@ -174,6 +181,13 @@ input GetAllJobInputResolver {
end_date: String,
job_id: String
}
input GetAllJobHistoryInputResolver {
page: Int,
limit: Int,
start_date: String,
end_date: String
}
`

// AddJobInputResolver model
Expand Down
56 changes: 44 additions & 12 deletions codebase/app/task_queue_worker/graphql_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ type (

// MetaJobList resolver
MetaJobList struct {
Page int
Limit int
TotalRecords int
TotalPages int
IsCloseSession bool
IsLoading bool
Detail SummaryDetail
Page int
Limit int
TotalRecords int
TotalPages int
IsCloseSession bool
IsLoading bool
IsFreezeBroadcast bool
Detail SummaryDetail
}

// SummaryDetail type
Expand Down Expand Up @@ -95,7 +96,11 @@ type (
TraceID string
RetryHistories []RetryHistory
NextRetryAt string
IsCloseSession bool
Meta struct {
IsCloseSession bool
Page int
TotalHistory int
}
}

// ConfigResolver resolver
Expand All @@ -106,14 +111,23 @@ type (
// GetAllJobInputResolver resolver
GetAllJobInputResolver struct {
TaskName *string
Page *int32
Limit *int32
Page *int
Limit *int
Search *string
JobID *string
Statuses *[]string
StartDate *string
EndDate *string
}

// GetAllJobHistoryInputResolver resolver
GetAllJobHistoryInputResolver struct {
Page *int
Limit *int
StartDate *string
EndDate *string
JobID string
}
)

// ToFilter method
Expand All @@ -126,10 +140,10 @@ func (i *GetAllJobInputResolver) ToFilter() (filter Filter) {
}

if i.Page != nil && *i.Page > 0 {
filter.Page = int(*i.Page)
filter.Page = *i.Page
}
if i.Limit != nil && *i.Limit > 0 {
filter.Limit = int(*i.Limit)
filter.Limit = *i.Limit
}
if i.Statuses != nil {
filter.Statuses = *i.Statuses
Expand All @@ -141,6 +155,24 @@ func (i *GetAllJobInputResolver) ToFilter() (filter Filter) {
return
}

// ToFilter converts the GraphQL job-history filter input into a persistence
// Filter, defaulting to page 1 with a limit of 10 when the caller omits or
// zeroes those fields.
func (i *GetAllJobHistoryInputResolver) ToFilter() (filter Filter) {

	filter = Filter{Page: 1, Limit: 10}

	if page := i.Page; page != nil && *page > 0 {
		filter.Page = *page
	}
	if limit := i.Limit; limit != nil && *limit > 0 {
		filter.Limit = *limit
	}
	// A nil date pointer yields "" from PtrToString; time.Parse then fails and
	// the discarded error leaves the zero time, which downstream treats as
	// "no date bound".
	filter.StartDate, _ = time.Parse(time.RFC3339, candihelper.PtrToString(i.StartDate))
	filter.EndDate, _ = time.Parse(time.RFC3339, candihelper.PtrToString(i.EndDate))
	return filter
}

func (j *JobResolver) ParseFromJob(job *Job) {
j.ID = job.ID
j.TaskName = job.TaskName
Expand Down
3 changes: 1 addition & 2 deletions codebase/app/task_queue_worker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func (job *Job) toMap() map[string]interface{} {
"retries": job.Retries,
"max_retry": job.MaxRetry,
"interval": job.Interval,
"created_at": job.CreatedAt.Format(time.RFC3339),
"finished_at": job.FinishedAt.Format(time.RFC3339),
"finished_at": job.FinishedAt,
"status": job.Status,
"error": job.Error,
"trace_id": job.TraceID,
Expand Down
Loading

0 comments on commit 954d212

Please sign in to comment.