Define a queue metrics reporter interface #31289
Conversation
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
libbeat/publisher/queue/queue.go
Outdated
//Metrics is a set of basic-user friendly metrics that report the current state of the queue. These metrics are meant to be relatively generic and high-level, and when reported directly, can be comprehensible to a user.
type Metrics struct {
	//QueueLimitCount is the count of times that the queue has reached it's user-configured limit, either in terms of storage space, or count of events.
	QueueLimitCount uint64
QueueLimitCount uint64
QueueFullCount uint64
The one below this is `QueueIsFull`, so this seems more consistent. Either Limit or Full works as long as it is used consistently.
@@ -34,6 +34,21 @@ type ACKListener interface {
	OnACK(eventCount int) // number of consecutively published events acked by producers
}

//Metrics is a set of basic-user friendly metrics that report the current state of the queue. These metrics are meant to be relatively generic and high-level, and when reported directly, can be comprehensible to a user.
type Metrics struct {
I'd like it if it were more explicit / opinionated about the resource types -- e.g. `EventCount`, `ByteCount`, `EventLimit` and `ByteLimit` could all be fields (with zero indicating not-applicable).

I'd also rather leave off full and percent: "full" is a relative concept (a queue with 1MB free is "full" if it's 100MB and most events are 2MB; it's not "full" if it's 5MB and most events are 10KB), so let's not try to guess at this granularity. Percent can be derived for the respective types from the count and limit values.
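As a concrete illustration of the shape proposed in the comment above, here is a minimal sketch; plain integers with zero meaning "not applicable" is just one possible encoding, and this is not presented as the struct the PR ends up with:

```go
// Sketch of a queue metrics struct built only from raw resource counters.
package queue

// Metrics reports raw, unambiguous resource counters for a queue.
// A zero value for a limit means the queue does not enforce that limit.
type Metrics struct {
	// EventCount is the number of events currently held in the queue.
	EventCount uint64
	// ByteCount is the number of bytes currently held in the queue.
	ByteCount uint64
	// EventLimit is the configured maximum number of events (0 = no event limit).
	EventLimit uint64
	// ByteLimit is the configured maximum number of bytes (0 = no byte limit).
	ByteLimit uint64
}
```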
This is a good point about what full means. It seems like just having a counter for events that were rejected because they wouldn't fit is what we'd want.
Yah, that's a good point about "full" too. Perhaps leave that to the metrics code to derive, and let this interface just report counts and byte values, as you suggested.
I think that makes sense. The queue itself doesn't care if something can fit into it, that is a concern of whatever is using the Producer interface.
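For illustration, such a rejection counter could live next to whatever drives the producer; the `TryPublish` signature below is a simplified stand-in for the real producer API, used only to show where the count would be incremented:

```go
// Sketch of counting rejected events at the producer call site rather than in the queue.
package pipeline

import "sync/atomic"

// producer is a stand-in for the queue's Producer interface; the real
// interface has more methods, but a publish call that reports whether the
// event was accepted is the only part needed to count rejections.
type producer interface {
	TryPublish(event interface{}) bool
}

// rejectedEvents counts publish attempts that the queue turned away.
var rejectedEvents uint64

// publishCounted forwards an event to the producer and records a rejection
// whenever the queue refuses it (e.g. because it is full).
func publishCounted(p producer, event interface{}) bool {
	ok := p.TryPublish(event)
	if !ok {
		atomic.AddUint64(&rejectedEvents, 1)
	}
	return ok
}
```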
@@ -34,6 +34,21 @@ type ACKListener interface {
	OnACK(eventCount int) // number of consecutively published events acked by producers
}

//Metrics is a set of basic-user friendly metrics that report the current state of the queue. These metrics are meant to be relatively generic and high-level, and when reported directly, can be comprehensible to a user.
Actually I'd rather these metrics be unambiguous and consistently implementable. Generic / high-level / comprehensible ones should ideally happen in the metrics code in a queue-agnostic way (rather than being duplicated in different queues where the semantics can subtly diverge). So my main comment below is about making the metrics more concrete and low-level 😅
Yah, that's a good point, I think. When I started I was super on the fence about how much of the work of "deriving" certain values should be done by the queue, or by the metrics code.
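As a sketch of that split, the queue-agnostic metrics code could derive the "high-level" values itself from the raw counts and limits the queue reports; the names here are illustrative, not from the PR:

```go
// Sketch of deriving a user-friendly value outside the queue implementation.
package report

// Fullness derives a 0.0-1.0 utilization value from a raw count and its
// configured limit. The boolean is false when the limit is zero (i.e. the
// queue does not enforce that resource), so callers can skip the metric.
func Fullness(count, limit uint64) (float64, bool) {
	if limit == 0 {
		return 0, false
	}
	return float64(count) / float64(limit), true
}
```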
So, made some super basic changes based on what @faec said. There are still a few missing pieces here that I want input on before we expand the metrics here.

The first is how aggressive we want to get with the metrics reported through this particular interface: there's a lot of queue-implementation-specific metrics we could report, but I'm also not sure what additional metrics we could add that would make sense with both the memory and disk queue. The memory queue isn't particularly well documented, so I'm still figuring out how it works and trying to compare it to the disk queue. A lot of metrics we might want would also require us to actually instrument the code with additional counters, as opposed to just passing along channel sizes or user configs. I'm not sure how much of that we need/want to tackle now.

I'm also not sure if we want to implement the "queue lag" and "oldest item in queue" metrics in the queue, or if the queues should pass along some kind of internal event channel to the metrics code that we can use to track what's moving through the queue at various intervals. It's easy to imagine some kind of channel that reports things like "event with this ID and timestamp just passed through the consumer/producer" down to the metrics code, but that might be more complex than having the queue figure that out itself.
One thing we will need to do is assign elements in the queue a monotonic identifier or sequence number. This will be necessary to correctly acknowledge messages between the shipper and input process (see elastic/elastic-agent-shipper#9 for this work). Once we have that, queue lag is just the difference between the sequence numbers of the newest and oldest item in the queue. It may make sense to add those sequence numbers first if it makes the metrics easier to implement. I feel that a queue lag metric is very important as it will allow us to immediately see if a beat or shipper is being backpressured by looking at the lag metric over time. Identifying backpressure is something we really struggle with right now.
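Once events carry those sequence numbers, the lag calculation itself is trivial; here is a minimal sketch, assuming the queue can report the newest and oldest sequence numbers it currently holds:

```go
// Sketch of deriving queue lag from monotonic sequence numbers.
package queue

// queueLag is the number of events between the newest and oldest entries
// still in the queue, assuming each event is assigned a monotonically
// increasing sequence number on insertion. A value that grows over time
// suggests the consumer side is being backpressured.
func queueLag(newestSeq, oldestSeq uint64) uint64 {
	if newestSeq < oldestSeq { // queue empty or sequence numbers not yet assigned
		return 0
	}
	return newestSeq - oldestSeq
}
```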
Yah, an incrementing queue ID value would simplify a lot of this, agreed; it also means (at least to me) that deriving some of these values is simple enough that it can be done in the queue, and the metrics code shouldn't need to worry about handling some kind of event stream, or...something.
I've split off adding the queue ID / sequence numbers into a separate ticket: elastic/elastic-agent-shipper#27. It seems like it will make sense to add that first since it simplifies the metrics implementation.
	//ByteCount is the total byte size of the queue
	ByteCount opt.Uint
	//ByteLimit is the user-configured byte limit of the queue
	ByteLimit opt.Uint
This probably requires us to implement #30069 to get this. Doing this for the memory queue might be tricky.
I like these and think they are a much more intuitive way to think about the queue size, but I'm not convinced adding this is more important than a lot of the other work we are already doing.
Yah, hence why I made all these `opt` values; the thinking was that some queues might care more about bytes, and others would be more event-based, so the `opt` type would allow this one struct to forward whatever metrics types are available.
OK cool, they can just stay unimplemented then until we add support for them.
I was thinking of these fields as referring to the queue's main buffer, which for the disk queue would mean disk usage. That data is already available in the current code, there's just no one to report it to. The memory queue could leave those fields zero for now, since it doesn't enforce a byte limit.
(We could consider adding metrics about memory overhead outside the main buffer too, but like you point out nobody tracks that yet.)
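For example, a byte-oriented queue like the disk queue could fill in only the byte fields and leave the event-based ones unset. The constructor names below (`opt.UintWith`, `opt.NewUintNone`) follow how the `opt` package is used elsewhere in Beats, but the import path and exact calls should be treated as assumptions for this sketch:

```go
// Sketch of a byte-oriented queue reporting only the fields it can measure.
package diskqueue

// NOTE: assumed import path; the opt package may live under a different
// module depending on the Beats version.
import "github.com/elastic/elastic-agent-libs/opt"

// metricsSnapshot shows a disk-backed queue filling in only the byte fields,
// leaving the event-based fields as "none" rather than zero.
func metricsSnapshot(bytesOnDisk, configuredMaxBytes uint64) (byteCount, byteLimit, eventCount, eventLimit opt.Uint) {
	byteCount = opt.UintWith(bytesOnDisk)
	byteLimit = opt.UintWith(configuredMaxBytes)
	eventCount = opt.NewUintNone()
	eventLimit = opt.NewUintNone()
	return
}
```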
@faec Do you think this is something we should merge now? Should this or a follow-up PR actually add a `Metrics()` method to the actual interfaces?
I'm not sure adding that first is best, partly because I'm already working on that part and it is painfully complicated 😅 I've added an outline of the relevant state to elastic/elastic-agent-shipper#27. It seems like there are still a fair number of things we can immediately report or get started on, so I think we should proceed with those while the memory queue is refactored into a shape where it will handle our data. Once #31307 is checked in, the ID change should become a lot more feasible.
Thanks for clarifying. Let's split each of the metrics into separate issues since some of them are blocked and some of them aren't:
libbeat/publisher/queue/queue.go
Outdated
	EventLimit opt.Uint

	//LongestWaitingItem is the timestamp of the oldest item in the queue.
	LongestWaitingItem common.Time
I like nouns to match up with datatypes, how about `OldestActiveTimestamp`?
libbeat/publisher/queue/queue.go
Outdated
	//LongestWaitingItem is the timestamp of the oldest item in the queue.
	LongestWaitingItem common.Time
	//QueueLag is the difference between the consumer and producer position in the queue.
What does "position" mean here? If it means byte ( / event) index in the queue buffer then this is very similar to `ByteCount` / `EventCount` (maybe modulo some rounding depending on how e.g. the disk queue reports events that are acknowledged but not yet deleted).

I'd rather leave this one off for now. I'd like things in the global interface to have an unambiguous meaning that would be consistently derivable for any queue, at least in principle, and I don't think this one is yet. If we could define it in language that is readily convertible to a unit test I'd feel better :-)
So, my understanding is that this will be a measure of the "delay" involved in an event entering then exiting a queue, as told by the event IDs. Since the IDs increment, if the event leaving the queue for the output has ID = 10 and the event entering has ID = 15, the QueueLag is 5.
Hmm, but if the event leaving the queue has id 10 and the event entering has id 15, then isn't that also the same as `EventCount` being 5? (+- 1 anyway) It seems redundant with that definition.
Yah, I'm probably describing it wrong, and discussion of this is spread out across like, 3 different issues, which isn't helping. From Craig's earlier comment:
> queue lag is just the difference between the sequence numbers of the newest and oldest item in the queue.
I don't know, to me that still sounds the same? I don't see how it isn't redundant with `EventCount`. Is there any situation where this should be different than "the number of items in the queue"?
libbeat/publisher/queue/queue.go
Outdated
	// Leave this commented out for now so we don't break the interfaces
	// Should the main Queue implementation itself expose the metrics, or some other component in the interfaces?
	//Metrics() (Metrics, error)
I think this is a reasonable place to put it.
Alright. Should this PR actually add this to the interface and add some dummy methods for the queues, or should we save that for when we actually implement them?
I'd be fine either way, as long as the comments explain the intention. Dummy methods that just return an "unimplemented" error would be fine in this PR so you can start connecting the reporting components, and we can flesh out the queue-specific implementations as more pieces get connected.
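A stub along those lines is small; the sketch below targets the memory queue's internal broker type, but the receiver name and error text here are illustrative rather than a record of what was merged:

```go
// Sketch of a placeholder Metrics method for the memory queue.
package memqueue

import (
	"errors"

	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// Metrics is a placeholder so reporting components can be wired up now;
// the memory queue does not gather any of these values yet.
func (b *broker) Metrics() (queue.Metrics, error) {
	return queue.Metrics{}, errors.New("queue metrics not implemented for the memory queue")
}
```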
@faec added bare methods to the queues. Hopefully we're about done.
…er-tar-gz

* upstream/main: (139 commits)
  * [Automation] Update elastic stack version to 8.3.0-c655cda8 for testing (elastic#31322)
  * Define a queue metrics reporter interface (elastic#31289)
  * [Oracle Module] Change tablespace metricset collection period (elastic#31259)
  * libbeat/reader/syslog: relax timestamp parsing to allow leading zero (elastic#31254)
  * [Automation] Update elastic stack version to 8.3.0-55ba6f37 for testing (elastic#31311)
  * [libbeat] Remove unused fields and functions in the memory queue (elastic#31302)
  * [libbeat] Cleaning up some unneeded helper types (elastic#31290)
  * Readme for kibana module (elastic#31276)
  * [Automation] Update elastic stack version to 8.3.0-4be61f32 for testing (elastic#31296)
  * x-pack/winlogbeat/module/routing/ingest: fix typo for channel name (elastic#31291)
  * Small pipeline cleanup removing some unused data fields (elastic#31288)
  * removing info log (elastic#30971)
  * Simplify TLS config deserialization (elastic#31168)
  * Detect new files under known paths in filestream input (elastic#31268)
  * Add support for port mapping in docker hints (elastic#31243)
  * Update qa-labels.yml (elastic#31260)
  * libbeat: log debug for `proxy_url` and fixed docs (elastic#31130)
  * [heartbeat][docs] Add note about ensuring correct index settings for uptime (elastic#31146)
  * [Automation] Update elastic stack version to 8.3.0-2c8f9574 for testing (elastic#31256)
  * [Filebeat] fix m365_defender pipeline bug (elastic#31227)
  * ...
* first pass at a queue metrics reporter
* change interface
* change monitored types
* change timestamp field name
* add bare queue implementations
* make linter happy
* fix linter ignore
* remove QueueLag for now
This is the result of a brief conversation with @faec about how to add metrics reporting to the queue. After going over some of the requirements in #31113, I just made a first attempt at defining an interface for what metrics would actually be reported.
I added a simple "reporter" to the `Queue` interface, but I'm not sure we actually want to do it that way. Whatever calls that `Metrics()` function will have to be handled by a metrics reporter that lives in its own thread, so maybe the `Queue` interface should return some kind of handler or something, but I'm not sure.
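One possible shape for that reporter is a goroutine that polls `Metrics()` on a ticker. Everything below is a sketch with stand-in names, since the reporting side is not defined in this PR:

```go
// Sketch of a reporter that polls queue metrics from its own goroutine.
package monitoring

import "time"

// queueMetrics is a stand-in for the Metrics struct defined in
// libbeat/publisher/queue; only the shape matters for this sketch.
type queueMetrics struct {
	EventCount, EventLimit uint64
}

// metricsSource is the minimal surface the reporter needs from a queue.
type metricsSource interface {
	Metrics() (queueMetrics, error)
}

// pollQueueMetrics runs in its own goroutine, fetching a snapshot from the
// queue on every tick and handing it to the report callback until done closes.
func pollQueueMetrics(q metricsSource, interval time.Duration, done <-chan struct{}, report func(queueMetrics)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			if m, err := q.Metrics(); err == nil {
				report(m)
			}
		}
	}
}
```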