Skip to content

Commit

Permalink
feat: Implemented Exact Once triggering for NATS event bus (#873)
Browse files Browse the repository at this point in the history
* feat: Implemented Exact Once triggering for NATS event bus

* update comments

* still comments

* fix doc format

* fix typo

* lowercase id

* correct start position

* ackwait change back to 1 sec

* log minor change
  • Loading branch information
whynowy authored Sep 25, 2020
1 parent 7864737 commit 166e36d
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docs/eventsources/naming.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies:
eventName: example
```
The `eventSourceName` ad `eventName` might be confusing. Take the following
The `eventSourceName` and `eventName` might be confusing. Take the following
EventSource example, the `eventSourceName` and `eventName` are described as
below.

Expand Down
18 changes: 9 additions & 9 deletions docs/webhook-authentication.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ Then add `authSecret` to your `webhook` EventSource.
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: webhook
name: webhook
spec:
webhook:
example:
port: "12000"
endpoint: /example
method: POST
authSecret:
name: my-webhook-token
key: my-token
webhook:
example:
port: "12000"
endpoint: /example
method: POST
authSecret:
name: my-webhook-token
key: my-token
```
Now you can authenticate your webhook endpoint with the configured token.
Expand Down
64 changes: 59 additions & 5 deletions eventbus/driver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"strings"
"sync"
"time"

"github.com/Knetic/govaluate"
Expand Down Expand Up @@ -148,25 +149,61 @@ func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connecti
n.processEventSourceMsg(m, msgHolder, filter, action, log)
}, stan.DurableName(durableName),
stan.SetManualAckMode(),
stan.StartAt(pb.StartPosition_LastReceived),
stan.AckWait(3*time.Second),
stan.StartAt(pb.StartPosition_NewOnly),
stan.AckWait(1*time.Second),
stan.MaxInflight(len(msgHolder.depNames)+2))
if err != nil {
log.Errorf("failed to subscribe to subject %s", n.subject)
return err
}
log.Infof("Subscribed to subject %s ...", n.subject)

// Daemon to evict cache
wg := &sync.WaitGroup{}
cacheEvictorStopCh := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
log.Info("starting ExactOnce cache clean up daemon ...")
ticker := time.NewTicker(60 * time.Second)
for {
select {
case <-cacheEvictorStopCh:
log.Info("exiting ExactOnce cache clean up daemon...")
return
case <-ticker.C:
now := time.Now().UnixNano()
num := 0
msgHolder.smap.Range(func(key, value interface{}) bool {
v := value.(int64)
// Evict cached ID older than 5 minutes
if now-v > 5*60*1000*1000*1000 {
msgHolder.smap.Delete(key)
num++
log.Debugw("cached ID evicted", "id", key)
}
return true
})
log.Infof("finished evicting %v cached IDs, time cost: %v ms", num, (time.Now().UnixNano()-now)/1000/1000)
}
}
}()

for {
select {
case <-ctx.Done():
log.Info("existing, unsubscribing and closing connection...")
_ = sub.Close()
log.Infof("subscription on subject %s closed", n.subject)
cacheEvictorStopCh <- struct{}{}
wg.Wait()
return nil
case <-closeCh:
log.Info("closing subscription...")
_ = sub.Close()
log.Infof("subscription on subject %s closed", n.subject)
cacheEvictorStopCh <- struct{}{}
wg.Wait()
return nil
}
}
Expand Down Expand Up @@ -202,14 +239,22 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc
}
}

// NATS Streaming guarantees At Least Once delivery,
// so need to check if the message is duplicate
if _, ok := msgHolder.smap.Load(event.ID()); ok {
log.Infow("ATTENTION: Duplicate delivered message detected", "message", m)
_ = m.Ack()
return
}

// Clean up old messages before starting a new round
if msgHolder.lastMeetTime > 0 || msgHolder.latestGoodMsgTimestamp > 0 {
// ACK all the old messages after conditions meet
if m.Timestamp <= msgHolder.latestGoodMsgTimestamp {
if depName != "" {
msgHolder.reset(depName)
}
_ = m.Ack()
msgHolder.ackAndCache(m, event.ID())
return
}
return
Expand All @@ -222,7 +267,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc
return
} else if m.Timestamp < existingMsg.timestamp {
// Redelivered old message, ack and return
_ = m.Ack()
msgHolder.ackAndCache(m, event.ID())
return
}
}
Expand Down Expand Up @@ -260,7 +305,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc
go action(messages)

msgHolder.reset(depName)
_ = m.Ack()
msgHolder.ackAndCache(m, event.ID())
}

// eventSourceMessage is used by messageHolder to hold the latest message
Expand All @@ -282,6 +327,8 @@ type eventSourceMessageHolder struct {
sourceDepMap map[string]string
parameters map[string]interface{}
msgs map[string]*eventSourceMessage
// A sync map used to cache the message IDs, it is used to guarantee Exact Once triggering
smap *sync.Map
}

func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependency) (*eventSourceMessageHolder, error) {
Expand Down Expand Up @@ -315,6 +362,7 @@ func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependenc
sourceDepMap: srcDepMap,
parameters: parameters,
msgs: msgs,
smap: new(sync.Map),
}, nil
}

Expand All @@ -331,6 +379,12 @@ func (mh *eventSourceMessageHolder) getDependencyName(eventSourceName, eventName
return "", nil
}

// Ack the stan message and cache the ID to make sure Exact Once triggering
func (mh *eventSourceMessageHolder) ackAndCache(m *stan.Msg, id string) {
_ = m.Ack()
mh.smap.Store(id, time.Now().UnixNano())
}

// Reset the parameter and message that a dependency holds
func (mh *eventSourceMessageHolder) reset(depName string) {
mh.parameters[depName] = false
Expand Down
3 changes: 2 additions & 1 deletion eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -284,7 +285,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
logger.Error("failed to reconnect to eventbus", zap.Error(err))
continue
}
logger.Info("reconnected the NATS streaming server...")
logger.Info("reconnected to eventbus successfully")
}
}
}
Expand Down

0 comments on commit 166e36d

Please sign in to comment.