Skip to content

Commit

Permalink
Merge branch 'argoproj:master' into feature/zg
Browse files Browse the repository at this point in the history
  • Loading branch information
talebzeghmi authored Jul 12, 2024
2 parents 3670ac9 + 2451789 commit 9daefb8
Show file tree
Hide file tree
Showing 25 changed files with 680 additions and 342 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ debug.test
site/
/go-diagrams/
argo-events
.swo
.swp
# ignore temp vi files
*.swo
*.swp
3 changes: 2 additions & 1 deletion api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@
"type": "array"
},
"streamConfig": {
"description": "Optional configuration for the streams to be created in this JetStream service, if specified, it will be merged with the default configuration in controller-config. It accepts a YAML format configuration, available fields include, \"maxBytes\", \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m).",
"description": "Optional configuration for the streams to be created in this JetStream service, if specified, it will be merged with the default configuration in controller-config. It accepts a YAML format configuration, available fields include, \"maxBytes\", \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m), \"retention\" (e.g. 0: Limits (default), 1: Interest, 2: WorkQueue), \"Discard\" (e.g. 0: DiscardOld (default), 1: DiscardNew).",
"type": "string"
},
"tolerations": {
Expand Down Expand Up @@ -4514,6 +4514,10 @@
"description": "AtLeastOnce determines the trigger execution semantics. Defaults to false. Trigger execution will use at-most-once semantics. If set to true, Trigger execution will switch to at-least-once semantics.",
"type": "boolean"
},
"dlqTrigger": {
"$ref": "#/definitions/io.argoproj.sensor.v1alpha1.Trigger",
"description": "If the trigger fails, it will retry up to the configured number of retries. If the maximum retries are reached and the trigger is set to execute atLeastOnce, the dead letter queue (DLQ) trigger will be invoked if specified. Invoking the dead letter queue trigger helps prevent data loss."
},
"parameters": {
"description": "Parameters is the list of parameters applied to the trigger template definition",
"items": {
Expand Down
6 changes: 5 additions & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion api/sensor.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 36 additions & 7 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,52 @@ func validateTriggers(triggers []v1alpha1.Trigger) error {
trigNames := make(map[string]bool)

for _, trigger := range triggers {
if err := validateTriggerTemplate(trigger.Template); err != nil {
if err := validateTrigger(trigger); err != nil {
return err
}
if _, ok := trigNames[trigger.Template.Name]; ok {
return fmt.Errorf("duplicate trigger name: %s", trigger.Template.Name)
}
trigNames[trigger.Template.Name] = true
if err := validateTriggerPolicy(&trigger); err != nil {
return err
}
if err := validateTriggerTemplateParameters(&trigger); err != nil {
return err
}
}
return nil
}

func validateTrigger(trigger v1alpha1.Trigger) error {
if err := validateTriggerTemplate(trigger.Template); err != nil {
return err
}
if err := validateTriggerPolicy(&trigger); err != nil {
return err
}
if err := validateTriggerTemplateParameters(&trigger); err != nil {
return err
}
if err := validateDlqTrigger(&trigger); err != nil {
return err
}

return nil
}

// validateDlqTrigger validates trigger.atLeastOnce==true and the trigger.dlqTrigger
func validateDlqTrigger(trigger *v1alpha1.Trigger) error {
if trigger == nil {
return fmt.Errorf("trigger can't be nil")
}
if trigger.DlqTrigger == nil {
return nil
}
if !trigger.AtLeastOnce {
return fmt.Errorf("to use dlqTrigger, trigger.atLeastOnce must be set to true")
}
if !trigger.DlqTrigger.AtLeastOnce {
return fmt.Errorf("atLeastOnce must be set to true within the dlqTrigger")
}

return validateTrigger(*trigger.DlqTrigger)
}

// validateTriggerTemplate validates trigger template
func validateTriggerTemplate(template *v1alpha1.TriggerTemplate) error {
if template == nil {
Expand Down
80 changes: 80 additions & 0 deletions controllers/sensor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,86 @@ func TestValidTriggers(t *testing.T) {
assert.Equal(t, true, strings.Contains(err.Error(), "trigger template can't be nil"))
})

t.Run("vanilla dlqTrigger", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
AtLeastOnce: true,
DlqTrigger: &v1alpha1.Trigger{
AtLeastOnce: true,
Template: &v1alpha1.TriggerTemplate{
Name: "dlq-fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
}
err := validateTriggers(triggers)
assert.Nil(t, err)
})

t.Run("!dlqTrigger.atLeastOnce", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
AtLeastOnce: true,
DlqTrigger: &v1alpha1.Trigger{
Template: &v1alpha1.TriggerTemplate{
Name: "dlq-fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
}
err := validateTriggers(triggers)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "atLeastOnce must be set to true within the dlqTrigger"))
})

t.Run("dlqTrigger !.atLeastOnce", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
DlqTrigger: &v1alpha1.Trigger{
Template: &v1alpha1.TriggerTemplate{
Name: "dlq-fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
}
err := validateTriggers(triggers)
assert.NotNil(t, err)
assert.Equal(t, true, strings.Contains(err.Error(), "to use dlqTrigger, trigger.atLeastOnce must be set to true"))
})

t.Run("invalid conditions reset - cron", func(t *testing.T) {
triggers := []v1alpha1.Trigger{
{
Expand Down
40 changes: 40 additions & 0 deletions docs/sensors/more-about-sensors-and-triggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,43 @@ spec:
# Optional
revisionHistoryLimit: 3
```

## Dead Letter Queue Trigger

To help avoid data loss and dropping a message on failure after all the retries are
exhausted, optionally, a `dlqTrigger` may be configured as following to invoke
any of the [10+ triggers](https://argoproj.github.io/argo-events/concepts/trigger/):

```yaml
spec:
triggers:
- template:
name: http-trigger
http:
url: https://xxxxx.com/
method: GET
# must be true for dlqTrigger
atLeastOnce: true
retryStrategy:
steps: 3
dlqTrigger:
template:
name: dlq-http-trigger
http:
url: https://xxxxx.com/
method: PUT
# must be true for dlqTrigger
atLeastOnce: true
# retries the dlqTrigger 5 times
retryStrategy:
steps: 5
```

If the trigger fails, it will retry up to the configured number of retries based
on `retryStrategy`. If the maximum retries are reached and the trigger, the
`dlqTrigger` will be invoked if specified. In order to use the `dlqTrigger`,
the `atLeastOnce` must be set to true within the trigger and the `dlqTrigger` for
the Sensor to know about the failure and invoke the `dlqTrigger`.

**note:** `dlqTrigger` is only available for the top level trigger and not
*recursively within the `dlqTrigger` template.
33 changes: 31 additions & 2 deletions eventbus/jetstream/base/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,25 @@ func (stream *Jetstream) CreateStream(conn *JetstreamConnection) error {
return err
}

v.SetDefault("retention", 0) // Limits
v.SetDefault("discard", 0) // DiscardOld

retentionPolicy, err := intToRetentionPolicy(v.GetInt("retention"))
if err != nil {
stream.Logger.Errorf("invalid retention policy: %s, error: %v", retentionPolicy, err)
return err
}

discardPolicy, err := intToDiscardPolicy(v.GetInt("discard"))
if err != nil {
stream.Logger.Errorf("invalid discard policy: %s, error: %v", discardPolicy, err)
return err
}
streamConfig := nats.StreamConfig{
Name: common.JetStreamStreamName,
Subjects: []string{common.JetStreamStreamName + ".*.*"},
Retention: nats.LimitsPolicy,
Discard: nats.DiscardOld,
Retention: retentionPolicy,
Discard: discardPolicy,
MaxMsgs: v.GetInt64("maxMsgs"),
MaxAge: v.GetDuration("maxAge"),
MaxBytes: v.GetInt64("maxBytes"),
Expand All @@ -158,3 +172,18 @@ func (stream *Jetstream) CreateStream(conn *JetstreamConnection) error {
stream.Logger.Infof("Created Jetstream stream '%s' for connection %+v", common.JetStreamStreamName, conn)
return nil
}

func intToRetentionPolicy(i int) (nats.RetentionPolicy, error) {
if i < 0 || i > int(nats.WorkQueuePolicy) {
// Handle invalid value, return a default value or panic
return -1, fmt.Errorf("invalid int for RetentionPolicy: %d", i)
}
return nats.RetentionPolicy(i), nil
}

func intToDiscardPolicy(i int) (nats.DiscardPolicy, error) {
if i < 0 || i > int(nats.DiscardNew) {
return -1, fmt.Errorf("invalid int for DiscardPolicy: %d", i)
}
return nats.DiscardPolicy(i), nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/nats-io/nats-server/v2 v2.9.23 // indirect
github.com/nats-io/nats-streaming-server v0.24.3 // indirect
github.com/nats-io/nats-streaming-server v0.24.6 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nicksnyder/go-i18n v1.10.1-0.20190510212457-b280125b035a // indirect
Expand Down
Loading

0 comments on commit 9daefb8

Please sign in to comment.