
[Integration][Kafka] Convert client methods to async #1349

Open
wants to merge 5 commits into base: main

Conversation

@phalbert phalbert (Contributor) commented Jan 23, 2025

Description

This PR converts all Kafka client methods to async to prevent blocking operations. Previously, only describe_consumer_groups was async; the other methods ran synchronously and could block the event loop.

This change improves the overall reliability of the Kafka integration by ensuring that all blocking operations are properly handled in a thread pool.

How:

  • Converted describe_cluster, describe_brokers, and describe_topics to async methods
  • Added anyio.to_thread.run_sync for blocking Kafka operations (see the sketch after this list)
  • Ensured all Kafka client operations run in thread pool
  • Updated main.py to properly await the async methods
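
A minimal sketch of the pattern, assuming a confluent-kafka AdminClient; the KafkaClient class, its _admin_client attribute, and the returned fields are illustrative and not taken from this PR. It shows a blocking admin call wrapped with anyio.to_thread.run_sync so it runs in a worker thread instead of on the event loop:

```python
from typing import Any

import anyio.to_thread
from confluent_kafka.admin import AdminClient


class KafkaClient:
    def __init__(self, conf: dict[str, Any]) -> None:
        self._admin_client = AdminClient(conf)

    async def describe_cluster(self) -> dict[str, Any]:
        # AdminClient.list_topics() is a blocking network call; running it in a
        # worker thread keeps the event loop free for other tasks.
        metadata = await anyio.to_thread.run_sync(self._admin_client.list_topics)
        return {
            "cluster_id": metadata.cluster_id,
            "controller_id": metadata.controller_id,
        }
```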

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners
  • Tested deletion of entities that don't pass the selector

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If a new resource kind is added or updated in the integration, add example raw data, mapping, and the expected result to the examples folder in the integration directory.
  • If a resource kind is updated, run the integration with the example data and check that the expected result is achieved
  • If a new resource kind is added or updated, validate that live events for that resource work as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.

- Convert describe_cluster, describe_brokers, and describe_topics to async methods
- Use anyio.to_thread.run_sync for blocking Kafka operations
- Ensure all Kafka client operations run in thread pool
- Prevent blocking of event loop during Kafka operations

This change aligns all client methods with describe_consumer_groups by making
them async and properly handling blocking operations.
@phalbert phalbert requested a review from a team as a code owner January 23, 2025 15:31
@phalbert phalbert self-assigned this Jan 23, 2025
@phalbert phalbert requested a review from mk-armah January 23, 2025 15:32
@mk-armah mk-armah (Member) left a comment

Let's improve the user experience and performance of the catalog update process. I suggest introducing pagination for batch operations: by yielding results in batches, we can achieve two key benefits:

  1. Faster Feedback: Entities will become visible in the catalog incrementally, rather than waiting for the entire operation to complete. This improves responsiveness and makes the catalog more interactive for users.
  2. Prevent "All or Nothing" Scenarios: In the current implementation, a single failure can block the entire batch of entities from being processed. With pagination, failures are isolated to individual batches, reducing the impact and improving system resilience.

If you agree, we could:

Transform the functions into generators that yield entities in chunks of a configurable batch size, along the lines of the sketch below.
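
A minimal sketch of the suggested direction, assuming a confluent-kafka AdminClient; the function name, the batch_size default, and the yielded fields are illustrative assumptions rather than code from this PR:

```python
from typing import Any, AsyncGenerator

import anyio.to_thread
from confluent_kafka.admin import AdminClient


async def describe_topics_in_batches(
    admin_client: AdminClient, batch_size: int = 50
) -> AsyncGenerator[list[dict[str, Any]], None]:
    # Fetch topic metadata in a worker thread, then yield it in fixed-size
    # chunks so the catalog can ingest entities incrementally and a failure
    # affects only the current batch.
    metadata = await anyio.to_thread.run_sync(admin_client.list_topics)
    topics = list(metadata.topics.values())
    for start in range(0, len(topics), batch_size):
        yield [
            {"name": topic.topic, "partitions": len(topic.partitions)}
            for topic in topics[start : start + batch_size]
        ]
```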
