Skip to content

Commit

Permalink
Customizable retry parameters and improved logging (#776)
Browse files Browse the repository at this point in the history
* Customizable retry parameters and improved logging

* consolidate into struct
  • Loading branch information
davidzhao committed Aug 2, 2024
1 parent 0584f0c commit cc4df56
Showing 1 changed file with 77 additions and 4 deletions.
81 changes: 77 additions & 4 deletions webhook/url_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@ import (
)

type URLNotifierParams struct {
HTTPClientParams
Logger logger.Logger
QueueSize int
URL string
APIKey string
APISecret string
}

type HTTPClientParams struct {
RetryWaitMin time.Duration
RetryWaitMax time.Duration
MaxRetries int
ClientTimeout time.Duration
}

const defaultQueueSize = 100

// URLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL.
Expand All @@ -59,9 +67,22 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier {
params.Logger = logger.GetLogger()
}

rhc := retryablehttp.NewClient()
if params.RetryWaitMin > 0 {
rhc.RetryWaitMin = params.RetryWaitMin
}
if params.RetryWaitMax > 0 {
rhc.RetryWaitMax = params.RetryWaitMax
}
if params.MaxRetries > 0 {
rhc.RetryMax = params.MaxRetries
}
if params.ClientTimeout > 0 {
rhc.HTTPClient.Timeout = params.ClientTimeout
}
n := &URLNotifier{
params: params,
client: retryablehttp.NewClient(),
client: rhc,
}
n.client.Logger = &logAdapter{}
n.worker = core.NewQueueWorker(core.QueueWorkerParams{
Expand All @@ -80,12 +101,21 @@ func (n *URLNotifier) SetKeys(apiKey, apiSecret string) {
}

func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error {
enqueuedAt := time.Now()
n.worker.Submit(func() {
if err := n.send(event); err != nil {
n.params.Logger.Warnw("failed to send webhook", err, "url", n.params.URL, "event", event.Event)
fields := logFields(event)
fields = append(fields,
"url", n.params.URL,
"queueDuration", time.Since(enqueuedAt),
)
sentStart := time.Now()
err := n.send(event)
fields = append(fields, "sendDuration", time.Since(sentStart))
if err != nil {
n.params.Logger.Warnw("failed to send webhook", err, fields...)
n.dropped.Add(event.NumDropped + 1)
} else {
n.params.Logger.Infow("sent webhook", "url", n.params.URL, "event", event.Event, "eventDetails", logger.Proto(event))
n.params.Logger.Infow("sent webhook", fields...)
}
})
return nil
Expand Down Expand Up @@ -141,3 +171,46 @@ func (n *URLNotifier) send(event *livekit.WebhookEvent) error {
type logAdapter struct{}

func (l *logAdapter) Printf(string, ...interface{}) {}

func logFields(event *livekit.WebhookEvent) []interface{} {
fields := make([]interface{}, 0, 20)
fields = append(fields,
"event", event.Event,
"id", event.Id,
"webhookTime", event.CreatedAt,
)

if event.Room != nil {
fields = append(fields,
"room", event.Room.Name,
"roomID", event.Room.Sid,
)
}
if event.Participant != nil {
fields = append(fields,
"participant", event.Participant.Identity,
"pID", event.Participant.Sid,
)
}
if event.EgressInfo != nil {
fields = append(fields,
"egressID", event.EgressInfo.EgressId,
"status", event.EgressInfo.Status,
)
if event.EgressInfo.Error != "" {
fields = append(fields, "error", event.EgressInfo.Error)
}
}
if event.IngressInfo != nil {
fields = append(fields,
"ingressID", event.IngressInfo.IngressId,
)
if event.IngressInfo.State != nil {
fields = append(fields, "status", event.IngressInfo.State.Status)
if event.IngressInfo.State.Error != "" {
fields = append(fields, "error", event.IngressInfo.State.Error)
}
}
}
return fields
}

0 comments on commit cc4df56

Please sign in to comment.