Add a proxy queue to avoid double-queueing every event when using the shipper output #34377
Conversation
This pull request does not have a backport label. To fix up this pull request, you need to add the backport labels for the needed branches.
Pinging @elastic/elastic-agent (Team:Elastic-Agent)
// ackListener's only job is to listen to the persisted index RPC stream
// and forward its values to the ack worker.
func (s *shipper) ackListener(ctx context.Context) error {
Is there a reason to have a dedicated little listener-thread-thing that just forwards events from that RPC stream? Are we just trying to make the select statement in ackWorker cleaner?
I can go into more detail in the shipper sync but, yes: because we can't directly select on the result of this call, making it commits us to a 30+ second window where we can't handle signals from publish calls. That would mean using a very large channel buffer to avoid spurious blocking (there's no particular limit on how many batches could go through, and they send fast). I'm unhappy with the extra goroutine conceptually, but it is cheap and robust against bad scheduling and other config interactions. What I'd really like is to fix the shipper API so this is all unnecessary, so let's talk about that at the sync...
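For readers following along, here is a minimal sketch of the pattern under discussion. The names (indexStream, ackListener's channel parameter) are illustrative stand-ins, not the actual shipper types: a dedicated goroutine turns a blocking RPC stream into channel sends, so the ack worker can select on it alongside other signals.

```go
package shipper

import "context"

// indexStream is a hypothetical stand-in for the persisted index RPC
// stream; the real shipper client type differs.
type indexStream interface {
	Recv() (uint64, error) // blocks until the server reports a new persisted index
}

// ackListener converts the blocking stream into channel sends so the ack
// worker can select on `out` alongside publish signals and shutdown.
func ackListener(ctx context.Context, stream indexStream, out chan<- uint64) error {
	for {
		index, err := stream.Recv()
		if err != nil {
			// The stream died; the caller decides whether to reconnect.
			return err
		}
		select {
		case out <- index:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```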
// specific language governing permissions and limitations
// under the License.

package proxyqueue
I assume something is supposed to be in the README.md file? Regardless, we may want some kind of package-level comment here describing what each queue type does, since there are a lot of them now...
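For instance, a package comment along these lines would cover it (a sketch based on the PR description; the wording actually merged may differ):

```go
// Package proxyqueue implements a lightweight queue for use with the
// shipper output. Unlike the memory and disk queues, it does not buffer
// event data itself: events are queued in the shipper process, so the
// proxy queue only tracks acknowledgment state for batches in flight.
package proxyqueue
```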
Looking good, I'm OK with the overall direction/design. A couple of things I think we need before merging:
- readme filled in with the design & purpose of the queue
- diagram of how the broker works
- diagram of the data structure backing the queue
- more tests, especially around partial acks of batches
- a fix for propagating ackloop errors in the shipper client, or at least an issue to track it
entries []queueEntry

// Original number of entries (persists even if entries are freed).
entryCount int
I think this needs a rename to make the fact that it is the original count more explicit.
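Something along these lines, perhaps (the name here is illustrative, not necessarily what the PR settled on):

```go
package proxyqueue

// queueEntry's definition is elided here; see the package source.
type queueEntry struct{}

type batch struct {
	entries []queueEntry

	// originalEntryCount is the number of entries this batch was created
	// with; it persists even after entries is freed on acknowledgment.
	originalEntryCount int
}
```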
// try to reconnect.
// (Note: this case would be much easier if the persisted index RPC
// were not a stream.)
s.log.Errorf("acknowledgment listener stopped: %s", err)
Maybe we should store the status of the ackLoop in the shipper struct. That way we can check it before a publish.
I handled this by instead calling s.Close(), which will produce an error the next time Publish is called.

@leehinman Makes sense for the most part, but note that there's no mechanism for partial acks of batches. (This is mostly true in the other queues as well -- "partial acks" are implemented in the outputs and do not propagate back to the queue or producer until the full batch has been processed -- but it is especially so in this case, since in the shipper output the publish call itself blocks on the batch it's given rather than retrying through the pipeline.)
Code LGTM. Thank you for adding the readme and diagram; those are very helpful.
Requesting a few more tests around the done channel, plus a suggested fix to make the rendered SVG more robust.
queueReader {
  explanation: |md
    `queueReader` is a worker that reads raw batches (satisfying the<br>
Suggested change:
- `queueReader` is a worker that reads raw batches (satisfying the<br>
+ `queueReader` is a worker that reads raw batches (satisfying the

Replace "<br>" with two spaces at the end of the line. Annoying, but neither GitHub nor Firefox will render the SVG that is produced with "<br>". This needs to be done for all the "<br>" tags.
\ also works btw (https://commonmark.org/help/tutorial/03-paragraphs.html#:~:text=For%20a%20line%20break%2C%20add,the%20end%20of%20the%20line.), or <br /> (they won't render it because <br> is not semantic XML).
			return fmt.Errorf("timed out waiting for acknowledgments: have %d, wanted %d", l.ackedCount, targetCount)
		}
	}
}
Can you add a few more tests around shutdown, so that if the queue is shutting down we don't publish, etc.?
I will think about what tests might be appropriate, but:
- There's on some level a fundamental nondeterminism: it's always possible for the shutdown signal to come in simultaneously with a publish request, and what breaks the tie isn't which was sent first but which the queue loop receives first (which we can't tell from outside). We can be certain that no publish requests will go through after Close returns (or... at least now we can, since I just added the mistakenly omitted Wait call in response to this comment 😅), but we can't have any guarantees based on the close channel alone, or while Close is still in progress. I can, however, add tests to make sure that write-after-close fails. Sadly, queue.Close is never called during Beats shutdown, so it's nice to know that it would work correctly if we used it, but as of right now none of the queues are ever shut down properly.
A test that shows write-after-close fails would satisfy me. I think that goes a long way in exercising that code path.
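A minimal sketch of such a test. The toyQueue here is a self-contained stand-in, not the proxy queue's real API; it only demonstrates the deterministic write-after-close contract being asked for:

```go
package proxyqueue_test

import (
	"sync"
	"testing"
)

// toyQueue is a minimal stand-in for the proxy queue, just enough to
// demonstrate the write-after-close behavior under test.
type toyQueue struct {
	mu     sync.Mutex
	closed bool
}

// Publish rejects events once the queue is closed.
func (q *toyQueue) Publish(event any) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	return !q.closed
}

// Close marks the queue closed; the real queue also waits for its broker
// loop to exit before returning.
func (q *toyQueue) Close() error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	return nil
}

func TestPublishAfterCloseFails(t *testing.T) {
	q := &toyQueue{}

	if !q.Publish("event-1") {
		t.Fatal("publish before close should be accepted")
	}
	if err := q.Close(); err != nil {
		t.Fatalf("close failed: %v", err)
	}
	// Close has returned, so any later publish must fail deterministically;
	// only races *during* Close are inherently nondeterministic.
	if q.Publish("event-2") {
		t.Fatal("publish after close should be rejected")
	}
}
```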
Add a proxy queue to avoid double-queueing every event when using the shipper output (#34377)
Add a "proxy queue" which tracks acknowledgment callbacks for events but does not let events accumulate or keep its own copy of the event data for batches that have been read. This is for use in the shipper output, since events sent to it will be sent and queued in the shipper, and thus don't need to be queued in Beats as well while they wait for upstream acknowledgment.
This also includes significant changes to the shipper output, since there were various race conditions and bugs that interfered with the chain of acknowledgments, and we need precise handling to make sure the events are freed without losing the acknowledgment data.
Currently this change is internal-only; there will be a followup PR to enable the proxy queue when the shipper output is active.
Resolves elastic/elastic-agent-shipper#97
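To illustrate the core idea described above, here is a sketch with hypothetical names (not the PR's actual types): the queue hands event references to the output once, then frees them while keeping just enough bookkeeping to fire acknowledgment callbacks later.

```go
package proxyqueue

// queueEntry pairs an event with the callback to invoke when the shipper
// upstream acknowledges it.
type queueEntry struct {
	event   interface{}
	ackFunc func()
}

// batch is handed to the output once. FreeEntries drops the event data
// (the shipper now owns it), while the callbacks and original count
// survive so acknowledgments can still be delivered later.
type batch struct {
	entries            []queueEntry
	ackFuncs           []func()
	originalEntryCount int
}

// FreeEntries releases the event references, keeping only the
// acknowledgment callbacks; the events are already queued in the shipper
// and would otherwise be stored twice.
func (b *batch) FreeEntries() {
	b.ackFuncs = make([]func(), 0, len(b.entries))
	for _, e := range b.entries {
		b.ackFuncs = append(b.ackFuncs, e.ackFunc)
	}
	b.entries = nil
}

// Ack fires the callbacks once the shipper's persisted index passes this batch.
func (b *batch) Ack() {
	for _, ack := range b.ackFuncs {
		ack()
	}
}
```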
- I have made corresponding changes to the documentation
- I have made corresponding changes to the default configuration files
- I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.