Skip to content

Commit

Permalink
feat: add SQL persistent for task queue worker
Browse files Browse the repository at this point in the history
default using SQLite (replace mongo if no database)
  • Loading branch information
agungdwiprasetyo committed Jul 27, 2022
1 parent 20e2eed commit ffb6274
Show file tree
Hide file tree
Showing 14 changed files with 694 additions and 97 deletions.
89 changes: 89 additions & 0 deletions candihelper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,3 +544,92 @@ func GetRuntimeStackLine() string {

return fmt.Sprintf("%s:%d", file, line)
}

// ToString helper
func ToString(val interface{}) (str string) {
switch s := val.(type) {
case string:
return s
case bool:
return strconv.FormatBool(s)
case float64:
return strconv.FormatFloat(s, 'f', -1, 64)
case float32:
return strconv.FormatFloat(float64(s), 'f', -1, 32)
case int:
return strconv.Itoa(s)
case int64:
return strconv.FormatInt(s, 10)
case int32:
return strconv.Itoa(int(s))
case int16:
return strconv.FormatInt(int64(s), 10)
case int8:
return strconv.FormatInt(int64(s), 10)
case uint:
return strconv.FormatInt(int64(s), 10)
case uint64:
return strconv.FormatInt(int64(s), 10)
case uint32:
return strconv.FormatInt(int64(s), 10)
case uint16:
return strconv.FormatInt(int64(s), 10)
case uint8:
return strconv.FormatInt(int64(s), 10)
case []byte:
return string(s)
case nil:
return ""
case fmt.Stringer:
return s.String()
case error:
return s.Error()
default:
return ""
}
}

// ToInt helper
func ToInt(val interface{}) (i int) {
switch s := val.(type) {
case int:
return s
case int64:
return int(s)
case int32:
return int(s)
case int16:
return int(s)
case int8:
return int(s)
case uint:
return int(s)
case uint64:
return int(s)
case uint32:
return int(s)
case uint16:
return int(s)
case uint8:
return int(s)
case float64:
return int(s)
case float32:
return int(s)
case string:
v, err := strconv.ParseInt(s, 0, 0)
if err == nil {
return int(v)
}
return 0
case bool:
if s {
return 1
}
return 0
case nil:
return 0
default:
return 0
}
}
19 changes: 17 additions & 2 deletions cmd/candi/template_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,13 @@ func SetEnv(env Environment) {
package configs
import (
taskqueueworker "{{.LibraryName}}/codebase/app/task_queue_worker"
"{{.LibraryName}}/codebase/factory"
"{{.LibraryName}}/codebase/factory/appfactory"
"{{.LibraryName}}/config/env"
"{{.LibraryName}}/config/env"{{if not .MongoDeps}}
"database/sql"
_ "github.com/mattn/go-sqlite3"{{end}}
)
/*
Expand Down Expand Up @@ -138,7 +142,18 @@ func InitAppFromEnvironmentConfig(service factory.ServiceFactory) (apps []factor
apps = append(apps, appfactory.SetupCronWorker(service))
}
if env.BaseEnv().UseTaskQueueWorker {
apps = append(apps, appfactory.SetupTaskQueueWorker(service))
{{if .MongoDeps}}persistent := taskqueueworker.NewMongoPersistent(service.
GetDependency().
GetMongoDatabase().
WriteDB(),
){{else}}db, err := sql.Open("sqlite3", "./candi_task_queue_worker.db")
if err != nil {
panic(err)
}
persistent := taskqueueworker.NewSQLPersistent(db){{end}}
apps = append(apps, appfactory.SetupTaskQueueWorker(service,
taskqueueworker.SetPersistent(persistent),
))
}
if env.BaseEnv().UseRedisSubscriber {
apps = append(apps, appfactory.SetupRedisWorker(service))
Expand Down
17 changes: 10 additions & 7 deletions cmd/candi/template_etc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package main

const (
dockerfileTemplate = `# Stage 1
FROM golang:1.16.4-alpine3.13 AS dependency_builder
FROM golang:1.18.4-alpine3.16 AS dependency_builder
WORKDIR /go/src
ENV GO111MODULE=on
RUN apk update
RUN apk add --no-cache bash ca-certificates git make
RUN apk add --no-cache bash ca-certificates git gcc musl-dev
COPY go.mod .
COPY go.sum .
Expand All @@ -21,7 +21,7 @@ FROM dependency_builder AS service_builder
WORKDIR /usr/app
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o bin
RUN go build -o bin
# Stage 3
FROM alpine:latest
Expand Down Expand Up @@ -85,7 +85,10 @@ test:
go {{.GoVersion}}
require {{.LibraryName}} {{.Version}}
require (
{{.LibraryName}} {{.Version}}
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
)
`

gitignoreTemplate = `bin
Expand Down Expand Up @@ -345,13 +348,13 @@ $ make migration service={{service_name}} down
"```\n"

dockerfileMonorepoTemplate = `# Stage 1
FROM golang:1.16.4-alpine3.13 AS dependency_builder
FROM golang:1.18.4-alpine3.16 AS dependency_builder
WORKDIR /go/src
ENV GO111MODULE=on
RUN apk update
RUN apk add --no-cache bash ca-certificates git
RUN apk add --no-cache bash ca-certificates git make gcc musl-dev
COPY go.mod .
COPY go.sum .
Expand All @@ -368,7 +371,7 @@ COPY sdk sdk
COPY services/$SERVICE_NAME services/$SERVICE_NAME
COPY go.mod .
COPY go.sum .
RUN CGO_ENABLED=0 GOOS=linux go build -o bin services/$SERVICE_NAME/*.go
RUN go build -o bin services/$SERVICE_NAME/*.go
# Stage 3
FROM alpine:latest
Expand Down
24 changes: 12 additions & 12 deletions codebase/app/task_queue_worker/graphql_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (r *rootResolver) DeleteJob(ctx context.Context, input struct{ JobID string
if err != nil {
return "", err
}
persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{
persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{
job.Status: -1,
})
broadcastAllToSubscribers(r.worker.ctx)
Expand Down Expand Up @@ -164,7 +164,7 @@ func (r *rootResolver) StopAllJob(ctx context.Context, input struct {
stopAllJobInTask(input.TaskName)
queue.Clear(ctx, input.TaskName)

incrQuery := map[string]int{}
incrQuery := map[string]int64{}
affectedStatus := []string{string(statusQueueing), string(statusRetrying)}
for _, status := range affectedStatus {
countMatchedFilter, countAffected, err := persistent.UpdateJob(ctx,
Expand All @@ -178,12 +178,12 @@ func (r *rootResolver) StopAllJob(ctx context.Context, input struct {
if err != nil {
continue
}
incrQuery[strings.ToLower(status)] -= int(countMatchedFilter)
incrQuery[strings.ToLower(string(statusStopped))] += int(countAffected)
incrQuery[strings.ToLower(status)] -= countMatchedFilter
incrQuery[strings.ToLower(string(statusStopped))] += countAffected
}

broadcastWhenChangeAllJob(ctx, input.TaskName, false)
persistent.Summary().IncrementSummary(ctx, input.TaskName, convertIncrementMap(incrQuery))
persistent.Summary().IncrementSummary(ctx, input.TaskName, incrQuery)
broadcastAllToSubscribers(r.worker.ctx)

}(r.worker.ctx)
Expand Down Expand Up @@ -214,7 +214,7 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct {
}
})

incr := map[string]int{}
incr := map[string]int64{}
for _, status := range filter.Statuses {
countMatchedFilter, countAffected, err := persistent.UpdateJob(ctx,
&Filter{
Expand All @@ -228,12 +228,12 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct {
if err != nil {
continue
}
incr[strings.ToLower(status)] -= int(countMatchedFilter)
incr[strings.ToLower(string(statusQueueing))] += int(countAffected)
incr[strings.ToLower(status)] -= countMatchedFilter
incr[strings.ToLower(string(statusQueueing))] += countAffected
}

broadcastWhenChangeAllJob(ctx, input.TaskName, false)
persistent.Summary().IncrementSummary(ctx, input.TaskName, convertIncrementMap(incr))
persistent.Summary().IncrementSummary(ctx, input.TaskName, incr)
broadcastAllToSubscribers(r.worker.ctx)
refreshWorkerNotif <- struct{}{}

Expand All @@ -250,19 +250,19 @@ func (r *rootResolver) CleanJob(ctx context.Context, input struct {

broadcastWhenChangeAllJob(ctx, input.TaskName, true)

incrQuery := map[string]int{}
incrQuery := map[string]int64{}
affectedStatus := []string{string(statusSuccess), string(statusFailure), string(statusStopped)}
for _, status := range affectedStatus {
countAffected := persistent.CleanJob(ctx,
&Filter{
TaskName: input.TaskName, Status: &status,
},
)
incrQuery[strings.ToLower(status)] -= int(countAffected)
incrQuery[strings.ToLower(status)] -= countAffected
}

broadcastWhenChangeAllJob(ctx, input.TaskName, false)
persistent.Summary().IncrementSummary(ctx, input.TaskName, convertIncrementMap(incrQuery))
persistent.Summary().IncrementSummary(ctx, input.TaskName, incrQuery)
broadcastAllToSubscribers(r.worker.ctx)

}(r.worker.ctx)
Expand Down
5 changes: 2 additions & 3 deletions codebase/app/task_queue_worker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ func (job *Job) toMap() map[string]interface{} {
"retries": job.Retries,
"max_retry": job.MaxRetry,
"interval": job.Interval,
"created_at": job.CreatedAt,
"finished_at": job.FinishedAt,
"created_at": job.CreatedAt.Format(time.RFC3339),
"finished_at": job.FinishedAt.Format(time.RFC3339),
"status": job.Status,
"error": job.Error,
"error_stack": job.ErrorStack,
"trace_id": job.TraceID,
}
}
Expand Down
8 changes: 3 additions & 5 deletions codebase/app/task_queue_worker/job_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/golangid/candi/candihelper"
"github.com/golangid/candi/candiutils"
"github.com/golangid/candi/tracer"
"github.com/google/uuid"
)

type (
Expand Down Expand Up @@ -70,7 +69,6 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
}

var newJob Job
newJob.ID = uuid.New().String()
newJob.TaskName = req.TaskName
newJob.Arguments = string(req.Args)
newJob.MaxRetry = req.MaxRetry
Expand All @@ -89,7 +87,7 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
defer func() { <-semaphoreAddJob }()

persistent.SaveJob(ctx, job)
persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{
persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{
strings.ToLower(job.Status): 1,
})
broadcastAllToSubscribers(ctx)
Expand Down Expand Up @@ -184,7 +182,7 @@ func RetryJob(ctx context.Context, jobID string) error {
matched, affected, _ := persistent.UpdateJob(ctx, &Filter{JobID: &job.ID}, map[string]interface{}{
"status": job.Status, "interval": job.Interval, "retries": job.Retries,
})
persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{
persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{
statusBefore: -matched,
job.Status: affected,
})
Expand Down Expand Up @@ -223,7 +221,7 @@ func StopJob(ctx context.Context, jobID string) error {
if err != nil {
return err
}
persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]interface{}{
persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{
job.Status: countAffected,
statusBefore: -matchedCount,
})
Expand Down
5 changes: 5 additions & 0 deletions codebase/app/task_queue_worker/persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"context"
)

const (
jobModelName = "task_queue_worker_jobs"
jobSummaryModelName = "task_queue_worker_job_summaries"
)

type (
// Persistent abstraction
Persistent interface {
Expand Down
Loading

0 comments on commit ffb6274

Please sign in to comment.