Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(#27): address ambiguity for score fields, allow assign the priority manually #29

Merged
merged 21 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# TODO: configure linter correctly
# Disabling staticcheck since we deprecated proto field
linters:
disable:
- staticcheck
32 changes: 16 additions & 16 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,22 @@ You may also download the latest release from the [releases](https://github.com/
>
> To change the default configuration see the [configuration section](/README.md?#configuration).

### Mocks

We use the [GoMock](https://github.com/golang/mock) project to generate mocks.

To update the mocks you must run the following command:
```shell
make gen-mocks
```

Mocks are generated in the [/internal/mocks](/internal/mocks) folder.

When creating interfaces with the need to generate mocks, you must add the following directive to the interface file:
```go
//go:generate mockgen -destination=<path_to_mocks>/mock_<file>.go -package=mocks -source=<file>.go
```

## Running tests

To run project tests you must first generate all mock files with the following command:
Expand Down Expand Up @@ -202,22 +218,6 @@ make gen-cert
>
> Check the [generation script](/internal/service/cert/gen.sh) for more details.

### Mocks

We use the [GoMock](https://github.com/golang/mock) project to generate mocks.

To update the mocks you must run the following command:
```shell
make gen-mocks
```

Mocks are generated in the [/internal/mocks](/internal/mocks) folder.

When creating interfaces with the need to generate mocks, you must add the following directive to the interface file:
```go
//go:generate mockgen -destination=<path_to_mocks>/mock_<file>.go -package=mocks -source=<file>.go
```

## Docker Image

To generate the Deckard image we use the [ko](https://github.com/google/ko) tool.
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@

[![Slack](https://img.shields.io/badge/slack-Gophers_%28Deckard%29-blue?logo=slack&link=https://gophers.slack.com/archives/C05E1TMS1FY)](https://gophers.slack.com/archives/C05E1TMS1FY)

Deckard is a priority queue system inspired by projects such as Google Cloud PubSub, Nats, Kafka, and others. Its main distinction lies in its ability to associate a priority with each message and have a queue that can be optionally cyclic. This means that messages can be delivered again after a user-managed time. Additionally, Deckard implements a locking mechanism to prevent message processing for a specified duration.
Deckard is a priority queue system inspired by projects such as Google Cloud PubSub, Nats, Kafka, and others. Its main distinction lies in its ability to associate a priority score with each message and have a queue that can be optionally cyclic. This means that messages can be delivered again after a user-managed time. Additionally, Deckard implements a locking mechanism to prevent message processing for a specified duration.

![deckard](docs/deckard_cartoon.webp)

Briefly:
- An application inserts a message to be queued and its configuration (TTL, metadata, payload, etc).
- The message will be prioritized with a default timestamp-based algorithm. The priority can also be provided by the application.
- An application inserts a message to be queued and its configuration (TTL, metadata, payload, priority score, etc).
- The message will be prioritized with a default timestamp-based algorithm if the provided score is 0 (the default value).
- A worker application pull messages from Deckard at regular intervals and performs any processing.
- When it finishes processing a message, the application must notify with the processing result.
- When notifying, the application may provide a lock time, to lock the message for a certain duration of time before being requeued and delivered again.
- It is also possible to notify a message changing its priority.
- It is also possible to notify a message changing its priority score.
- When the message's TTL is reached, it stops being delivered;
- For some use cases the TTL can be set as infinite.
- An application can also remove the message when notifying.
Expand Down
501 changes: 294 additions & 207 deletions deckard_service.pb.go

Large diffs are not rendered by default.

57 changes: 35 additions & 22 deletions deckard_service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/go-redis/redis/v8 v8.11.5
github.com/gobwas/glob v0.2.3
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.14.0
Expand Down Expand Up @@ -42,7 +43,6 @@ require (
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-redis/redis/extra/rediscmd/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions internal/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"time"

"github.com/takenet/deckard/internal/config"
"github.com/takenet/deckard/internal/dtime"
"github.com/takenet/deckard/internal/logger"
"github.com/takenet/deckard/internal/metrics"
"github.com/takenet/deckard/internal/project"
"github.com/takenet/deckard/internal/queue/entities"
"github.com/takenet/deckard/internal/queue/utils"
"github.com/takenet/deckard/internal/queue/message"
"go.uber.org/zap"

"github.com/elastic/go-elasticsearch/v7"
Expand All @@ -36,7 +36,7 @@ type Entry struct {
Queue string `json:"queue"`
QueuePrefix string `json:"queue_prefix"`
QueueSuffix string `json:"queue_suffix"`
LastScoreSubtract float64 `json:"last_score_subtract"`
LastScoreSubtract float64 `json:"last_score_subtract"` // deprecated
Timestamp time.Time `json:"timestamp"`
Breakpoint string `json:"breakpoint"`
Signal Signal `json:"signal"`
Expand Down Expand Up @@ -146,9 +146,9 @@ func (a *AuditorImpl) send(ctx context.Context, entries ...Entry) {
return
}

start := time.Now()
start := dtime.Now()
defer func() {
metrics.AuditorStoreLatency.Record(ctx, utils.ElapsedTime(start))
metrics.AuditorStoreLatency.Record(ctx, dtime.ElapsedTime(start))
}()

body := ""
Expand Down Expand Up @@ -192,9 +192,9 @@ func (a *AuditorImpl) Store(ctx context.Context, entry Entry) {
return
}

entry.Timestamp = time.Now()
entry.Timestamp = dtime.Now()

queuePrefix, queueSuffix := entities.GetQueueParts(entry.Queue)
queuePrefix, queueSuffix := message.GetQueueParts(entry.Queue)

entry.QueuePrefix = queuePrefix

Expand All @@ -203,7 +203,7 @@ func (a *AuditorImpl) Store(ctx context.Context, entry Entry) {
}

defer func() {
metrics.AuditorAddToStoreLatency.Record(ctx, utils.ElapsedTime(entry.Timestamp))
metrics.AuditorAddToStoreLatency.Record(ctx, dtime.ElapsedTime(entry.Timestamp))
}()

a.entries <- entry
Expand Down
10 changes: 5 additions & 5 deletions internal/cmd/deckard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (

"github.com/takenet/deckard/internal/audit"
"github.com/takenet/deckard/internal/config"
"github.com/takenet/deckard/internal/dtime"
"github.com/takenet/deckard/internal/logger"
"github.com/takenet/deckard/internal/metrics"
"github.com/takenet/deckard/internal/queue"
"github.com/takenet/deckard/internal/queue/cache"
"github.com/takenet/deckard/internal/queue/storage"
"github.com/takenet/deckard/internal/queue/utils"
"github.com/takenet/deckard/internal/service"
"github.com/takenet/deckard/internal/shutdown"
"github.com/takenet/deckard/internal/trace"
Expand Down Expand Up @@ -189,7 +189,7 @@ func startHouseKeeperJobs(pool *queue.Queue) {
shutdown.WaitGroup,
config.HousekeeperTaskTTLDelay.GetDuration(),
func() bool {
now := time.Now()
now := dtime.Now()

metrify, _ := queue.RemoveTTLMessages(ctx, pool, &now)

Expand Down Expand Up @@ -222,15 +222,15 @@ func scheduleTask(taskName string, lock *sync.Mutex, taskWaitGroup *sync.WaitGro
}

func executeTask(taskName string, fn func() bool) {
now := time.Now()
now := dtime.Now()
var metrify bool
defer func() {
if metrify {
metrics.HousekeeperTaskLatency.Record(ctx, utils.ElapsedTime(now), attribute.String("task", taskName))
metrics.HousekeeperTaskLatency.Record(ctx, dtime.ElapsedTime(now), attribute.String("task", taskName))
}
}()

metrify = fn()

logger.S(ctx).Debug("Finished ", taskName, " task. Took ", utils.ElapsedTime(now), ".")
logger.S(ctx).Debug("Finished ", taskName, " task. Took ", dtime.ElapsedTime(now), ".")
}
Loading