Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][gcs] Fix for concurrency issues and context timeouts in the GCS input #35605

Merged
merged 10 commits into from
Jun 3, 2023

Conversation

ShourieG
Copy link
Contributor

@ShourieG ShourieG commented May 29, 2023

Type of change

  • Bug

What does this PR do?

  1. It fixes a map concurrency issue which was revealed when testing the input at scale.
  2. Fixes the issue were bucket_timeout was being applied to the bucket poll interval instead of individual bucket object read ops.
  3. Fixes flakey test issue.

Why is it important?

This relates to an SDH from one of our customers and fixes those issues.
This also addresses any issues the input could have at scale.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [ ] I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc.

@ShourieG ShourieG requested a review from a team as a code owner May 29, 2023 10:48
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label May 29, 2023
@elasticmachine
Copy link
Collaborator

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label May 29, 2023
@ShourieG ShourieG self-assigned this May 29, 2023
@mergify
Copy link
Contributor

mergify bot commented May 29, 2023

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @ShourieG? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch. /d is the digit

@ShourieG ShourieG requested review from efd6, andrewkroh and P1llus May 29, 2023 10:52
@ShourieG ShourieG added the backport-v8.8.0 Automated backport with mergify label May 29, 2023
@elasticmachine
Copy link
Collaborator

elasticmachine commented May 29, 2023

💚 Build Succeeded

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2023-06-01T08:13:47.883+0000

  • Duration: 74 min 39 sec

Test stats 🧪

Test Results
Failed 0
Passed 2959
Skipped 173
Total 3132

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments

Expand to view the GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

CHANGELOG.next.asciidoc Outdated Show resolved Hide resolved
x-pack/filebeat/input/gcs/input_stateless.go Outdated Show resolved Hide resolved
x-pack/filebeat/input/gcs/job.go Outdated Show resolved Hide resolved
@ShourieG
Copy link
Contributor Author

@efd6 , I've addressed the changes and updated the PR

x-pack/filebeat/input/gcs/job.go Outdated Show resolved Hide resolved
x-pack/filebeat/input/gcs/scheduler.go Outdated Show resolved Hide resolved
x-pack/filebeat/input/gcs/scheduler.go Outdated Show resolved Hide resolved
@ShourieG
Copy link
Contributor Author

@efd6, Updated the changes and left comment regarding #35605 (comment)

@ShourieG ShourieG changed the title [filebeat][gcs] Enhancement and bugfix for Gcs input [filebeat][gcs] Enhancement and bugfix for GCS input May 30, 2023
Copy link
Contributor

@efd6 efd6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm OK with the concurrency fix, mod the unlocked use of the *Checkpoint returned by *state.checkpoint(). I think that method needs documentation clearly stating that it's not safe for concurrent use and that the returned value can not be used while the receiver in operating concurrently. Similar for *state.setCheckpoint().

Overall, I'd like to see this broken into separate PRs.

Please wait for @andrewkroh

CHANGELOG.next.asciidoc Outdated Show resolved Hide resolved
@ShourieG
Copy link
Contributor Author

@efd6, I've removed the enhancements from this PR, I will put up another PR post merging of this PR which will contain those enhancements.

@ShourieG ShourieG changed the title [filebeat][gcs] Enhancement and bugfix for GCS input [filebeat][gcs] Fix for concurrency issues and context timeouts in the GCS input May 31, 2023
@efd6
Copy link
Contributor

efd6 commented May 31, 2023

Thanks. Please still wait for Andrew.

@mergify
Copy link
Contributor

mergify bot commented May 31, 2023

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b gcs/enhancement upstream/gcs/enhancement
git merge upstream/main
git push upstream gcs/enhancement

CHANGELOG.next.asciidoc Outdated Show resolved Hide resolved
@ShourieG
Copy link
Contributor Author

ShourieG commented Jun 1, 2023

@andrewkroh does the PR look good for merging at the moment ?

Copy link
Member

@andrewkroh andrewkroh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a good improvement in addressing the concurrency issues. So if you wanted to go ahead as is that's good with me.

But I think there may be some correctness issues w.r.t. the comment I raised. This could be apparent during shutdown where events in the queue are discarded, but the state was already updated with the assumption that the events where written to ES.

j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
// unlocks after data is saved and published
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the assumptions that you are making about the published state of the event after Publish returns? I see this comment about it being "saved and published" and that's not correct. So if that is important to your implementation you'll need changes.

When Publish returns the event has been queued internally. It will have undergone processing via Beats processors then put onto the queue. If it needs to know when the event has been accepted by the destination output then you can register and EventListener to get a callback.

// Callbacks for when events are added / acknowledged
EventListener EventListener

As an example, the GCP pub/sub input uses an EventListener to defer ACK'ing the pubsub message until the event has been written to ES.

EventListener: acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, privates []interface{}) {
for _, priv := range privates {
if msg, ok := priv.(*pubsub.Message); ok {
msg.Ack()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewkroh thanks for the feedback, I will revisit this when I do some planned feature enhancements for the gcs input in the near future, will have to separate out the cursor publish from the event publish to properly implement this feature. For now I'm merging then PR since it's related to an sdh.

@ShourieG ShourieG merged commit 7788a6d into elastic:main Jun 3, 2023
mergify bot pushed a commit that referenced this pull request Jun 3, 2023
…e GCS input (#35605)

* enhancement and bugfix for gcs input

* updated changelog

* addressed go lint errors

* updated changelog and addresses PR suggestions

* addressed PR suggestions

* updated PR to contain only bugfixes

* updated changelog

(cherry picked from commit 7788a6d)
@ShourieG ShourieG deleted the gcs/enhancement branch June 3, 2023 09:49
ShourieG added a commit that referenced this pull request Jun 9, 2023
… context timeouts in the GCS input (#35666)

* [filebeat][gcs] Fix for concurrency issues and context timeouts in the GCS input  (#35605)

* enhancement and bugfix for gcs input

* updated changelog

* addressed go lint errors

* updated changelog and addresses PR suggestions

* addressed PR suggestions

* updated PR to contain only bugfixes

* updated changelog

(cherry picked from commit 7788a6d)

* Update CHANGELOG.next.asciidoc

---------

Co-authored-by: ShourieG <[email protected]>
@reakaleek reakaleek mentioned this pull request Jul 19, 2023
6 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport-v8.8.0 Automated backport with mergify bugfix
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants