Async flushing of writer payloads with jittery exponential retrying #344
a41a2f8 to 4626e73
34e4597 to bd36cb0
@@ -7,9 +7,16 @@ import (
	"github.com/DataDog/datadog-trace-agent/config"
)

// StatsClient represents a client capable of sending stats to some stat endpoint.
Good idea!
Looks like a small change which could be an easy/quick PR on its own (with all the `TestStatsClient` stuff).
Good point. Should reduce some load from this PR!
are we still splitting this out?
Had forgotten about this, sorry. Splitting it out will require some rebase gymnastics, which might not make sense now if we are close to merging everything.
@@ -69,7 +69,7 @@ func RandomTrace(maxLevels, maxSpans int) model.Trace {
	t := model.Trace{RandomSpan()}

	prevLevel := t
-	maxDepth := rand.Intn(maxLevels)
+	maxDepth := 1 + rand.Intn(maxLevels)
Nit: to keep `maxLevels` meaningful, prefer `maxDepth := max(1, maxLevels)`? Or maybe the actual implementation is such that `maxDepth == 1 + maxLevels`? (In that case the method comment is misleading.)
`rand.Intn(x)` returns something in `[0, x)`. So for the random trace to actually be in the range `[1, x]`, as seems to be the intention of the method comment (since `maxSpans` is documented as inclusive, it would seem strange to have one be exclusive and the other inclusive), we need the +1.
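A tiny self-contained example of that half-open range, using only `math/rand` (the `maxLevels` value here is just illustrative):

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	maxLevels := 3
	for i := 0; i < 5; i++ {
		// rand.Intn(maxLevels) is uniform over [0, maxLevels), i.e. 0, 1 or 2.
		// The +1 shifts the result into [1, maxLevels], matching the inclusive
		// semantics documented for maxSpans.
		maxDepth := 1 + rand.Intn(maxLevels)
		fmt.Println(maxDepth) // always 1, 2 or 3
	}
}
```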
Good catch!
writer/trace_writer.go (outdated)
case <-flushTicker.C:
	w.Flush()
	log.Debug("Flushing current traces")
	w.payloadConstructor.ConstructAndSend()
The fact that both a ticker in the Writer and a call to `payloadConstructor.Add` can trigger a flush can make comprehension tricky (and it's not efficient either, as the ticker can fire at moments when the buffer is almost empty).
We probably want the "construct" decisions to be taken from a single place; with the current model it seems simpler to have the ticker inside the `payloadConstructor` itself (and replace it with a `Now() - lastFlush > FlushPeriod` condition), as sketched below.
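A rough sketch of that suggestion, with made-up names (`payloadConstructor`, `maxBuffered`, `flushPeriod` and `send` are not the actual fields), just to make the single-decision-point idea concrete:

```go
package writer

import "time"

// Payload is a stand-in for the real payload type.
type Payload struct{ body []byte }

// payloadConstructor owns the "construct" decision: the writer only calls Add,
// and there is no separate ticker left in the writer loop.
type payloadConstructor struct {
	buffer      []*Payload
	maxBuffered int
	flushPeriod time.Duration
	lastFlush   time.Time
	send        func([]*Payload) // injected by the writer
}

// Add buffers a payload and flushes when either the buffer is full enough or
// flushPeriod has elapsed since the last flush (the Now() - lastFlush check).
func (c *payloadConstructor) Add(p *Payload) {
	c.buffer = append(c.buffer, p)
	if len(c.buffer) >= c.maxBuffered || time.Since(c.lastFlush) > c.flushPeriod {
		c.send(c.buffer)
		c.buffer = nil
		c.lastFlush = time.Now()
	}
}
```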
writer/payload.go (outdated)
	Run()
	Stop()
	Send(payload *Payload)
	OnSendSuccess(handler SendSuccessHandler)
It might be more in the spirit of Go to use channels instead of callbacks here.
We could probably have a single channel if we propagate the code/status along with the payload + err + stats.
But in practice, all of that is pretty similar (it is mostly about whether we want to communicate only by channels or not).
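For illustration, a minimal sketch of what a single monitor channel could look like (all type and field names below are invented, not the ones in this PR):

```go
package writer

import "time"

// Payload is a stand-in for the real payload type.
type Payload struct{ body []byte }

// SenderEvent is one event on the monitor channel: enough context (payload,
// error, HTTP status, send stats) to replace separate success/failure callbacks.
type SenderEvent struct {
	Payload    *Payload
	Err        error // nil on success
	StatusCode int
	SendTime   time.Duration
	Bytes      int
}

type payloadSender struct {
	monitor chan SenderEvent
}

// Monitor exposes the read side; the writer ranges over it and handles both
// successes and failures in a single loop.
func (s *payloadSender) Monitor() <-chan SenderEvent { return s.monitor }
```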
Recoded with a monitor channel.
var err error

if !s.queuing {
Having the retry decision here might block us if we have writer-specific retry logic (like support for 413).
An option is to have the Sender communicate back to the writer, on an async channel, the result of each flush (containing payload + source data + response code + info about retries); the writer then takes the retry decision (and any writer-specific splits or changes) and pushes it back to the Sender input (as one or more payloads). It can mean a bit more dupe code for the common case of a 5xx response, but not much more. Plus it allows things like not replaying at all under some conditions (for the service metadata, we don't have to flush them all; the latest is the only one we care about).
Another complementary idea is to just remove the 413 case, either by capping the size of the serialized payload (if too big, split as many times as required before flushing) or by considering that the cap on the number of spans/traces is enough (and then disable the 413 feature on the server side).
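To make the "Sender reports, writer decides" idea concrete, here is an entirely hypothetical sketch of the writer side (the 413 split is only indicative, not the actual code):

```go
package writer

import "net/http"

// Hypothetical types; only the division of responsibilities matters here.
type Payload struct{ body []byte }

// flushResult is what the Sender would push back after every flush attempt.
type flushResult struct {
	payload    *Payload
	statusCode int
	err        error
}

// splitPayload naively halves a payload; a real writer would re-split at
// trace boundaries.
func splitPayload(p *Payload) (*Payload, *Payload) {
	half := len(p.body) / 2
	return &Payload{body: p.body[:half]}, &Payload{body: p.body[half:]}
}

// handleResults keeps writer-specific logic (the 413 split) in the writer,
// while generic retries stay inside the Sender.
func handleResults(results <-chan flushResult, resend chan<- *Payload) {
	for r := range results {
		switch {
		case r.err == nil:
			// success: nothing to do beyond stats
		case r.statusCode == http.StatusRequestEntityTooLarge:
			a, b := splitPayload(r.payload)
			resend <- a
			resend <- b
		default:
			// temporary/5xx failure: left to the Sender's own retry policy
		}
	}
}
```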
Delegating all retry logic to writers would not only introduce dupe code, it would also make aligned retrying, retry limitation and total limits (say max bytes waiting for retry) a bit more complicated.
We seem to have two kinds of errors with different needs:
- Some communication/temporary server issue (these would map to `RetriableError`).
- Invalid input errors like the 413 one.

Technically, the first kind should be retried transparently by the Sender. No info is needed from the writer, and this way the sender can abstract all size/num/retry limits away from the writer. Writers only need to weigh in for the second class of errors, since those are the ones the sender cannot possibly retry.

I suggest something along these lines:
- Add a `MaxQueuedPayloads` parameter to `QueuablePayloadSender`. This gives us enough flexibility to support all our current cases (set `MaxQueuedPayloads` to 1 for service metadata and you get the "only retry the latest" functionality).
- If something radically different is needed, a writer can always use a custom Sender on a case-by-case basis while keeping the common cases clean and dupe-free.
- Add Promise-like responses to `PayloadSender.Send`: `response.Then(...)` would run on successful sending; `response.Catch(...)` would run on a non-retriable error (either transport related like 413, or when we gave up on retrying on the payload sender side). This is where writer-specific logic should go IMO: split the payload, resend it as two new ones.
What do you think?
We could also offer a third, non-final promise 'resolution': `response.Retry(...)`, allowing a Writer to veto a retry from the sender. It would slightly complicate the sender queue if done asynchronously, because right now we are keeping the queue in order, but it shouldn't be too difficult to implement, and it would allow writers to collect total error stats without the current `Sender.OnError` callback.
Added the `MaxQueuedPayloads` parameter I had mentioned before. Promises would add too much complexity; the monitor channel should be sufficient, provided that the writer is the only one listening on that channel.
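For reference, a rough sketch of how such a `MaxQueuedPayloads` cap could behave (names and fields are guesses, not the actual `QueuablePayloadSender` implementation):

```go
package writer

// Payload is a stand-in for the real payload type.
type Payload struct{ body []byte }

// queuableSender queues payloads that failed to send, bounded by
// maxQueuedPayloads; a value of 1 gives the "only retry the latest"
// behaviour wanted for service metadata.
type queuableSender struct {
	queue             []*Payload
	maxQueuedPayloads int
}

// enqueue keeps at most maxQueuedPayloads entries, dropping the oldest ones
// so the freshest payloads survive a long outage.
func (s *queuableSender) enqueue(p *Payload) {
	s.queue = append(s.queue, p)
	if s.maxQueuedPayloads > 0 && len(s.queue) > s.maxQueuedPayloads {
		s.queue = s.queue[len(s.queue)-s.maxQueuedPayloads:]
	}
}
```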
45ed17a to 33b1bab
@@ -0,0 +1,84 @@
package backoff
Would be amazing to add a package-level doc (`backoff/doc.go`) explaining in more detail what the package does and what backoff is.
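Something as short as this would probably do; a hypothetical `backoff/doc.go` (placeholder wording, not the doc that eventually landed):

```go
// Package backoff provides timers that wait increasingly long intervals
// between consecutive retries of a failing operation.
//
// The default timer grows the delay exponentially with the number of
// consecutive failures, caps it at a maximum, and adds random jitter so that
// many agents failing at the same time do not all retry in lockstep. The
// delay resets to its minimum after a successful attempt.
package backoff
```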
Looks like this comment was completely ignored. Why?
Sorry Gabriel, my bad. Completely missed it. I'll add it in a follow-up PR, where I'll also add some testing for the transaction code that got added to the trace_writer.
@gbbr maybe he wants you to (ahem) backoff 🥁
* More modular structure for some extra code reuse and easier testing.
* Re-added retriable errors.
* Async flushing of payloads handled by a PayloadSender.
* Added an exponential, jittery backoff timer and applied it by default to failed flushes (see the sketch below).
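For readers unfamiliar with the term, a minimal sketch of what "exponential and jittery" backoff generally means (the general technique only, not necessarily the exact formula used in this PR):

```go
package backoff

import (
	"math"
	"math/rand"
	"time"
)

// Delay returns how long to wait before retry attempt numRetries (starting
// at 0): an exponentially growing delay capped at maxDuration, with full
// random jitter so that concurrent clients do not retry in lockstep.
func Delay(numRetries int, baseDuration, maxDuration time.Duration) time.Duration {
	d := float64(baseDuration) * math.Pow(2, float64(numRetries))
	if d > float64(maxDuration) {
		d = float64(maxDuration)
	}
	if d < 1 {
		d = 1 // guard against a zero base so rand.Int63n gets a positive bound
	}
	// Full jitter: pick uniformly in [0, d).
	return time.Duration(rand.Int63n(int64(d)))
}
```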
c43beaa to c237faa
Includes:
Working on top of: #343