
Add a proxy queue to avoid double-queueing every event when using the shipper output #34377

Merged: 35 commits merged into elastic:main on Feb 23, 2023

Conversation

@faec (Contributor) commented Jan 24, 2023

Add a "proxy queue" which tracks acknowledgment callbacks for events but does not let events accumulate or keep its own copy of the event data for batches that have been read. This is for use in the shipper output, since events sent to it will be sent and queued in the shipper, and thus don't need to be queued in Beats as well while they wait for upstream acknowledgment.

This also includes significant changes to the shipper output, since there were various race conditions and bugs that interfered with the chain of acknowledgments, and we need precise handling to make sure the events are freed without losing the acknowledgment data.

Currently this change is internal-only; there will be a followup PR to enable the proxy queue when the shipper output is active.

Resolves elastic/elastic-agent-shipper#97

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

@faec faec self-assigned this Jan 24, 2023
@botelastic botelastic bot added the needs_team label (Indicates that the issue/PR needs a Team:* label) Jan 24, 2023
mergify bot commented Jan 24, 2023

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @faec? 🙏 To do so, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, you need to add the backport labels for the needed branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch (/d is the digit)

@pierrehilbert pierrehilbert added the Team:Elastic-Agent label (Label for the Agent team) Jan 25, 2023
@botelastic botelastic bot removed the needs_team label (Indicates that the issue/PR needs a Team:* label) Jan 25, 2023
@faec faec marked this pull request as ready for review January 25, 2023 20:12
@faec faec requested a review from a team as a code owner January 25, 2023 20:12
@faec faec requested review from cmacknz and leehinman and removed request for a team January 25, 2023 20:12
@elasticmachine (Collaborator) commented:

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

@faec faec changed the title from "(Draft) Add a proxy queue to avoid double-queueing every event when using the shipper output" to "Add a proxy queue to avoid double-queueing every event when using the shipper output" Jan 25, 2023
@elasticmachine (Collaborator) commented Jan 25, 2023

💚 Build Succeeded


Build stats

  • Start Time: 2023-02-23T21:29:07.535+0000

  • Duration: 67 min 40 sec

Test stats 🧪

Test Results
  • Failed: 0
  • Passed: 25983
  • Skipped: 1962
  • Total: 27945

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments


To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@faec faec requested a review from fearful-symmetry January 25, 2023 20:47

// ackListener's only job is to listen to the persisted index RPC stream
// and forward its values to the ack worker.
func (s *shipper) ackListener(ctx context.Context) error {
Contributor:

Is there a reason to have a dedicated little listener-thread-thing that just forwards events from that RPC stream? Are we just trying to make the select statement in ackWorker cleaner?

Contributor Author:

I can go into more detail in the shipper sync, but yes: because we can't directly select on the result of this call, making it commits us to a 30+ second window where we can't handle signals from publish calls, which would mean using a very large channel buffer to avoid spurious blocking (there's no particular limit on how many batches could go through, and they send fast). I'm unhappy with the extra goroutine conceptually, but it is cheap and robust to bad scheduling and other config interactions. What I'd really like is to fix the shipper API so this is all unnecessary, so let's talk about that at the sync...
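
For illustration, a rough sketch of the pattern being discussed (hypothetical names and fields, not the exact shipper code): a small listener goroutine blocks on the long-lived RPC stream and forwards each value over a channel, so the ack worker's select loop stays responsive to publish and shutdown signals without needing a huge buffer.

```go
// Hypothetical sketch: the listener is the only goroutine that blocks on the
// RPC stream; it forwards persisted indexes to the worker over a channel.
func (s *shipper) ackListener(ctx context.Context, indexes chan<- uint64) error {
	for {
		reply, err := s.ackStream.Recv() // may block for 30+ seconds
		if err != nil {
			return fmt.Errorf("acknowledgment stream closed: %w", err)
		}
		select {
		case indexes <- reply.PersistedIndex:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// The worker can now select over shutdown, acknowledgments, and newly
// published batches without ever blocking on the stream itself.
func (s *shipper) ackWorker(ctx context.Context, indexes <-chan uint64) {
	for {
		select {
		case <-ctx.Done():
			return
		case idx := <-indexes:
			s.handlePersistedIndex(idx) // hypothetical helper
		case batch := <-s.pendingBatches: // hypothetical channel of published batches
			s.trackBatch(batch) // hypothetical helper
		}
	}
}
```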

// specific language governing permissions and limitations
// under the License.

package proxyqueue
Contributor:

I assume something is supposed to be in the README.md file? Regardless, we may want some kind of package-level comment here describing what each queue type does, since there are a lot of them now...

@leehinman (Contributor) left a comment:

Looking good, I'm OK with the overall direction/design. A couple of things I think we need before merging:

  • Readme filled in with design & purpose of queue
  • diagram of how broker works
  • diagram of data structure backing the queue
  • more tests, especially around partial acks of batches
  • a fix for propagating ackLoop errors in the shipper client, or at least an issue filed to track it

entries []queueEntry

// Original number of entries (persists even if entries are freed).
entryCount int
Contributor:

I think this needs a rename to make it more explicit that this is the original count.
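
For example, something along these lines (the name here is purely illustrative, not necessarily what the PR settled on):

```go
// originalEntryCount is the number of entries the batch was created with;
// it persists even after the entries themselves are freed, so the producer
// acknowledgment count stays correct.
originalEntryCount int
```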

// try to reconnect.
// (Note: this case would be much easier if the persisted index RPC
// were not a stream.)
s.log.Errorf("acknowledgment listener stopped: %s", err)
Contributor:

Maybe we should store the status of the ackLoop in the shipper struct. That way we can check it before a publish.

Contributor Author:

I handled this by instead calling s.Close(), which will produce an error the next time Publish is called.
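
A minimal sketch of that approach, under the assumption (hypothetical names, not the exact output code) that the shipper tracks a closed flag which Publish checks before doing any work:

```go
package shipperexample

import (
	"errors"
	"sync"
)

// shipper here is a simplified stand-in for the real shipper output.
type shipper struct {
	mu     sync.Mutex
	closed bool
}

// Close is called when the ack listener stops; after it returns, any further
// Publish call fails instead of queueing events whose acks can never arrive.
func (s *shipper) Close() error {
	s.mu.Lock()
	s.closed = true
	s.mu.Unlock()
	return nil
}

func (s *shipper) Publish(events []interface{}) error {
	s.mu.Lock()
	closed := s.closed
	s.mu.Unlock()
	if closed {
		return errors.New("shipper output is closed")
	}
	// ...send the batch to the shipper service here...
	return nil
}
```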

@faec (Contributor Author) commented Feb 2, 2023

@leehinman Makes sense for the most part, but note that there's no mechanism for partial acks of batches. (This is mostly true in the other queues as well -- "partial acks" are implemented in the outputs and don't propagate back to the queue or producer until the full batch has been processed -- but it's especially true here, since in the shipper output the publish call itself blocks on the batch it's given rather than retrying through the pipeline.)

@leehinman (Contributor) left a comment:

Code LGTM. Thank you for adding the README and diagram; those are very helpful.

Requesting a few more tests around the done channel, plus a suggested fix to make the rendered SVG more robust.


queueReader {
explanation: |md
`queueReader` is a worker that reads raw batches (satisfying the<br>
Contributor:

Suggested change:
- `queueReader` is a worker that reads raw batches (satisfying the<br>
+ `queueReader` is a worker that reads raw batches (satisfying the

Replace "<br>" with 2 spaces at the end of the line. Annoying, but neither GitHub nor Firefox will render the SVG that is produced with "<br>". This needs to be done for all the "<br>" tags.

@alixander commented Mar 6, 2023:

\ also works btw (https://commonmark.org/help/tutorial/03-paragraphs.html#:~:text=For%20a%20line%20break%2C%20add,the%20end%20of%20the%20line.)

or <br /> (they won't render it because <br> is not semantic xml)

return fmt.Errorf("timed out waiting for acknowledgments: have %d, wanted %d", l.ackedCount, targetCount)
}
}
}
Contributor:

Can you add a few more tests around shutdown, e.g. ensuring that if the queue is shutting down we don't publish?

Contributor Author:

I will think about what tests might be appropriate, but:

  • There's on some level a fundamental nondeterminism: it's always possible for the shutdown signal to come in simultaneously with a publish request, and what breaks the tie isn't which was sent first but which the queue loop receives first (which we can't tell from outside). We can be certain that no publish requests will go through after Close returns (or... at least now we can, since I just added the mistakenly-omitted Wait call in response to this comment 😅), but we can't make any guarantees based on the close channel alone, or while Close is still in progress. I can, however, add tests to make sure that write-after-close fails. Sadly, though:
  • queue.Close is never called during Beats shutdown, so it's nice to know that it would work correctly if we used it, but as of right now none of the queues are ever shut down properly.

Contributor:

A test that shows write-after-close fails would satisfy me. I think that goes a long way toward exercising that code path.
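
A minimal, self-contained sketch of what such a write-after-close test could look like (the queue and producer here are simplified stand-ins, not the real proxy queue API):

```go
package proxyqueue

import "testing"

// toyQueue is a stand-in for the proxy queue: Close marks it unusable and
// Publish reports failure once the queue has been closed.
type toyQueue struct {
	closed chan struct{}
}

func newToyQueue() *toyQueue { return &toyQueue{closed: make(chan struct{})} }

func (q *toyQueue) Close() error {
	close(q.closed)
	return nil
}

func (q *toyQueue) Publish(event interface{}) bool {
	select {
	case <-q.closed:
		return false
	default:
		// a real queue would hand the event to its run loop here
		return true
	}
}

func TestPublishAfterCloseFails(t *testing.T) {
	q := newToyQueue()
	if err := q.Close(); err != nil {
		t.Fatalf("closing queue: %v", err)
	}
	if q.Publish("some event") {
		t.Fatal("expected publish after Close to fail")
	}
}
```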

@faec faec merged commit 4164cf6 into elastic:main Feb 23, 2023
@faec faec deleted the proxy-queue branch February 23, 2023 22:40
chrisberkhout pushed a commit that referenced this pull request Jun 1, 2023
Labels: Team:Elastic-Agent (Label for the Agent team)

Linked issue: Beats shipper output shouldn't keep pending events in the queue