This project aims to be a blueprint for your own job queue solution with Go and PostgreSQL. It is recommended to fork this project and adapt the job queue's features to your own needs.
By using PostgreSQL's SKIP LOCKED feature, it provides a performant, non-blocking, and independent queue. The technique is described here and here.
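To make the technique concrete, here is a minimal sketch of a dequeue step built on SKIP LOCKED. The table and column names (jobs, id, queue, payload, run_at) and the dequeue helper are assumptions for illustration and may not match the schema this project creates.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// dequeueSQL locks and returns at most one due job. Rows that another worker
// has already locked are skipped instead of waited on.
// The table and column names are hypothetical.
const dequeueSQL = `
SELECT id, payload
FROM jobs
WHERE queue = $1 AND run_at <= now()
ORDER BY run_at
LIMIT 1
FOR UPDATE SKIP LOCKED`

// dequeue fetches one job inside tx. The row lock is held until tx commits or
// rolls back, so a job locked by a crashed worker becomes available again
// once its connection is closed.
func dequeue(ctx context.Context, tx *sql.Tx, queue string) (id int64, payload string, found bool, err error) {
	err = tx.QueryRowContext(ctx, dequeueSQL, queue).Scan(&id, &payload)
	if errors.Is(err, sql.ErrNoRows) {
		return 0, "", false, nil // queue is empty or every due job is locked
	}
	if err != nil {
		return 0, "", false, err
	}
	return id, payload, true, nil
}

func main() {
	// Print the query; calling dequeue requires a live PostgreSQL transaction.
	fmt.Println(dequeueSQL)
}
```

Because locked rows are skipped rather than waited on, several workers can run this query concurrently without blocking each other.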
- Performant: Non-blocking queue mechanism
- Robust: Jobs will be 'freed' again when a worker crashes
- Failed jobs will be retried until MAX_RETRIES is reached; the current attempt is passed as an argument
- Schedule jobs for later execution with EnqueueAt(ctx, job, "queuename", timeAt)
- Support for multiple queues
- Zero dependency
Complete, runnable examples can be found under ./example/.
First, define a pgjobs.Job, for example in ./jobs/emailUser.go:
package jobs

import (
	"encoding/json"
	"log"

	"github.com/rverton/pgjobs"
)

type EmailUser struct {
	Email string
}

func NewEmailUser(email string) *EmailUser {
	return &EmailUser{
		Email: email,
	}
}

// the action which should be executed
func (e EmailUser) Perform(attempt int32) error {
	log.Printf("emailing %v, attempt=%v", e.Email, attempt)
	return nil
}

// this is boilerplate code and does not need to be modified
func (e EmailUser) Load(data string) (pgjobs.Job, error) {
	var n EmailUser
	err := json.Unmarshal([]byte(data), &n)
	return n, err
}
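The Load method exists so that a worker can rebuild a job from its stored payload. The following self-contained sketch shows that round trip. The Job interface here is a local stand-in inferred from the two methods above, not the actual pgjobs.Job definition, and the address is a placeholder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job mirrors the two methods implemented above. It is a local stand-in
// (an assumption), not the actual pgjobs.Job definition.
type Job interface {
	Perform(attempt int32) error
	Load(data string) (Job, error)
}

type EmailUser struct {
	Email string
}

func (e EmailUser) Perform(attempt int32) error {
	fmt.Printf("emailing %v, attempt=%v\n", e.Email, attempt)
	return nil
}

func (e EmailUser) Load(data string) (Job, error) {
	var n EmailUser
	err := json.Unmarshal([]byte(data), &n)
	return n, err
}

func main() {
	// Serialize a job the way a queue might store it in a text or JSON column.
	payload, err := json.Marshal(EmailUser{Email: "user@example.com"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))

	// A worker holds an empty prototype and uses Load to rebuild the real job.
	var prototype Job = EmailUser{}
	job, err := prototype.Load(string(payload))
	if err != nil {
		panic(err)
	}
	_ = job.Perform(1)
}
```

Since the exported fields are what gets serialized, any data a job needs at execution time should live in exported struct fields.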
You can then set up a queue, (optionally) enforce the jobs table schema, and work on queued jobs.
package main

import (
	"context"
	"database/sql"
	"log"
	"os"
	"time"

	"example/jobs"

	_ "github.com/jackc/pgx/v5/stdlib"
	"github.com/rverton/pgjobs"
)

func main() {
	// the pgx stdlib driver registers itself under the name "pgx"
	db, err := sql.Open("pgx", os.Getenv("DB_URL"))
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// initiate queue with postgres connection
	queue := pgjobs.NewQueue(db)
	if err := queue.SetupSchema(ctx); err != nil {
		panic(err)
	}

	job := jobs.NewEmailUser("[email protected]")

	// enqueue an example job for immediate execution
	if err = queue.Enqueue(ctx, job, "default"); err != nil {
		log.Printf("error enqueueing: %v", err)
	}

	// enqueue an example job for execution in 10s+
	if err = queue.EnqueueAt(ctx, job, "default", time.Now().Add(10*time.Second)); err != nil {
		log.Printf("error enqueueing: %v", err)
	}

	// start worker and pass all processable jobs
	// note: this worker will only process the passed jobs
	queues := []string{"default"}
	if err := queue.Worker(ctx, queues, &jobs.EmailUser{}); err != nil {
		log.Println(err)
	}
}
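One plausible reason a worker only processes the job types passed to it is that it needs a prototype of each type in order to call Load. The sketch below shows such a lookup keyed by struct name. The typeName and newRegistry helpers, the trimmed Job interface, and the idea of storing the type name with each job are assumptions, not the library's confirmed internals.

```go
package main

import (
	"fmt"
	"reflect"
)

// Job is a trimmed local stand-in for the job interface (an assumption).
type Job interface {
	Perform(attempt int32) error
}

type EmailUser struct{ Email string }

func (e EmailUser) Perform(attempt int32) error { return nil }

// typeName returns the struct name of a job, for values and pointers alike.
func typeName(j Job) string {
	t := reflect.TypeOf(j)
	if t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	return t.Name()
}

// newRegistry maps each prototype's type name to the prototype itself.
func newRegistry(jobs ...Job) map[string]Job {
	r := make(map[string]Job, len(jobs))
	for _, j := range jobs {
		r[typeName(j)] = j
	}
	return r
}

func main() {
	reg := newRegistry(&EmailUser{})

	// A stored job of a registered type can be dispatched; others cannot.
	_, known := reg["EmailUser"]
	_, unknown := reg["ResizeImage"] // hypothetical job type that was not passed
	fmt.Println(known, unknown)
}
```

Under this design, every job type that may appear in a queue has to be passed to each worker that reads from that queue.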
- The polling interval for workers can be adjusted via pgjobs.PollInterval.
- For all other configuration, it is currently recommended to create a fork and adjust as needed.
- Make job processing more robust by using a transaction
- Implement attempt handling
- Add error handling and retries?
- Add scheduled execution
- Remove github.com/lib/pq dependency
- Add more tests (dequeuing, reflection)
- Add priority queuing