Optimize Trace collection #32
base: master
Conversation
As an update, I've added buffer sizes, tasks, and re-added support for batch sizes.
I've tested this fix in our environment. We see a marked decrease in overall latency. Along with that, we see consistent latencies even under load. Before, we could see pauses of up to 20ms when sending a single trace. After these changes, the max latencies stay consistent at ~600 usecs even under load.
Nice, I like where this is heading, but it's obviously a big change so I want to take some time to grok it and play with it before we do a wholesale replacement like this.
Surface-level comments as I went through*. Was interested in seeing what you did. The new buffer is groovy.
Only issue I saw is real minor: the signature of send_trace/{1,2} changed, but that's easily fixed.
* I should specify that's because I chatted with @keathley directly about the questions on the approach :P Looks good to me. Assumed Client is just the same code that existed before, so I didn't look at it.
trace = %Trace{spans: spans}
GenServer.call(__MODULE__, {:send_trace, trace}, timeout)
send_trace(trace, opts)
could switch to just send_trace/1 here
end
@doc """ | ||
Send spans asynchronously to DataDog. | ||
""" | ||
@spec send_trace(Trace.t(), Keyword.t()) :: :ok | ||
def send_trace(%Trace{} = trace, opts \\ []) do | ||
def send_trace(%Trace{} = trace, _opts \\ []) do |
should send_trace/2 be deprecated?
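If it should be, one minimal sketch (not part of this PR) is to make send_trace/1 the real entry point and keep a deprecated two-argument clause for existing callers; this assumes the Buffer.add_trace/1 introduced in this PR, and the deprecation message is illustrative:

```elixir
@doc """
Send spans asynchronously to DataDog.
"""
@spec send_trace(Trace.t()) :: :ok
def send_trace(%Trace{} = trace) do
  _ = Buffer.add_trace(trace)
  :ok
end

# Hypothetical deprecation shim so existing two-argument callers get a
# compile-time warning instead of a breakage.
@deprecated "opts are no longer used; call send_trace/1 instead"
@spec send_trace(Trace.t(), Keyword.t()) :: :ok
def send_trace(%Trace{} = trace, _opts), do: send_trace(trace)
```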
sync_threshold: opts[:sync_threshold],
agent_pid: agent_pid
}
task_sup = __MODULE__.TaskSupervisor
maybe module attribute this eh
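For what it's worth, a tiny illustration of that idea (the attribute and helper names here are made up, not from this PR):

```elixir
# Illustrative only: name the task supervisor once in a module attribute.
@task_supervisor __MODULE__.TaskSupervisor

# ...then reference @task_supervisor wherever children are started, e.g.:
defp start_send_task(fun), do: Task.Supervisor.start_child(@task_supervisor, fun)
```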
:telemetry.span([:spandex_datadog, :send_trace], %{trace: trace}, fn ->
timeout = Keyword.get(opts, :timeout, 30_000)
result = GenServer.call(__MODULE__, {:send_trace, trace}, timeout)
result = Buffer.add_trace(trace)
looks like Buffer.add_trace returns true | :ok, so the spec for this function needs to be updated... or maybe just change the next line to be {:ok, %{trace: trace}} and ignore the result of add_trace?
@@ -0,0 +1,75 @@
defmodule SpandexDatadog.ApiServer.Buffer do
might be good to spec and doc false all the pub funs in here
def add_trace(trace) do
config = :persistent_term.get(@config_key)
id = :erlang.system_info(:scheduler_id)
buffer = :"#{__MODULE__}-#{id}"
buffer = :"#{__MODULE__}-#{id}" | |
buffer = tab_name(id) |
|> update_in([:flush_period], & &1 || 1_000)
|> put_in([:collector_url], collector_url)
any reason you're using the *_in functions instead of Map?
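For reference, a plain-Map version of the two calls above might look like this (assuming the piped value is a flat map, called config here):

```elixir
# Equivalent behaviour for a flat map: default :flush_period to 1_000 when it
# is missing or nil, and set :collector_url unconditionally.
config
|> Map.update(:flush_period, 1_000, &(&1 || 1_000))
|> Map.put(:collector_url, collector_url)
```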
# next time = min(max(min_time * 2, 1_000), 1_000)
# If our minimum requests are taking way longer than 1 second than don't try
# schedule another
are these comments still relevant?
|> Enum.each(fn batch ->
Client.send(state.http, state.collector_url, batch, verbose?: state.verbose?)
any thoughts on fanning this out with Task.async_stream or the like if there are enough batches?
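Something along these lines could work, assuming `batches` is the enumerable currently piped into Enum.each/2 above (the concurrency and timeout values are illustrative):

```elixir
batches
|> Task.async_stream(
  fn batch ->
    # Same call as in the diff above, but each batch is sent from its own task.
    Client.send(state.http, state.collector_url, batch, verbose?: state.verbose?)
  end,
  max_concurrency: System.schedulers_online(),
  ordered: false,
  timeout: 30_000
)
|> Stream.run()
```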
After #28, we determined that the call to add a new trace was taking several milliseconds in the general case and spiking up to 20ms under load. This PR is a re-write of the existing trace collection process in order to optimize callers and remove this as a bottleneck.
New Technique
With this new method, all traces are buffered in a collection of write-optimized ETS tables, with one ETS table per scheduler. When a caller writes a trace, it first determines its current scheduler and then writes the trace to the corresponding ETS table. Using multiple ETS tables in this way helps to reduce write contention. Periodically (every second by default) a reporting process flushes the data from these ETS tables and sends it to the Datadog collector.
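As a condensed sketch of that write path (not the PR's exact code; the table options, naming, and flush handling are illustrative):

```elixir
defmodule BufferSketch do
  # One write-optimized table per scheduler.
  def create_tables do
    for id <- 1..System.schedulers_online() do
      :ets.new(table_name(id), [:named_table, :public, :duplicate_bag, write_concurrency: true])
    end
  end

  # Callers write to the table matching their current scheduler, which spreads
  # writes across tables and reduces contention.
  def add_trace(trace) do
    id = :erlang.system_info(:scheduler_id)
    :ets.insert(table_name(id), {System.monotonic_time(), trace})
  end

  # A reporter process would call this on an interval (every second by
  # default) and hand the drained traces to the Datadog client.
  def flush_all(send_fun) do
    for id <- 1..System.schedulers_online() do
      table = table_name(id)
      traces = :ets.tab2list(table)
      :ets.delete_all_objects(table)
      send_fun.(traces)
    end
  end

  defp table_name(id), do: :"#{__MODULE__}-#{id}"
end
```

A real implementation also has to account for traces inserted between the tab2list read and the delete; this sketch ignores that.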
Future Improvements
There are a few other improvements to make here down the line. The first is that this is now an unbounded buffer, which is always a bad idea. We should provide a per-table maximum number of traces that can be stored and reject new traces (or old traces) based on this maximum. This should remove the bottleneck from the callers and help ensure that we don't have unbounded memory growth.
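One hypothetical shape for that cap, reusing the per-scheduler table naming from the sketch above (the limit and the :dropped return are made up, not part of this PR):

```elixir
@max_traces_per_table 5_000

def add_trace(trace) do
  table = table_name(:erlang.system_info(:scheduler_id))

  # Reject new traces once the table is full so callers stay fast and memory
  # stays bounded; dropping old traces instead would need a delete here.
  if :ets.info(table, :size) < @max_traces_per_table do
    :ets.insert(table, {System.monotonic_time(), trace})
    :ok
  else
    :dropped
  end
end
```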
It is also probably still worth doing certain operations in a Task or separate, short-lived process to avoid memory bloat from binaries.
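For the Task idea, a rough illustration would be handing each batch to a short-lived, supervised process so any large binaries built during encoding die with it (the supervisor name and call shape are assumptions, not this PR's code):

```elixir
Enum.each(batches, fn batch ->
  Task.Supervisor.start_child(SpandexDatadog.ApiServer.TaskSupervisor, fn ->
    # Encoding and sending happen in this process; its heap, and any large
    # binaries it references, can be reclaimed as soon as it exits.
    Client.send(state.http, state.collector_url, batch, verbose?: state.verbose?)
  end)
end)
```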
I'll work on both of these ideas, but I wanted to present this PR sooner in order to get y'all's input.