🐛 Source Intercom: Fixed filtering of conversation_parts #12374
Conversation
Thank you @lgomezm, I'm going to review this week.
/test connector=connectors/source-hubspot
Hi @alafanechere. I think this ran for a different connector, didn't it?
Oh yes, sorry! I'm too used to your contributions to the HubSpot source.
/test connector=connectors/source-intercom
Hey @lgomezm, could you please give more details on how you are solving the original issue in your PR? I'm not sure I totally grasped how your changes fix the problem.
```diff
@@ -88,6 +88,9 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
         else:
             yield from data

+    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
```
From my understanding, the purpose of creating a separate `get_records_field` function is to enable calling this logic without the stream state argument?
```diff
-def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
+def parse_response(self, response: requests.Response, *args, **kwargs) -> Iterable[Mapping]:
```
I'd rather keep the logic in the same function but change its signature so it can be called with a single response argument.
Yeah, you're right, but I changed my approach in 83afd35.
```python
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:

def get_records_field(self, response: requests.Response) -> Any:
```
```diff
-def get_records_field(self, response: requests.Response) -> Any:
+def filter_record_fields(self, response: requests.Response) -> Any:
```
I reverted this line change in 83afd35.
```diff
-def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
-    records = super().parse_response(response, stream_state, **kwargs)
+def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+    records = super().get_records_field(response=response)
```
```diff
-records = super().get_records_field(response=response)
+records = super().parse_response(response)
```
or if you prefer to keep a separate `get_records_field` method:
```diff
-records = super().get_records_field(response=response)
+records = self.get_records_field(response)
```
Reverted these line changes in 83afd35.
```diff
 def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
     parent_stream = self.parent_stream_class(authenticator=self.authenticator, start_date=self.start_date)
-    for slice in parent_stream.stream_slices(sync_mode=sync_mode):
+    for slice in parent_stream.stream_slices(sync_mode=sync_mode, stream_state=stream_state):
         for item in self.parent_stream_class(
             authenticator=self.authenticator, start_date=self.start_date, stream_slice=slice
-        ).read_records(sync_mode=sync_mode):
+        ).read_records(sync_mode=sync_mode, stream_state=stream_state):
             yield {"id": item["id"]}
```
I'm under the impression the fix is there. Could you elaborate a bit on why it fixes your original issue? I'd love to make sure this does not introduce a regression on other streams that inherit from `ChildStreamMixin` (such as `CompanySegments`).
I completely changed my approach. I had originally introduced this change in `ChildStreamMixin` so that it filters out conversations (from the parent stream) if they were updated before the current stream state. You're right that it might introduce regression issues. Instead of updating `ChildStreamMixin`, I made `ConversationParts` extend `HttpSubStream`, which is an implementation of `HttpStream` that handles computing slices. That way, I don't have to modify `ChildStreamMixin`, and we handle filtering/parsing the response correctly. Please take a look at it in 83afd35.
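For readers following along, here is a minimal sketch of what this pattern can look like; it is illustrative, not the exact code from 83afd35, and the nested `conversation_parts.conversation_parts` payload shape is an assumption about Intercom's response format:

```python
from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpSubStream


class ConversationParts(HttpSubStream):
    """Illustrative sub-stream: HttpSubStream.stream_slices yields one
    {"parent": <conversation record>} slice per parent conversation."""

    def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
        # Each slice carries the full parent record under the "parent" key.
        return f"conversations/{stream_slice['parent']['id']}"

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        # Only emit parts whose updated_at is at or after the current state.
        state_value = (stream_state or {}).get("updated_at", 0)
        for part in response.json().get("conversation_parts", {}).get("conversation_parts", []):
            if part["updated_at"] >= state_value:
                yield part
```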
```python
requests_mock.register_uri('GET', "https://api.intercom.io/conversations", json=_conversations_response(
    conversations=[
        {"id": "151272900026677", "updated_at": 1650988600},
        {"id": "151272900026666", "updated_at": 1650988500}
    ],
    next_url="https://api.intercom.io/conversations?per_page=2&page=2"
))
requests_mock.register_uri('GET', "https://api.intercom.io/conversations?per_page=2&page=2", json=_conversations_response(
    conversations=[
        {"id": "151272900026466", "updated_at": 1650988450},
        {"id": "151272900026680", "updated_at": 1650988100},  # Older than state, won't be processed
    ]
))
requests_mock.register_uri('GET', "https://api.intercom.io/conversations/151272900026677", json=_conversation_response(
    conversation_id="151272900026677",
    conversation_parts=[{"id": "13740311961", "updated_at": 1650988300}, {"id": "13740311962", "updated_at": 1650988450}]
))
requests_mock.register_uri('GET', "https://api.intercom.io/conversations/151272900026666", json=_conversation_response(
    conversation_id="151272900026666",
    conversation_parts=[{"id": "13740311955", "updated_at": 1650988150}, {"id": "13740312056", "updated_at": 1650988500}]
))
requests_mock.register_uri('GET', "https://api.intercom.io/conversations/151272900026466", json=_conversation_response(
    conversation_id="151272900026466",
    conversation_parts=[{"id": "13740311970", "updated_at": 1650988600}]
))
```
nit: I would declare this request flow in a `pytest` fixture.
Moved to fixture in b1a6402.
```python
record_count = 0
stream1 = ConversationParts(authenticator=NoAuth())
```
```diff
-stream1 = ConversationParts(authenticator=NoAuth())
+conversation_parts = ConversationParts(authenticator=NoAuth())
```
Renamed variable in b1a6402.
```python
record_count += len(list(stream1.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=state)))

assert record_count == 5
```
What do you think of:
- Creating/generating a list of conversation / conversation part records in a `pytest` fixture (`test_responses`).
- Registering the request mocks by looping on this `test_responses` fixture.
- Building a list of `expected_records` by filtering out the records from `test_responses` that are older than your state.
- Asserting that `expected_records` == the output of `read_records` after iterating on the stream slices? (See the sketch after this list.)
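A minimal sketch of that test layout, assuming the `requests-mock` pytest plugin; the URL, payload shape, and timestamps are illustrative, not the actual test data:

```python
import pytest
import requests


@pytest.fixture
def test_responses():
    # Hypothetical fixture: each entry pairs a mocked URL with its JSON body.
    return [
        (
            "https://api.intercom.io/conversations/151272900026677",
            {
                "conversation_parts": {
                    "conversation_parts": [
                        {"id": "13740311961", "updated_at": 1650988300},
                        {"id": "13740311962", "updated_at": 1650988450},
                    ]
                }
            },
        ),
    ]


def test_reads_only_parts_newer_than_state(requests_mock, test_responses):
    state = {"updated_at": 1650988400}

    # Register the mocks by looping on the fixture.
    for url, body in test_responses:
        requests_mock.register_uri("GET", url, json=body)

    # Derive expected_records from the fixture instead of hard-coding a count.
    expected_records = [
        part
        for _, body in test_responses
        for part in body["conversation_parts"]["conversation_parts"]
        if part["updated_at"] >= state["updated_at"]
    ]

    # Stand-in for reading the stream under test: fetch each mocked URL and
    # apply the same state filter the connector is expected to apply.
    actual = [
        part
        for url, _ in test_responses
        for part in requests.get(url).json()["conversation_parts"]["conversation_parts"]
        if part["updated_at"] >= state["updated_at"]
    ]
    assert actual == expected_records
```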
Makes sense. I updated the test to reflect that approach in b1a6402.
Hi @alafanechere. Thank you for reviewing! I've updated my solution and it looks simpler now. I hope it makes more sense. Please take a look at it again when you get a chance.
/test connector=connectors/source-intercom
Thank you for these changes, but to be honest I don't really get the original problem you are trying to solve and why this implementation fixes it 🤔

> Since the ConversationParts stream is a sub-stream of Conversations, this filtering mechanism can't be done. That is because, given a list of conversations sorted by updated_at, the aggregate list of conversation parts is not guaranteed to be sorted.

Could you please elaborate a bit more on this or rephrase it? I'm wondering if your new tests pass with the previous implementation.

When you say "fix filtering of conversation_parts", do you mean that you only want to keep parent conversation records that have `conversation_parts`? You could do this check in an overridden `stream_slices` in `ConversationParts`: while iterating on parent records, you could yield only those that have a conversation part.
Pure code feedback: I have one concern about performance. Using `HttpSubStream.stream_slices` will run a full-refresh read of records on `Conversations`, whereas with `ChildStreamMixin` the same sync mode as the current `ConversationParts` sync is used. What do you think about overriding `stream_slices` on conversation parts to mimic the behavior of `ChildStreamMixin.stream_slices`? (Asking this makes me think that you will fall back on the original implementation 😆.)

I would also suggest setting `Conversations.use_cache = True` to leverage the internal caching mechanism of the CDK and avoid unnecessary requests to Intercom's API if `Conversations` was already replicated in the same sync.

Finally, to what extent do you think `CompanySegments` can benefit from your current changes?
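A sketch of those two suggestions combined; this is illustrative, not the PR's actual code, and `IntercomStream` below is an assumed base-class name:

```python
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpSubStream


class ConversationParts(HttpSubStream):
    def stream_slices(
        self, sync_mode: SyncMode, stream_state: Mapping[str, Any] = None, **kwargs
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        # Mimic ChildStreamMixin: forward the current sync mode and state to the
        # parent, instead of HttpSubStream's unconditional full-refresh read.
        for parent_slice in self.parent.stream_slices(sync_mode=sync_mode, stream_state=stream_state):
            for record in self.parent.read_records(
                sync_mode=sync_mode, stream_slice=parent_slice, stream_state=stream_state
            ):
                yield {"id": record["id"]}


# The caching suggestion would live on the parent stream: shadowing
# HttpStream.use_cache lets the CDK reuse Conversations responses within
# the same sync, e.g.:
#
#     class Conversations(IntercomStream):
#         use_cache = True
```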
```python
def _conversations_response(conversations, next_url = None):
```
```diff
-def _conversations_response(conversations, next_url = None):
+def build_conversations_response(conversations, next_url = None):
```
Suggesting adding a verb prefix, as this is a function and not an attribute.
Fixed in 03f89b6.
```python
assert record_count == 2

def _conversation_response(conversation_id, conversation_parts):
```
Same comment as above
Fixed in 03f89b6.
@alafanechere Sure. This is a sample run (from the master branch). I'm trying to get conversation_parts without passing a state, so I expect it to output all records (around 3K in my account). However, it stops getting records at some point, because the stream collects state for each record and the aggregate list of conversation_parts is not sorted. So, it gets the conversation_parts of the first conversation and, if the conversation_parts of the second conversation have an older `updated_at` than the state collected so far, they are filtered out.
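A toy reproduction of that failure mode (hypothetical timestamps, independent of the connector code):

```python
# State advances per record, so parts of a later conversation that are
# older than the running state get dropped even with no initial state.
parts_by_conversation = [
    [{"id": "a1", "updated_at": 300}, {"id": "a2", "updated_at": 450}],  # conversation 1
    [{"id": "b1", "updated_at": 150}, {"id": "b2", "updated_at": 500}],  # conversation 2
]

state = 0
emitted = []
for parts in parts_by_conversation:
    for part in parts:
        if part["updated_at"] >= state:  # filter against the running state
            emitted.append(part["id"])
        state = max(state, part["updated_at"])  # state collected per record

print(emitted)  # ['a1', 'a2', 'b2'] -- 'b1' is lost
```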
I am not sure, as that is a stream I haven't used before. I was concerned after you commented that it could be affected by my initial changes, so I preferred to keep it untouched.
Ok thank you for the explanation 🙏
@alafanechere I've changed my approach again, as discussed in Slack, to override the `stream_slices` method.
/publish connector=connectors/source-intercom auto-bump-version=false
Thanks @lgomezm for this contrib and your patience 😄
What

- Intercom returns conversations sorted by the `updated_at` field in descending order.
- The connector previously stopped collecting records once it found one whose `updated_at` value is older than the current stream state.
- Since the `ConversationParts` stream is a sub-stream of `Conversations`, this filtering mechanism can't be done. That is because, given a list of conversations sorted by `updated_at`, the aggregate list of conversation parts is not guaranteed to be sorted.

How

- In `ConversationParts`, we still rely on filtering objects from the parent stream (`Conversations`).
- We only keep conversation parts whose `updated_at` value is greater than or equal to the current stream state.

Recommended reading order

1. `source.py`
2. `unit_test.py`
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.

New Connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with `airbyte_secret`
- Unit & integration tests added and passing. To run acceptance tests for a Python connector, follow instructions in the `README.md`. For Java connectors, run `./gradlew :airbyte-integrations:connectors:<name>:integrationTest`.
- Documentation updated:
  - Connector's `README.md`
  - Connector's `bootstrap.md`. See description and examples
  - `docs/SUMMARY.md`
  - `docs/integrations/<source or destination>/<name>.md` including changelog. See changelog example
  - `docs/integrations/README.md`
  - `airbyte-integrations/builds.md`

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- `/test connector=connectors/<name>` command is passing
- Connector released by running the `/publish` command described here

Updating a connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with `airbyte_secret`
- Unit & integration tests added and passing. To run acceptance tests for a Python connector, follow instructions in the `README.md`. For Java connectors, run `./gradlew :airbyte-integrations:connectors:<name>:integrationTest`.
- Documentation updated:
  - Connector's `README.md`
  - Connector's `bootstrap.md`. See description and examples
  - `docs/integrations/<source or destination>/<name>.md` including changelog. See changelog example

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- `/test connector=connectors/<name>` command is passing
- Connector released by running the `/publish` command described here

Connector Generator
- Connector templates (those with `-scaffold` in their name) have been updated with the latest scaffold by running `./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates`, then checking in your changes
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.