Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: new JOBS API #142

Merged
merged 11 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .githooks/pre-commit
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

set -e -o pipefail

# https://github.com/koalaman/shellcheck/wiki/SC2039#redirect-both-stdout-and-stderr
if ! command -v golangci-lint 2>&1 /dev/null; then
echo "golangci-lint is not installed"
exit 1
fi

exec golangci-lint --build-tags=race run "$@"
2 changes: 1 addition & 1 deletion .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ jobs:
- name: Run linter
uses: golangci/[email protected] # Action page: <https://github.com/golangci/golangci-lint-action>
with:
version: v1.54 # without patch version
version: v1.59 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m --build-tags=race ./...
10 changes: 5 additions & 5 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ jobs:
timeout-minutes: 60
strategy:
matrix:
php: [ "8.2" ]
go: [ stable ]
os: [ "ubuntu-latest" ]
php: ["8.3"]
go: [stable]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v5 # action page: <https://github.com/actions/setup-go>
Expand Down Expand Up @@ -71,13 +71,13 @@ jobs:
sudo cp mkcert-v*-linux-amd64 /usr/local/bin/mkcert
mkcert -install
mkcert localhost 127.0.0.1 ::1
mkcert -client localhost 127.0.0.1 ::1
mkcert -client localhost 127.0.0.1 ::1
cp -r localhost+2-client-key.pem localhost+2-client.pem localhost+2-key.pem localhost+2.pem test-certs/
cp -r $(mkcert -CAROOT)/rootCA.pem test-certs/

docker compose -f env/docker-compose-amqp.yaml up -d
sleep 30

mkdir ./coverage-ci
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/amqp.out -covermode=atomic jobs_amqp_test.go
docker compose -f env/docker-compose-amqp.yaml down
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/linux_durability.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ jobs:
timeout-minutes: 60
strategy:
matrix:
php: [ "8.2" ]
go: [ stable ]
os: [ "ubuntu-latest" ]
php: ["8.3"]
go: [stable]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v5 # action page: <https://github.com/actions/setup-go>
Expand Down Expand Up @@ -64,7 +64,7 @@ jobs:
cd tests
docker compose -f env/docker-compose-amqp.yaml up -d
sleep 30

mkdir ./coverage-ci
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/amqp.out -covermode=atomic jobs_amqp_durability_test.go
docker compose -f env/docker-compose-amqp.yaml down
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
vendor/**
.idea
.DS_Store
**/composer.lock
14 changes: 3 additions & 11 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,11 @@

run:
timeout: 1m
skip-dirs:
- .github
- .git
allow-parallel-runners: true

output:
format: colored-line-number # colored-line-number|line-number|json|tab|checkstyle|code-climate

linters-settings:
wsl:
allow-assign-and-anything: true
govet:
check-shadowing: true
golint:
min-confidence: 0.1
godot:
scope: declarations
capital: true
Expand All @@ -34,7 +24,6 @@ linters-settings:
range-loops: true
for-loops: true
nolintlint:
allow-leading-space: false
require-specific: true

linters: # All available linters list: <https://golangci-lint.run/usage/linters/>
Expand Down Expand Up @@ -75,6 +64,9 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- whitespace # Tool for detection of leading and trailing whitespace

issues:
exclude-dirs:
- .github
- .git
exclude-rules:
- path: _test\.go
linters:
Expand Down
34 changes: 28 additions & 6 deletions amqpjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
"github.com/goccy/go-json"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"github.com/roadrunner-server/api/v4/plugins/v4/jobs"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/events"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
Expand Down Expand Up @@ -47,6 +48,11 @@ type Driver struct {
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator

// events
eventsCh chan events.Event
eventBus *events.Bus
id string

// amqp connection notifiers
notifyCloseConnCh chan *amqp.Error
notifyClosePubCh chan *amqp.Error
Expand Down Expand Up @@ -98,8 +104,8 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg

prop := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}, jprop.Jaeger{})
otel.SetTextMapPropagator(prop)
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
// we need to get two parts of the amqp information here.
// first part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
// if no such key - error
if !cfg.Has(configKey) {
Expand Down Expand Up @@ -129,6 +135,9 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
}
// PARSE CONFIGURATION END -------

eventsCh := make(chan events.Event, 1)
eventBus, id := events.NewEventBus()

jb := &Driver{
tracer: tracer,
prop: prop,
Expand All @@ -137,6 +146,11 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
stopCh: make(chan struct{}, 1),
consumeAll: conf.ConsumeAll,

// events
eventsCh: eventsCh,
eventBus: eventBus,
id: id,

priority: conf.Priority,
delayed: ptrTo(int64(0)),

Expand Down Expand Up @@ -217,8 +231,8 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *

prop := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}, jprop.Jaeger{})
otel.SetTextMapPropagator(prop)
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
// we need to get two parts of the amqp information here.
// first part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information

// only global section
Expand All @@ -244,6 +258,9 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *
log.Error("prefetch parse, driver will use default (10) prefetch", zap.String("prefetch", pipeline.String(prefetch, "10")))
}

eventsCh := make(chan events.Event, 1)
eventBus, id := events.NewEventBus()

jb := &Driver{
prop: prop,
tracer: tracer,
Expand All @@ -252,6 +269,11 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *
stopCh: make(chan struct{}, 1),
delayed: ptrTo(int64(0)),

// events
eventsCh: eventsCh,
eventBus: eventBus,
id: id,

publishChan: make(chan *amqp.Channel, 1),
stateChan: make(chan *amqp.Channel, 1),

Expand Down Expand Up @@ -589,7 +611,7 @@ func (d *Driver) Resume(ctx context.Context, p string) error {
// run listener
d.listener(deliv)

// increase number of listeners
// increase the number of listeners
atomic.AddUint32(&d.listeners, 1)
d.log.Debug("pipeline was resumed",
zap.String("driver", pipe.Driver()),
Expand Down
50 changes: 41 additions & 9 deletions amqpjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package amqpjobs
import (
"context"
stderr "errors"
"maps"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"github.com/roadrunner-server/api/v4/plugins/v4/jobs"
"github.com/roadrunner-server/errors"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ type Options struct {
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitempty"`
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitempty"`
Delay int `json:"delay,omitempty"`
// AutoAck option
AutoAck bool `json:"auto_ack"`
// AMQP Queue
Expand Down Expand Up @@ -131,6 +132,25 @@ func (i *Item) Ack() error {
return i.Options.ack(i.Options.multipleAck)
}

func (i *Item) NackWithOptions(requeue bool, delay int) error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}

if requeue {
// if delay is set, requeue with delay via non-native requeue
if delay > 0 {
return i.Requeue(nil, delay)
// if delay is not set, requeue via native requeue
}

return i.Options.nack(false, true)
}

// otherwise, nack without requeue
return i.Options.nack(false, false)
}

func (i *Item) Nack() error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
Expand All @@ -142,7 +162,7 @@ func (i *Item) Nack() error {
}

// Requeue with the provided delay, handled by the Nack
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
func (i *Item) Requeue(headers map[string][]string, delay int) error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}
Expand All @@ -152,7 +172,13 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {

// overwrite the delay
i.Options.Delay = delay
i.headers = headers
if i.headers == nil {
i.headers = make(map[string][]string)
}

if len(headers) > 0 {
maps.Copy(i.headers, headers)
}

err := i.Options.requeueFn(context.Background(), i)
if err != nil {
Expand All @@ -164,7 +190,7 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return err
}

// ack the job
// ack the previous message to avoid duplicates
err = i.Options.ack(false)
if err != nil {
return err
Expand Down Expand Up @@ -215,7 +241,7 @@ func fromJob(job jobs.Message) *Item {
Options: &Options{
Priority: job.Priority(),
Pipeline: job.GroupID(),
Delay: job.Delay(),
Delay: int(job.Delay()),
AutoAck: job.AutoAck(),
},
}
Expand Down Expand Up @@ -278,9 +304,15 @@ func (d *Driver) unpack(deliv amqp.Delivery) *Item {
}

if t, ok := deliv.Headers[jobs.RRDelay]; ok {
switch t.(type) {
case int, int16, int32, int64:
item.Options.Delay = t.(int64)
switch tt := t.(type) {
case int:
item.Options.Delay = tt
case int16:
item.Options.Delay = int(tt)
case int32:
item.Options.Delay = int(tt)
case int64:
item.Options.Delay = int(tt)
default:
d.log.Warn("unknown delay type", zap.Strings("want", []string{"int, int16, int32, int64"}), zap.Any("actual", t))
}
Expand Down
2 changes: 1 addition & 1 deletion amqpjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (d *Driver) listener(deliv <-chan amqp.Delivery) {
}

d.log.Debug("delivery channel was closed, leaving the AMQP listener")
// reduce number of listeners
// reduce the number of listeners
if atomic.LoadUint32(&d.listeners) == 0 {
d.log.Debug("number of listeners", zap.Uint32("listeners", atomic.LoadUint32(&d.listeners)))
return
Expand Down
9 changes: 7 additions & 2 deletions amqpjobs/redial.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"github.com/cenkalti/backoff/v4"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/events"
"go.uber.org/zap"
)

const (
// pipeline operation
restartStr string = "restart"
ConnCloseType string = "connection"
ConsumeCloseType string = "consume"
PublishCloseType string = "publish"
Expand Down Expand Up @@ -129,7 +132,7 @@ func (d *Driver) redialer() { //nolint:gocognit,gocyclo
pch := <-d.publishChan
stCh := <-d.stateChan

// cancel new deliviries
// cancel new deliveries
err := pch.Cancel(d.consumeID, false)
if err != nil {
d.log.Error("consumer cancel", zap.Error(err), zap.String("consumerID", d.consumeID))
Expand Down Expand Up @@ -319,7 +322,9 @@ func (d *Driver) redial(rm *redialMsg) {

retryErr := backoff.Retry(operation, expb)
if retryErr != nil {
d.log.Error("backoff operation failed", zap.Error(retryErr))
d.log.Error("backoff operation failed, pipeline will be recreated", zap.Error(retryErr))
// recreate pipeline on fail
d.eventsCh <- events.NewEvent(events.EventJOBSDriverCommand, pipe.Name(), restartStr)
return
}

Expand Down
7 changes: 7 additions & 0 deletions githooks-installer.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/sh

set -e

cp ./.githooks/pre-commit .git/hooks/pre-commit

echo "DONE"
Loading
Loading