diff --git a/candihelper/helper.go b/candihelper/helper.go index 020f9177..6c54d716 100644 --- a/candihelper/helper.go +++ b/candihelper/helper.go @@ -544,3 +544,92 @@ func GetRuntimeStackLine() string { return fmt.Sprintf("%s:%d", file, line) } + +// ToString helper +func ToString(val interface{}) (str string) { + switch s := val.(type) { + case string: + return s + case bool: + return strconv.FormatBool(s) + case float64: + return strconv.FormatFloat(s, 'f', -1, 64) + case float32: + return strconv.FormatFloat(float64(s), 'f', -1, 32) + case int: + return strconv.Itoa(s) + case int64: + return strconv.FormatInt(s, 10) + case int32: + return strconv.Itoa(int(s)) + case int16: + return strconv.FormatInt(int64(s), 10) + case int8: + return strconv.FormatInt(int64(s), 10) + case uint: + return strconv.FormatUint(uint64(s), 10) + case uint64: + return strconv.FormatUint(s, 10) + case uint32: + return strconv.FormatUint(uint64(s), 10) + case uint16: + return strconv.FormatUint(uint64(s), 10) + case uint8: + return strconv.FormatUint(uint64(s), 10) + case []byte: + return string(s) + case nil: + return "" + case fmt.Stringer: + return s.String() + case error: + return s.Error() + default: + return "" + } +} + +// ToInt helper +func ToInt(val interface{}) (i int) { + switch s := val.(type) { + case int: + return s + case int64: + return int(s) + case int32: + return int(s) + case int16: + return int(s) + case int8: + return int(s) + case uint: + return int(s) + case uint64: + return int(s) + case uint32: + return int(s) + case uint16: + return int(s) + case uint8: + return int(s) + case float64: + return int(s) + case float32: + return int(s) + case string: + v, err := strconv.ParseInt(s, 0, 0) + if err == nil { + return int(v) + } + return 0 + case bool: + if s { + return 1 + } + return 0 + case nil: + return 0 + default: + return 0 + } +} diff --git a/cmd/candi/template_configs.go b/cmd/candi/template_configs.go index 510ab85b..85fee52a 100644 ---
a/cmd/candi/template_configs.go +++ b/cmd/candi/template_configs.go @@ -106,9 +106,13 @@ func SetEnv(env Environment) { package configs import ( + taskqueueworker "{{.LibraryName}}/codebase/app/task_queue_worker" "{{.LibraryName}}/codebase/factory" "{{.LibraryName}}/codebase/factory/appfactory" - "{{.LibraryName}}/config/env" + "{{.LibraryName}}/config/env"{{if not .MongoDeps}} + + "database/sql" + _ "github.com/mattn/go-sqlite3"{{end}} ) /* @@ -138,7 +142,18 @@ func InitAppFromEnvironmentConfig(service factory.ServiceFactory) (apps []factor apps = append(apps, appfactory.SetupCronWorker(service)) } if env.BaseEnv().UseTaskQueueWorker { - apps = append(apps, appfactory.SetupTaskQueueWorker(service)) + {{if .MongoDeps}}persistent := taskqueueworker.NewMongoPersistent(service. + GetDependency(). + GetMongoDatabase(). + WriteDB(), + ){{else}}db, err := sql.Open("sqlite3", "./candi_task_queue_worker.db") + if err != nil { + panic(err) + } + persistent := taskqueueworker.NewSQLPersistent(db){{end}} + apps = append(apps, appfactory.SetupTaskQueueWorker(service, + taskqueueworker.SetPersistent(persistent), + )) } if env.BaseEnv().UseRedisSubscriber { apps = append(apps, appfactory.SetupRedisWorker(service)) diff --git a/cmd/candi/template_etc.go b/cmd/candi/template_etc.go index 8aefece9..51d4440e 100644 --- a/cmd/candi/template_etc.go +++ b/cmd/candi/template_etc.go @@ -2,13 +2,13 @@ package main const ( dockerfileTemplate = `# Stage 1 -FROM golang:1.16.4-alpine3.13 AS dependency_builder +FROM golang:1.18.4-alpine3.16 AS dependency_builder WORKDIR /go/src ENV GO111MODULE=on RUN apk update -RUN apk add --no-cache bash ca-certificates git make +RUN apk add --no-cache bash ca-certificates git gcc musl-dev COPY go.mod . COPY go.sum . @@ -21,7 +21,7 @@ FROM dependency_builder AS service_builder WORKDIR /usr/app COPY . . 
-RUN CGO_ENABLED=0 GOOS=linux go build -o bin +RUN go build -o bin # Stage 3 FROM alpine:latest @@ -85,7 +85,10 @@ test: go {{.GoVersion}} -require {{.LibraryName}} {{.Version}} +require ( + {{.LibraryName}} {{.Version}} + golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect +) ` gitignoreTemplate = `bin @@ -345,13 +348,13 @@ $ make migration service={{service_name}} down "```\n" dockerfileMonorepoTemplate = `# Stage 1 -FROM golang:1.16.4-alpine3.13 AS dependency_builder +FROM golang:1.18.4-alpine3.16 AS dependency_builder WORKDIR /go/src ENV GO111MODULE=on RUN apk update -RUN apk add --no-cache bash ca-certificates git +RUN apk add --no-cache bash ca-certificates git make gcc musl-dev COPY go.mod . COPY go.sum . @@ -368,7 +371,7 @@ COPY sdk sdk COPY services/$SERVICE_NAME services/$SERVICE_NAME COPY go.mod . COPY go.sum . -RUN CGO_ENABLED=0 GOOS=linux go build -o bin services/$SERVICE_NAME/*.go +RUN go build -o bin services/$SERVICE_NAME/*.go # Stage 3 FROM alpine:latest diff --git a/codebase/app/task_queue_worker/graphql_resolver.go b/codebase/app/task_queue_worker/graphql_resolver.go index 07330741..beebf1e1 100644 --- a/codebase/app/task_queue_worker/graphql_resolver.go +++ b/codebase/app/task_queue_worker/graphql_resolver.go @@ -110,7 +110,7 @@ func (r *rootResolver) DeleteJob(ctx context.Context, input struct{ JobID string if err != nil { return "", err } - persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{ + persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{ job.Status: -1, }) broadcastAllToSubscribers(r.worker.ctx) @@ -164,7 +164,7 @@ func (r *rootResolver) StopAllJob(ctx context.Context, input struct { stopAllJobInTask(input.TaskName) queue.Clear(ctx, input.TaskName) - incrQuery := map[string]int{} + incrQuery := map[string]int64{} affectedStatus := []string{string(statusQueueing), string(statusRetrying)} for _, status := range affectedStatus { countMatchedFilter, countAffected, err := 
persistent.UpdateJob(ctx, @@ -178,12 +178,12 @@ func (r *rootResolver) StopAllJob(ctx context.Context, input struct { if err != nil { continue } - incrQuery[strings.ToLower(status)] -= int(countMatchedFilter) - incrQuery[strings.ToLower(string(statusStopped))] += int(countAffected) + incrQuery[strings.ToLower(status)] -= countMatchedFilter + incrQuery[strings.ToLower(string(statusStopped))] += countAffected } broadcastWhenChangeAllJob(ctx, input.TaskName, false) - persistent.Summary().IncrementSummary(ctx, input.TaskName, convertIncrementMap(incrQuery)) + persistent.Summary().IncrementSummary(ctx, input.TaskName, incrQuery) broadcastAllToSubscribers(r.worker.ctx) }(r.worker.ctx) @@ -214,7 +214,7 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct { } }) - incr := map[string]int{} + incr := map[string]int64{} for _, status := range filter.Statuses { countMatchedFilter, countAffected, err := persistent.UpdateJob(ctx, &Filter{ @@ -228,12 +228,12 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct { if err != nil { continue } - incr[strings.ToLower(status)] -= int(countMatchedFilter) - incr[strings.ToLower(string(statusQueueing))] += int(countAffected) + incr[strings.ToLower(status)] -= countMatchedFilter + incr[strings.ToLower(string(statusQueueing))] += countAffected } broadcastWhenChangeAllJob(ctx, input.TaskName, false) - persistent.Summary().IncrementSummary(ctx, input.TaskName, convertIncrementMap(incr)) + persistent.Summary().IncrementSummary(ctx, input.TaskName, incr) broadcastAllToSubscribers(r.worker.ctx) refreshWorkerNotif <- struct{}{} @@ -250,7 +250,7 @@ func (r *rootResolver) CleanJob(ctx context.Context, input struct { broadcastWhenChangeAllJob(ctx, input.TaskName, true) - incrQuery := map[string]int{} + incrQuery := map[string]int64{} affectedStatus := []string{string(statusSuccess), string(statusFailure), string(statusStopped)} for _, status := range affectedStatus { countAffected := persistent.CleanJob(ctx, @@ 
-258,11 +258,11 @@ func (r *rootResolver) CleanJob(ctx context.Context, input struct { TaskName: input.TaskName, Status: &status, }, ) - incrQuery[strings.ToLower(status)] -= int(countAffected) + incrQuery[strings.ToLower(status)] -= countAffected } broadcastWhenChangeAllJob(ctx, input.TaskName, false) - persistent.Summary().IncrementSummary(ctx, input.TaskName, convertIncrementMap(incrQuery)) + persistent.Summary().IncrementSummary(ctx, input.TaskName, incrQuery) broadcastAllToSubscribers(r.worker.ctx) }(r.worker.ctx) diff --git a/codebase/app/task_queue_worker/job.go b/codebase/app/task_queue_worker/job.go index 4686ccc2..fb80793b 100644 --- a/codebase/app/task_queue_worker/job.go +++ b/codebase/app/task_queue_worker/job.go @@ -43,11 +43,10 @@ func (job *Job) toMap() map[string]interface{} { "retries": job.Retries, "max_retry": job.MaxRetry, "interval": job.Interval, - "created_at": job.CreatedAt, - "finished_at": job.FinishedAt, + "created_at": job.CreatedAt.Format(time.RFC3339), + "finished_at": job.FinishedAt.Format(time.RFC3339), "status": job.Status, "error": job.Error, - "error_stack": job.ErrorStack, "trace_id": job.TraceID, } } diff --git a/codebase/app/task_queue_worker/job_operation.go b/codebase/app/task_queue_worker/job_operation.go index 7a48fcd0..abb32a9f 100644 --- a/codebase/app/task_queue_worker/job_operation.go +++ b/codebase/app/task_queue_worker/job_operation.go @@ -13,7 +13,6 @@ import ( "github.com/golangid/candi/candihelper" "github.com/golangid/candi/candiutils" "github.com/golangid/candi/tracer" - "github.com/google/uuid" ) type ( @@ -70,7 +69,6 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) { } var newJob Job - newJob.ID = uuid.New().String() newJob.TaskName = req.TaskName newJob.Arguments = string(req.Args) newJob.MaxRetry = req.MaxRetry @@ -89,7 +87,7 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) { defer func() { <-semaphoreAddJob }() 
persistent.SaveJob(ctx, job) - persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{ + persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{ strings.ToLower(job.Status): 1, }) broadcastAllToSubscribers(ctx) @@ -184,7 +182,7 @@ func RetryJob(ctx context.Context, jobID string) error { matched, affected, _ := persistent.UpdateJob(ctx, &Filter{JobID: &job.ID}, map[string]interface{}{ "status": job.Status, "interval": job.Interval, "retries": job.Retries, }) - persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{ + persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{ statusBefore: -matched, job.Status: affected, }) @@ -223,7 +221,7 @@ func StopJob(ctx context.Context, jobID string) error { if err != nil { return err } - persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{ + persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{ job.Status: countAffected, statusBefore: -matchedCount, }) diff --git a/codebase/app/task_queue_worker/persistent.go b/codebase/app/task_queue_worker/persistent.go index 12904fea..9796a758 100644 --- a/codebase/app/task_queue_worker/persistent.go +++ b/codebase/app/task_queue_worker/persistent.go @@ -4,6 +4,11 @@ import ( "context" ) +const ( + jobModelName = "task_queue_worker_jobs" + jobSummaryModelName = "task_queue_worker_job_summaries" +) + type ( // Persistent abstraction Persistent interface { diff --git a/codebase/app/task_queue_worker/persistent_mongo.go b/codebase/app/task_queue_worker/persistent_mongo.go index 7a6ee92c..c15f2553 100644 --- a/codebase/app/task_queue_worker/persistent_mongo.go +++ b/codebase/app/task_queue_worker/persistent_mongo.go @@ -8,19 +8,14 @@ import ( "github.com/golangid/candi/candihelper" "github.com/golangid/candi/logger" "github.com/golangid/candi/tracer" + "github.com/google/uuid" "go.mongodb.org/mongo-driver/bson" - 
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" ) -const ( - mongoJobCollections = "task_queue_worker_jobs" - mongoJobSummaryCollections = "task_queue_worker_job_summaries" -) - -type mongoPersistent struct { +type MongoPersistent struct { db *mongo.Database ctx context.Context @@ -28,7 +23,7 @@ type mongoPersistent struct { } // NewMongoPersistent create mongodb persistent -func NewMongoPersistent(db *mongo.Database) Persistent { +func NewMongoPersistent(db *mongo.Database) *MongoPersistent { ctx := context.Background() uniqueOpts := &options.IndexOptions{ @@ -36,7 +31,7 @@ func NewMongoPersistent(db *mongo.Database) Persistent { } // check and create index in collection task_queue_worker_job_summaries - indexViewJobSummaryColl := db.Collection(mongoJobSummaryCollections).Indexes() + indexViewJobSummaryColl := db.Collection(jobSummaryModelName).Indexes() currentIndexSummaryNames := make(map[string]struct{}) curJobSummary, err := indexViewJobSummaryColl.List(ctx) if err == nil { @@ -66,7 +61,7 @@ func NewMongoPersistent(db *mongo.Database) Persistent { } // check and create index in collection task_queue_worker_jobs - indexViewJobColl := db.Collection(mongoJobCollections).Indexes() + indexViewJobColl := db.Collection(jobModelName).Indexes() currentIndexNames := make(map[string]struct{}) curJobColl, err := indexViewJobColl.List(ctx) if err == nil { @@ -137,7 +132,7 @@ func NewMongoPersistent(db *mongo.Database) Persistent { } } - mp := &mongoPersistent{ + mp := &MongoPersistent{ db: db, ctx: ctx, } @@ -146,14 +141,14 @@ func NewMongoPersistent(db *mongo.Database) Persistent { return mp } -func (s *mongoPersistent) SetSummary(summary Summary) { +func (s *MongoPersistent) SetSummary(summary Summary) { s.summary = summary } -func (s *mongoPersistent) Summary() Summary { +func (s *MongoPersistent) Summary() Summary { return s.summary } -func (s 
*mongoPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs []Job) { +func (s *MongoPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs []Job) { findOptions := &options.FindOptions{} if !filter.ShowAll { @@ -174,7 +169,7 @@ func (s *mongoPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs findOptions.SetProjection(bson.M{"retry_histories": 0}) query := s.toBsonFilter(filter) - cur, err := s.db.Collection(mongoJobCollections).Find(ctx, query, findOptions) + cur, err := s.db.Collection(jobModelName).Find(ctx, query, findOptions) if err != nil { logger.LogE(err.Error()) return @@ -193,13 +188,13 @@ func (s *mongoPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs return } -func (s *mongoPersistent) CountAllJob(ctx context.Context, filter *Filter) int { +func (s *MongoPersistent) CountAllJob(ctx context.Context, filter *Filter) int { queryFilter := s.toBsonFilter(filter) - count, _ := s.db.Collection(mongoJobCollections).CountDocuments(ctx, queryFilter) + count, _ := s.db.Collection(jobModelName).CountDocuments(ctx, queryFilter) return int(count) } -func (s *mongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filter) (results []TaskSummary) { +func (s *MongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filter) (results []TaskSummary) { pipeQuery := []bson.M{ { @@ -239,7 +234,7 @@ func (s *mongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filte findOptions := options.Aggregate() findOptions.SetAllowDiskUse(true) - csr, err := s.db.Collection(mongoJobCollections).Aggregate(ctx, pipeQuery, findOptions) + csr, err := s.db.Collection(jobModelName).Aggregate(ctx, pipeQuery, findOptions) if err != nil { logger.LogE(err.Error()) return @@ -249,16 +244,16 @@ func (s *mongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filte csr.All(ctx, &results) return } -func (s *mongoPersistent) SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory) { +func (s 
*MongoPersistent) SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory) { tracer.Log(ctx, "persistent.mongo:save_job", job.ID) var err error if job.ID == "" { - job.ID = primitive.NewObjectID().Hex() + job.ID = uuid.New().String() if len(job.RetryHistories) == 0 { job.RetryHistories = make([]RetryHistory, 0) } - _, err = s.db.Collection(mongoJobCollections).InsertOne(ctx, job) + _, err = s.db.Collection(jobModelName).InsertOne(ctx, job) } else { updateQuery := bson.M{ @@ -275,7 +270,7 @@ func (s *mongoPersistent) SaveJob(ctx context.Context, job *Job, retryHistories opt := options.UpdateOptions{ Upsert: candihelper.ToBoolPtr(true), } - _, err = s.db.Collection(mongoJobCollections).UpdateOne(ctx, + _, err = s.db.Collection(jobModelName).UpdateOne(ctx, bson.M{ "_id": job.ID, }, @@ -288,7 +283,7 @@ func (s *mongoPersistent) SaveJob(ctx context.Context, job *Job, retryHistories } } -func (s *mongoPersistent) UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error) { +func (s *MongoPersistent) UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error) { updateQuery := bson.M{ "$set": bson.M(updated), @@ -302,7 +297,7 @@ func (s *mongoPersistent) UpdateJob(ctx context.Context, filter *Filter, updated } queryFilter := s.toBsonFilter(filter) - res, err := s.db.Collection(mongoJobCollections).UpdateMany(ctx, + res, err := s.db.Collection(jobModelName).UpdateMany(ctx, queryFilter, updateQuery, ) @@ -315,7 +310,7 @@ func (s *mongoPersistent) UpdateJob(ctx context.Context, filter *Filter, updated return res.MatchedCount, res.ModifiedCount, nil } -func (s *mongoPersistent) FindJobByID(ctx context.Context, id string, excludeFields ...string) (job Job, err error) { +func (s *MongoPersistent) FindJobByID(ctx context.Context, id string, excludeFields ...string) (job Job, 
err error) { tracer.Log(ctx, "persistent.mongo:find_job", id) var opts []*options.FindOneOptions @@ -330,7 +325,7 @@ func (s *mongoPersistent) FindJobByID(ctx context.Context, id string, excludeFie }) } - err = s.db.Collection(mongoJobCollections).FindOne(ctx, bson.M{"_id": id}, opts...).Decode(&job) + err = s.db.Collection(jobModelName).FindOne(ctx, bson.M{"_id": id}, opts...).Decode(&job) if len(job.RetryHistories) == 0 { job.RetryHistories = make([]RetryHistory, 0) @@ -340,9 +335,9 @@ func (s *mongoPersistent) FindJobByID(ctx context.Context, id string, excludeFie return } -func (s *mongoPersistent) CleanJob(ctx context.Context, filter *Filter) (affectedRow int64) { +func (s *MongoPersistent) CleanJob(ctx context.Context, filter *Filter) (affectedRow int64) { - res, err := s.db.Collection(mongoJobCollections).DeleteMany(ctx, s.toBsonFilter(filter)) + res, err := s.db.Collection(jobModelName).DeleteMany(ctx, s.toBsonFilter(filter)) if err != nil { logger.LogE(err.Error()) return affectedRow @@ -351,8 +346,8 @@ func (s *mongoPersistent) CleanJob(ctx context.Context, filter *Filter) (affecte return res.DeletedCount } -func (s *mongoPersistent) DeleteJob(ctx context.Context, id string) (job Job, err error) { - res := s.db.Collection(mongoJobCollections).FindOneAndDelete(ctx, bson.M{"_id": id}) +func (s *MongoPersistent) DeleteJob(ctx context.Context, id string) (job Job, err error) { + res := s.db.Collection(jobModelName).FindOneAndDelete(ctx, bson.M{"_id": id}) if res.Err() != nil { return job, res.Err() } @@ -360,7 +355,7 @@ func (s *mongoPersistent) DeleteJob(ctx context.Context, id string) (job Job, er return } -func (s *mongoPersistent) toBsonFilter(f *Filter) bson.M { +func (s *MongoPersistent) toBsonFilter(f *Filter) bson.M { pipeQuery := []bson.M{} if f.TaskName != "" { @@ -416,7 +411,7 @@ func (s *mongoPersistent) toBsonFilter(f *Filter) bson.M { // summary -func (s *mongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary) 
{ +func (s *MongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary) { query := bson.M{} @@ -428,7 +423,7 @@ func (s *mongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (r } } - cur, err := s.db.Collection(mongoJobSummaryCollections).Find(s.ctx, query) + cur, err := s.db.Collection(jobSummaryModelName).Find(s.ctx, query) if err != nil { logger.LogE(err.Error()) return @@ -454,12 +449,12 @@ func (s *mongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (r return } -func (s *mongoPersistent) FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary) { - s.db.Collection(mongoJobSummaryCollections).FindOne(ctx, bson.M{"task_name": taskName}).Decode(&result) +func (s *MongoPersistent) FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary) { + s.db.Collection(jobSummaryModelName).FindOne(ctx, bson.M{"task_name": taskName}).Decode(&result) return } -func (s *mongoPersistent) IncrementSummary(ctx context.Context, taskName string, incr map[string]interface{}) { +func (s *MongoPersistent) IncrementSummary(ctx context.Context, taskName string, incr map[string]int64) { opt := options.UpdateOptions{ Upsert: candihelper.ToBoolPtr(true), @@ -469,9 +464,9 @@ func (s *mongoPersistent) IncrementSummary(ctx context.Context, taskName string, if k == "" { continue } incr[strings.ToLower(k)] = v } - _, err := s.db.Collection(mongoJobSummaryCollections).UpdateOne(s.ctx, + _, err := s.db.Collection(jobSummaryModelName).UpdateOne(s.ctx, bson.M{ "task_name": taskName, }, @@ -486,7 +482,7 @@ func (s *mongoPersistent) IncrementSummary(ctx context.Context, taskName string, } } -func (s *mongoPersistent) UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{}) { +func (s *MongoPersistent) UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{}) { opt := options.UpdateOptions{ Upsert: candihelper.ToBoolPtr(true), @@
-498,7 +494,7 @@ func (s *mongoPersistent) UpdateSummary(ctx context.Context, taskName string, up } updated[strings.ToLower(k)] = v } - _, err := s.db.Collection(mongoJobSummaryCollections).UpdateOne(s.ctx, + _, err := s.db.Collection(jobSummaryModelName).UpdateOne(s.ctx, bson.M{ "task_name": taskName, }, @@ -513,7 +509,7 @@ func (s *mongoPersistent) UpdateSummary(ctx context.Context, taskName string, up } } -func (s *mongoPersistent) Ping(ctx context.Context) error { +func (s *MongoPersistent) Ping(ctx context.Context) error { if err := s.db.Client().Ping(ctx, readpref.Primary()); err != nil { return fmt.Errorf("mongodb ping: %v", err) diff --git a/codebase/app/task_queue_worker/persistent_sql.go b/codebase/app/task_queue_worker/persistent_sql.go new file mode 100644 index 00000000..33b6e504 --- /dev/null +++ b/codebase/app/task_queue_worker/persistent_sql.go @@ -0,0 +1,508 @@ +package taskqueueworker + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + "time" + + "github.com/golangid/candi/candihelper" + "github.com/golangid/candi/logger" + "github.com/google/uuid" +) + +type ( + SQLPersistent struct { + db *sql.DB + summary Summary + queryReplacer *strings.Replacer + } +) + +// NewSQLPersistent init new persistent SQL +func NewSQLPersistent(db *sql.DB) *SQLPersistent { + + // init jobs table + _, err := db.Exec(`CREATE TABLE IF NOT EXISTS ` + jobModelName + ` ( + id VARCHAR(255) PRIMARY KEY NOT NULL DEFAULT '', + task_name VARCHAR(255) NOT NULL DEFAULT '', + arguments TEXT NOT NULL DEFAULT '', + retries INTEGER NOT NULL DEFAULT 0, + max_retry INTEGER NOT NULL DEFAULT 0, + interval VARCHAR(255) NOT NULL DEFAULT '', + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + finished_at TIMESTAMP NULL, + status VARCHAR(255) NOT NULL DEFAULT '', + error VARCHAR(255) NOT NULL DEFAULT '', + trace_id VARCHAR(255) NOT NULL DEFAULT '' + );`) + if err != nil { + panic(err) + } + + 
// init job_summaries table + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS ` + jobSummaryModelName + ` ( + id VARCHAR(255) PRIMARY KEY NOT NULL DEFAULT '', + success INTEGER NOT NULL DEFAULT 0, + queueing INTEGER NOT NULL DEFAULT 0, + retrying INTEGER NOT NULL DEFAULT 0, + failure INTEGER NOT NULL DEFAULT 0, + stopped INTEGER NOT NULL DEFAULT 0, + is_loading BOOLEAN DEFAULT false + );`) + if err != nil { + panic(err) + } + + // init job_histories table + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS task_queue_worker_job_histories ( + id INTEGER PRIMARY KEY NOT NULL, + job_id VARCHAR(255), + error_stack VARCHAR(255), + status VARCHAR(255), + error VARCHAR(255), + trace_id VARCHAR(255), + start_at VARCHAR(255), + end_at VARCHAR(255) + );`) + if err != nil { + panic(err) + } + + indexList := map[string]struct { + tableName, field string + }{ + "idx_task_name": {jobModelName, "task_name"}, + "idx_status": {jobModelName, "status"}, + "idx_created_at": {jobModelName, "created_at"}, + "idx_args_err": {jobModelName, "arguments, error"}, + "idx_task_name_status": {jobModelName, "task_name, status"}, + "idx_task_name_status_created_at": {jobModelName, "task_name, status, created_at"}, + "idx_task_name_summary": {jobSummaryModelName, "id"}, + "idx_job_id_history": {"task_queue_worker_job_histories", "job_id"}, + "idx_start_at_history": {"task_queue_worker_job_histories", "start_at"}, + } + for indexName, field := range indexList { + _, err := db.Exec(`CREATE INDEX IF NOT EXISTS ` + indexName + ` ON ` + field.tableName + ` (` + field.field + `)`) + if err != nil { + panic(err) + } + } + + sqlLitePersistent := &SQLPersistent{ + db: db, + queryReplacer: strings.NewReplacer("'", "''"), + } + sqlLitePersistent.summary = sqlLitePersistent + return sqlLitePersistent +} + +func (s *SQLPersistent) Ping(ctx context.Context) error { + return s.db.Ping() +} +func (s *SQLPersistent) SetSummary(summary Summary) { + s.summary = summary +} +func (s *SQLPersistent) Summary() Summary { + 
return s.summary +} +func (s *SQLPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs []Job) { + where, _ := s.toQueryFilter(filter) + if filter.Sort == "" { + filter.Sort = "-created_at" + } + sort := "ASC" + if strings.HasPrefix(filter.Sort, "-") { + sort = "DESC" + } + filter.Sort = strings.TrimPrefix(filter.Sort, "-") + query := `SELECT id, task_name, arguments, retries, max_retry, interval, created_at, finished_at, status, error, trace_id + FROM ` + jobModelName + ` ` + where + ` ORDER BY ` + filter.Sort + ` ` + sort + if !filter.ShowAll { + query += fmt.Sprintf(` LIMIT %d OFFSET %d `, filter.Limit, (filter.Page-1)*filter.Limit) + } + rows, err := s.db.Query(query) + if err != nil { + logger.LogE(err.Error()) + return jobs + } + defer rows.Close() + + for rows.Next() { + var job Job + var finishedAt sql.NullTime + if err := rows.Scan( + &job.ID, &job.TaskName, &job.Arguments, &job.Retries, &job.MaxRetry, &job.Interval, &job.CreatedAt, + &finishedAt, &job.Status, &job.Error, &job.TraceID, + ); err != nil { + return + } + job.FinishedAt = finishedAt.Time + jobs = append(jobs, job) + } + + return +} +func (s *SQLPersistent) FindJobByID(ctx context.Context, id string, excludeFields ...string) (job Job, err error) { + var finishedAt sql.NullTime + err = s.db.QueryRow(`SELECT id, task_name, arguments, retries, max_retry, interval, created_at, finished_at, status, error, trace_id + FROM `+jobModelName+` WHERE id='`+s.queryReplacer.Replace(id)+`'`).Scan( + &job.ID, &job.TaskName, &job.Arguments, &job.Retries, &job.MaxRetry, &job.Interval, &job.CreatedAt, + &finishedAt, &job.Status, &job.Error, &job.TraceID, + ) + job.FinishedAt = finishedAt.Time + if err != nil { + logger.LogE(err.Error()) + } + + if len(excludeFields) == 0 { + rows, err := s.db.Query(`SELECT error_stack, status, error, trace_id, start_at, end_at FROM task_queue_worker_job_histories + WHERE job_id = '` + s.queryReplacer.Replace(id) + `' ORDER BY start_at DESC`) + if err != nil { + 
logger.LogE(err.Error()) + return job, err + } + defer rows.Close() + + for rows.Next() { + var rh RetryHistory + var startAt, endAt string + rows.Scan(&rh.ErrorStack, &rh.Status, &rh.Error, &rh.TraceID, &startAt, &endAt) + rh.StartAt, _ = time.Parse(time.RFC3339Nano, startAt) + rh.EndAt, _ = time.Parse(time.RFC3339Nano, endAt) + job.RetryHistories = append(job.RetryHistories, rh) + } + } + + return +} +func (s *SQLPersistent) CountAllJob(ctx context.Context, filter *Filter) (count int) { + where, _ := s.toQueryFilter(filter) + s.db.QueryRow(`SELECT COUNT(*) FROM ` + jobModelName + ` ` + where).Scan(&count) + return +} +func (s *SQLPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filter) (result []TaskSummary) { + where, _ := s.toQueryFilter(filter) + query := `SELECT COUNT(status), status, task_name FROM ` + jobModelName + ` ` + where + ` GROUP BY status, task_name` + rows, err := s.db.Query(query) + if err != nil { + logger.LogE(err.Error()) + return + } + defer rows.Close() + + mapSummary := make(map[string]TaskSummary) + for rows.Next() { + var count int + var status, taskName string + rows.Scan(&count, &status, &taskName) + summary := mapSummary[taskName] + switch status { + case string(statusSuccess): + summary.Success += count + case string(statusQueueing): + summary.Queueing += count + case string(statusRetrying): + summary.Retrying += count + case string(statusFailure): + summary.Failure += count + case string(statusStopped): + summary.Stopped += count + } + mapSummary[taskName] = summary + } + + for taskName, summary := range mapSummary { + summary.TaskName = taskName + summary.ID = taskName + result = append(result, summary) + } + + return +} +func (s *SQLPersistent) SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory) { + var query string + if job.ID == "" { + job.ID = uuid.NewString() + job.CreatedAt = time.Now() + query = `INSERT INTO ` + jobModelName + ` (id, task_name, arguments, retries, max_retry, interval, 
created_at, + updated_at, finished_at, status, error, trace_id) VALUES ( + '` + s.queryReplacer.Replace(job.ID) + `', + '` + s.queryReplacer.Replace(job.TaskName) + `', + '` + s.queryReplacer.Replace(job.Arguments) + `', + '` + candihelper.ToString(job.Retries) + `', + '` + candihelper.ToString(job.MaxRetry) + `', + '` + s.queryReplacer.Replace(job.Interval) + `', + '` + job.CreatedAt.Format(time.RFC3339) + `', + '` + time.Now().Format(time.RFC3339) + `', + '` + job.FinishedAt.Format(time.RFC3339) + `', + '` + s.queryReplacer.Replace(job.Status) + `', + '` + s.queryReplacer.Replace(job.Error) + `', + '` + s.queryReplacer.Replace(job.TraceID) + `' + )` + } else { + query = `UPDATE ` + jobModelName + ` SET + task_name='` + s.queryReplacer.Replace(job.TaskName) + `', + arguments='` + s.queryReplacer.Replace(job.Arguments) + `', + retries='` + candihelper.ToString(job.Retries) + `', + max_retry='` + candihelper.ToString(job.MaxRetry) + `', + interval='` + s.queryReplacer.Replace(job.Interval) + `', + updated_at='` + time.Now().Format(time.RFC3339) + `', + finished_at='` + job.FinishedAt.Format(time.RFC3339) + `', + status='` + s.queryReplacer.Replace(job.Status) + `', + error='` + s.queryReplacer.Replace(job.Error) + `', + trace_id='` + s.queryReplacer.Replace(job.TraceID) + `' + WHERE id = '` + s.queryReplacer.Replace(job.ID) + `'` + } + + _, err := s.db.Exec(query) + if err != nil { + logger.LogE(err.Error()) + } + + for _, rh := range retryHistories { + _, err = s.db.Exec(`INSERT INTO task_queue_worker_job_histories (job_id, error_stack, status, error, trace_id, start_at, end_at) + VALUES ( + '` + s.queryReplacer.Replace(job.ID) + `', + '` + s.queryReplacer.Replace(rh.ErrorStack) + `', + '` + s.queryReplacer.Replace(rh.Status) + `', + '` + s.queryReplacer.Replace(rh.Error) + `', + '` + s.queryReplacer.Replace(rh.TraceID) + `', + '` + rh.StartAt.Format(time.RFC3339Nano) + `', + '` + rh.EndAt.Format(time.RFC3339Nano) + `' + )`) + if err != nil { + 
logger.LogE(err.Error()) + } + } +} +func (s *SQLPersistent) UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error) { + where, err := s.toQueryFilter(filter) + if err != nil { + logger.LogE(err.Error()) + return matchedCount, affectedRow, err + } + tx, err := s.db.Begin() + if err != nil { + logger.LogE(err.Error()) + return matchedCount, affectedRow, err + } + defer func() { + if err != nil { + tx.Rollback() + } else { + tx.Commit() + } + }() + + tx.QueryRow(`SELECT COUNT(*) FROM ` + jobModelName + ` ` + where).Scan(&matchedCount) + var setFields []string + for field, value := range updated { + setFields = append(setFields, field+"='"+s.queryReplacer.Replace(candihelper.ToString(value))+"'") + } + res, err := tx.Exec(`UPDATE ` + jobModelName + ` SET ` + strings.Join(setFields, ",") + ` ` + where) + if err != nil { + logger.LogE(err.Error()) + return matchedCount, affectedRow, err + } + affectedRow, _ = res.RowsAffected() + + if filter.JobID != nil { + for _, rh := range retryHistories { + _, err = tx.Exec(`INSERT INTO task_queue_worker_job_histories (job_id, error_stack, status, error, trace_id, start_at, end_at) + VALUES ( + '` + s.queryReplacer.Replace(*filter.JobID) + `', + '` + s.queryReplacer.Replace(rh.ErrorStack) + `', + '` + s.queryReplacer.Replace(rh.Status) + `', + '` + s.queryReplacer.Replace(rh.Error) + `', + '` + s.queryReplacer.Replace(rh.TraceID) + `', + '` + rh.StartAt.Format(time.RFC3339Nano) + `', + '` + rh.EndAt.Format(time.RFC3339Nano) + `' + )`) + if err != nil { + logger.LogE(err.Error()) + } + } + } + return +} +func (s *SQLPersistent) CleanJob(ctx context.Context, filter *Filter) (affectedRow int64) { + where, err := s.toQueryFilter(filter) + if err != nil { + logger.LogE(err.Error()) + return affectedRow + } + + res, err := s.db.Exec(`DELETE FROM ` + jobModelName + ` ` + where) + if err != nil { + logger.LogE(err.Error()) + return 
affectedRow + } + affectedRow, _ = res.RowsAffected() + return +} +func (s *SQLPersistent) DeleteJob(ctx context.Context, id string) (job Job, err error) { + err = s.db.QueryRow(`SELECT id, task_name, arguments, retries, max_retry, interval, created_at, finished_at, status, error, trace_id + FROM `+jobModelName+` WHERE id='`+s.queryReplacer.Replace(id)+`'`).Scan( + &job.ID, &job.TaskName, &job.Arguments, &job.Retries, &job.MaxRetry, &job.Interval, &job.CreatedAt, + &job.FinishedAt, &job.Status, &job.Error, &job.TraceID, + ) + _, err = s.db.Exec(`DELETE FROM ` + jobModelName + ` WHERE id='` + s.queryReplacer.Replace(id) + `'`) + if err != nil { + logger.LogE(err.Error()) + } + return +} + +// summary +func (s *SQLPersistent) FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary) { + var where string + if filter.TaskName != "" { + where = ` WHERE id = '` + s.queryReplacer.Replace(filter.TaskName) + `'` + } else if len(filter.TaskNameList) > 0 { + var taskNameList []string + for _, taskName := range filter.TaskNameList { + taskNameList = append(taskNameList, "'"+s.queryReplacer.Replace(taskName)+"'") + } + where = " WHERE id IN (" + strings.Join(taskNameList, ",") + ")" + } + query := `SELECT id, success, queueing, retrying, failure, stopped, is_loading FROM ` + jobSummaryModelName + where + rows, err := s.db.Query(query) + if err != nil { + return + } + defer rows.Close() + for rows.Next() { + var detail TaskSummary + rows.Scan(&detail.TaskName, &detail.Success, &detail.Queueing, &detail.Retrying, + &detail.Failure, &detail.Stopped, &detail.IsLoading) + detail.ID = detail.TaskName + result = append(result, detail) + } + + if len(filter.Statuses) > 0 { + for i, res := range result { + mapRes := res.ToMapResult() + newCount := map[string]int{} + for _, status := range filter.Statuses { + newCount[strings.ToUpper(status)] = mapRes[strings.ToUpper(status)] + } + res.SetValue(newCount) + result[i] = res + } + } + + return +} +func (s *SQLPersistent) 
FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary) { + err := s.db.QueryRow(`SELECT id, success, queueing, retrying, failure, stopped, is_loading + FROM `+jobSummaryModelName+` WHERE id='`+s.queryReplacer.Replace(taskName)+`'`). + Scan(&result.TaskName, &result.Success, &result.Queueing, &result.Retrying, + &result.Failure, &result.Stopped, &result.IsLoading) + if err != nil { + logger.LogE(err.Error()) + } + result.ID = result.TaskName + return +} +func (s *SQLPersistent) UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{}) { + var setFields []string + for field, value := range updated { + if field == "" { + continue + } + field = strings.ToLower(field) + setFields = append(setFields, field+"='"+candihelper.ToString(value)+"'") + } + query := `UPDATE ` + jobSummaryModelName + ` SET ` + strings.Join(setFields, ",") + ` WHERE id='` + s.queryReplacer.Replace(taskName) + `'` + res, err := s.db.Exec(query) + if err != nil { + logger.LogE(err.Error()) + return + } + affected, _ := res.RowsAffected() + if affected == 0 { + query := fmt.Sprintf(`INSERT INTO %s (id, success, queueing, retrying, failure, stopped) VALUES ('%s', '%d', '%d', '%d', '%d', '%d')`, + jobSummaryModelName, s.queryReplacer.Replace(taskName), candihelper.ToInt(updated["success"]), candihelper.ToInt(updated["queueing"]), + candihelper.ToInt(updated["retrying"]), candihelper.ToInt(updated["failure"]), candihelper.ToInt(updated["stopped"])) + _, err := s.db.Exec(query) + if err != nil { + logger.LogE(err.Error()) + } + } + return +} +func (s *SQLPersistent) IncrementSummary(ctx context.Context, taskName string, incr map[string]int64) { + var setFields []string + for field, value := range incr { + if field == "" { + continue + } + field = strings.ToLower(field) + val := candihelper.ToString(value) + if value >= 0 { + val = "+" + candihelper.ToString(value) + } + setFields = append(setFields, field+"="+field+val) + } + query := `UPDATE ` + jobSummaryModelName 
+ ` SET ` + strings.Join(setFields, ",") + ` WHERE id='` + s.queryReplacer.Replace(taskName) + `'` + res, err := s.db.Exec(query) + if err != nil { + logger.LogE(err.Error()) + return + } + affected, _ := res.RowsAffected() + if affected == 0 { + query := fmt.Sprintf(`INSERT INTO %s (id, success, queueing, retrying, failure, stopped) VALUES ('%s', '%d', '%d', '%d', '%d', '%d')`, + jobSummaryModelName, s.queryReplacer.Replace(taskName), candihelper.ToInt(incr["success"]), candihelper.ToInt(incr["queueing"]), + candihelper.ToInt(incr["retrying"]), candihelper.ToInt(incr["failure"]), candihelper.ToInt(incr["stopped"])) + _, err := s.db.Exec(query) + if err != nil { + logger.LogE(err.Error()) + } + } + return +} + +func (s *SQLPersistent) toQueryFilter(f *Filter) (where string, err error) { + + var conditions []string + if f.TaskName != "" { + conditions = append(conditions, "task_name='"+s.queryReplacer.Replace(f.TaskName)+"'") + } else if len(f.TaskNameList) > 0 { + var taskNameList []string + for _, taskName := range f.TaskNameList { + taskNameList = append(taskNameList, "'"+s.queryReplacer.Replace(taskName)+"'") + } + conditions = append(conditions, "task_name IN ("+strings.Join(taskNameList, ",")+")") + } + + if f.JobID != nil && *f.JobID != "" { + conditions = append(conditions, "id='"+s.queryReplacer.Replace(*f.JobID)+"'") + } + if f.Search != nil && *f.Search != "" { + conditions = append(conditions, `(arguments LIKE '%%`+s.queryReplacer.Replace(*f.Search)+`%%' OR error LIKE '%%`+s.queryReplacer.Replace(*f.Search)+`%%')`) + } + if len(f.Statuses) > 0 { + var statuses []string + for _, status := range f.Statuses { + statuses = append(statuses, "'"+s.queryReplacer.Replace(status)+"'") + } + conditions = append(conditions, "status IN ("+strings.Join(statuses, ",")+")") + } + if f.Status != nil { + conditions = append(conditions, "status='"+s.queryReplacer.Replace(*f.Status)+"'") + } + if !f.StartDate.IsZero() && !f.EndDate.IsZero() { + conditions = append(conditions, "created_at BETWEEN 
'"+f.StartDate.Format(time.RFC3339)+"' AND '"+f.EndDate.Format(time.RFC3339)+"'") + } + + if len(conditions) == 0 { + return where, errors.New("empty filter") + } + + where = " WHERE " + strings.Join(conditions, " AND ") + return +} diff --git a/codebase/app/task_queue_worker/summary.go b/codebase/app/task_queue_worker/summary.go index b4654165..63e995db 100644 --- a/codebase/app/task_queue_worker/summary.go +++ b/codebase/app/task_queue_worker/summary.go @@ -12,7 +12,7 @@ type ( FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary) FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary) UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{}) - IncrementSummary(ctx context.Context, taskName string, incr map[string]interface{}) + IncrementSummary(ctx context.Context, taskName string, incr map[string]int64) } // TaskSummary model @@ -152,7 +152,7 @@ func (i *inMemSummary) UpdateSummary(ctx context.Context, taskName string, updat i.values[taskName] = summary return } -func (i *inMemSummary) IncrementSummary(ctx context.Context, taskName string, incr map[string]interface{}) { +func (i *inMemSummary) IncrementSummary(ctx context.Context, taskName string, incr map[string]int64) { i.mu.Lock() defer i.mu.Unlock() @@ -161,24 +161,17 @@ func (i *inMemSummary) IncrementSummary(ctx context.Context, taskName string, in summary = new(TaskSummary) } for k, v := range incr { - var count int - switch c := v.(type) { - case int: - count = c - case int64: - count = int(c) - } switch strings.ToUpper(k) { case string(statusFailure): - summary.Failure += count + summary.Failure += int(v) case string(statusRetrying): - summary.Retrying += count + summary.Retrying += int(v) case string(statusSuccess): - summary.Success += count + summary.Success += int(v) case string(statusQueueing): - summary.Queueing += count + summary.Queueing += int(v) case string(statusStopped): - summary.Stopped += count + summary.Stopped += 
int(v) } } i.values[taskName] = summary diff --git a/codebase/app/task_queue_worker/task_queue_worker.go b/codebase/app/task_queue_worker/task_queue_worker.go index bd7d9d5f..f151c0c7 100644 --- a/codebase/app/task_queue_worker/task_queue_worker.go +++ b/codebase/app/task_queue_worker/task_queue_worker.go @@ -103,7 +103,7 @@ func (t *taskQueueWorker) prepare() { "status": job.Status, }) - persistent.Summary().IncrementSummary(t.ctx, job.TaskName, map[string]interface{}{ + persistent.Summary().IncrementSummary(t.ctx, job.TaskName, map[string]int64{ statusBefore: -matched, job.Status: affected, }) } diff --git a/codebase/app/task_queue_worker/trigger_task.go b/codebase/app/task_queue_worker/trigger_task.go index 62fc4ccd..c0958d48 100644 --- a/codebase/app/task_queue_worker/trigger_task.go +++ b/codebase/app/task_queue_worker/trigger_task.go @@ -88,7 +88,7 @@ func (t *taskQueueWorker) execJob(ctx context.Context, runningTask *Task) { matchedCount, affectedCount, err := persistent.UpdateJob( t.ctx, &Filter{JobID: &job.ID}, job.toMap(), ) - persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{ + persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{ string(job.Status): affectedCount, statusBefore: -matchedCount, }) @@ -133,7 +133,7 @@ func (t *taskQueueWorker) execJob(ctx context.Context, runningTask *Task) { if affectedCount == 0 && matchedCount == 0 { persistent.SaveJob(t.ctx, &job, retryHistory) } - persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{ + persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{ job.Status: affectedCount, statusBefore: -matchedCount, }) diff --git a/codebase/factory/appfactory/setup_task_queue_worker.go b/codebase/factory/appfactory/setup_task_queue_worker.go index b18f3f27..943b6e6b 100644 --- a/codebase/factory/appfactory/setup_task_queue_worker.go +++ b/codebase/factory/appfactory/setup_task_queue_worker.go @@ -11,23 +11,14 @@ func 
SetupTaskQueueWorker(service factory.ServiceFactory, opts ...taskqueueworke if service.GetDependency().GetRedisPool() == nil { panic("Task queue worker require redis for queue") } - if service.GetDependency().GetMongoDatabase() == nil { - panic("Task queue worker require mongo for dashboard management") - } queue := taskqueueworker.NewRedisQueue(service. GetDependency(). GetRedisPool(). WritePool(), ) - persistent := taskqueueworker.NewMongoPersistent(service. - GetDependency(). - GetMongoDatabase(). - WriteDB(), - ) workerOpts := []taskqueueworker.OptionFunc{ taskqueueworker.SetQueue(queue), - taskqueueworker.SetPersistent(persistent), taskqueueworker.SetDashboardHTTPPort(env.BaseEnv().TaskQueueDashboardPort), taskqueueworker.SetMaxClientSubscriber(env.BaseEnv().TaskQueueDashboardMaxClientSubscribers), taskqueueworker.SetTracingDashboard(env.BaseEnv().JaegerTracingDashboard + "/trace"), diff --git a/mocks/codebase/app/task_queue_worker/Summary.go b/mocks/codebase/app/task_queue_worker/Summary.go index 245c2610..8cbed7be 100644 --- a/mocks/codebase/app/task_queue_worker/Summary.go +++ b/mocks/codebase/app/task_queue_worker/Summary.go @@ -45,7 +45,7 @@ func (_m *Summary) FindDetailSummary(ctx context.Context, taskName string) taskq } // IncrementSummary provides a mock function with given fields: ctx, taskName, incr -func (_m *Summary) IncrementSummary(ctx context.Context, taskName string, incr map[string]interface{}) { +func (_m *Summary) IncrementSummary(ctx context.Context, taskName string, incr map[string]int64) { _m.Called(ctx, taskName, incr) }