-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[filebeat][gcs] Fix for concurrency issues and context timeouts in the GCS input #35605
Conversation
Pinging @elastic/security-external-integrations (Team:Security-External Integrations) |
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed
|
@efd6 , I've addressed the changes and updated the PR |
@efd6, Updated the changes and left comment regarding #35605 (comment) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'm OK with the concurrency fix, mod the unlocked use of the *Checkpoint
returned by *state.checkpoint()
. I think that method needs documentation clearly stating that it's not safe for concurrent use and that the returned value can not be used while the receiver in operating concurrently. Similar for *state.setCheckpoint()
.
Overall, I'd like to see this broken into separate PRs.
Please wait for @andrewkroh
@efd6, I've removed the enhancements from this PR, I will put up another PR post merging of this PR which will contain those enhancements. |
Thanks. Please still wait for Andrew. |
This pull request is now in conflicts. Could you fix it? 🙏
|
@andrewkroh does the PR look good for merging at the moment ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a good improvement in addressing the concurrency issues. So if you wanted to go ahead as is that's good with me.
But I think there may be some correctness issues w.r.t. the comment I raised. This could be apparent during shutdown where events in the queue are discarded, but the state was already updated with the assumption that the events where written to ES.
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) | ||
} | ||
j.mu.Unlock() | ||
// unlocks after data is saved and published |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the assumptions that you are making about the published state of the event after Publish
returns? I see this comment about it being "saved and published" and that's not correct. So if that is important to your implementation you'll need changes.
When Publish
returns the event has been queued internally. It will have undergone processing via Beats processors then put onto the queue. If it needs to know when the event has been accepted by the destination output then you can register and EventListener
to get a callback.
beats/libbeat/beat/pipeline.go
Lines 60 to 61 in 327b5fe
// Callbacks for when events are added / acknowledged | |
EventListener EventListener |
As an example, the GCP pub/sub input uses an EventListener
to defer ACK'ing the pubsub message until the event has been written to ES.
beats/x-pack/filebeat/input/gcppubsub/input.go
Lines 117 to 121 in 78dc664
EventListener: acker.ConnectionOnly( | |
acker.EventPrivateReporter(func(_ int, privates []interface{}) { | |
for _, priv := range privates { | |
if msg, ok := priv.(*pubsub.Message); ok { | |
msg.Ack() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andrewkroh thanks for the feedback, I will revisit this when I do some planned feature enhancements for the gcs input in the near future, will have to separate out the cursor publish from the event publish to properly implement this feature. For now I'm merging then PR since it's related to an sdh.
… context timeouts in the GCS input (#35666) * [filebeat][gcs] Fix for concurrency issues and context timeouts in the GCS input (#35605) * enhancement and bugfix for gcs input * updated changelog * addressed go lint errors * updated changelog and addresses PR suggestions * addressed PR suggestions * updated PR to contain only bugfixes * updated changelog (cherry picked from commit 7788a6d) * Update CHANGELOG.next.asciidoc --------- Co-authored-by: ShourieG <[email protected]>
Type of change
What does this PR do?
Why is it important?
This relates to an SDH from one of our customers and fixes those issues.
This also addresses any issues the input could have at scale.
Checklist
- [ ] I have made corresponding changes to the documentation- [ ] I have made corresponding change to the default configuration filesCHANGELOG.next.asciidoc
.