
🐛 Source Intercom: Fixed filtering of conversation_parts #12374

Conversation

lgomezm
Contributor

@lgomezm lgomezm commented Apr 26, 2022

What

  • The Intercom source gets objects sorted by the updated_at field in descending order.
  • In order to achieve incremental reads, records are filtered if their updated_at value is older than the current stream state.
  • Since the ConversationParts stream is a sub-stream of Conversations, this filtering mechanism doesn't work: given a list of conversations sorted by updated_at, the aggregate list of their conversation parts is not guaranteed to be sorted.
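The sorting problem described above can be sketched with made-up data (the values and the naive filtering loop below are illustrative only, not the connector's code):

```python
# Illustrative data only (values made up). Conversations arrive sorted by
# updated_at descending, but the flattened list of their parts is not sorted.
conversations = [
    {"id": "c1", "updated_at": 400, "parts": [{"id": "p1", "updated_at": 300}]},
    {"id": "c2", "updated_at": 350, "parts": [{"id": "p2", "updated_at": 250}]},
]

# Naive per-record filtering against the state collected during the sync:
state = 0
emitted = []
for conversation in conversations:
    for part in conversation["parts"]:
        if part["updated_at"] >= state:
            emitted.append(part["id"])
            state = max(state, part["updated_at"])

# p2 is silently dropped even though it was never synced before.
assert emitted == ["p1"]
```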

How

  • For ConversationParts, we still rely on filtering objects from the parent stream (Conversations).
  • It will sync all conversation parts from the conversations whose updated_at value is greater than or equal to the current stream state.
  • This might produce duplicate records between syncs, but we work under the principle that Airbyte is at-least-once delivery, whereas skipping records is data loss.
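The filtering on the parent stream can be sketched as follows (the helper name and simplified signature are hypothetical, for illustration only):

```python
from typing import Any, Iterable, Mapping

def select_conversations(
    conversations: Iterable[Mapping[str, Any]],
    stream_state: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
    """Keep conversations updated at or after the stream state; all of their
    parts are then synced. Duplicates are acceptable (at-least-once delivery);
    skipped records are not."""
    state_value = stream_state.get("updated_at", 0)
    for conversation in conversations:
        if conversation["updated_at"] >= state_value:
            yield conversation
```

For example, with a state of `{"updated_at": 60}`, a conversation updated at 100 is re-synced in full while one updated at 50 is skipped.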

Recommended reading order

  1. source.py
  2. unit_test.py

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/SUMMARY.md
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the connector is published, connector added to connector index as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub and connector version bumped by running the /publish command described here
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed

Tests

Unit

Put your unit tests output here.

Integration

Put your integration tests output here.

Acceptance

Put your acceptance tests output here.

@github-actions github-actions bot added the area/connectors Connector related issues label Apr 26, 2022
@alafanechere alafanechere self-assigned this Apr 26, 2022
@alafanechere
Contributor

Thank you @lgomezm, I'm going to review this week.

@alafanechere
Contributor

alafanechere commented Apr 26, 2022

/test connector=connectors/source-hubspot

🕑 connectors/source-hubspot https://github.com/airbytehq/airbyte/actions/runs/2229701987
✅ connectors/source-hubspot https://github.com/airbytehq/airbyte/actions/runs/2229701987
Python tests coverage:

Name                                                 Stmts   Miss  Cover
------------------------------------------------------------------------
source_acceptance_test/utils/__init__.py                 6      0   100%
source_acceptance_test/tests/__init__.py                 4      0   100%
source_acceptance_test/__init__.py                       2      0   100%
source_acceptance_test/tests/test_full_refresh.py       52      2    96%
source_acceptance_test/utils/asserts.py                 37      2    95%
source_acceptance_test/config.py                        74      6    92%
source_acceptance_test/utils/json_schema_helper.py     105     13    88%
source_acceptance_test/utils/common.py                  80     17    79%
source_acceptance_test/utils/compare.py                 62     23    63%
source_acceptance_test/tests/test_core.py              285    106    63%
source_acceptance_test/base.py                          10      4    60%
source_acceptance_test/utils/connector_runner.py       110     48    56%
source_acceptance_test/tests/test_incremental.py        69     38    45%
------------------------------------------------------------------------
TOTAL                                                  896    259    71%
Name                         Stmts   Miss  Cover
------------------------------------------------
source_hubspot/errors.py         6      0   100%
source_hubspot/__init__.py       2      0   100%
source_hubspot/streams.py      707     56    92%
source_hubspot/source.py        69     12    83%
------------------------------------------------
TOTAL                          784     68    91%

@lgomezm
Contributor Author

lgomezm commented Apr 26, 2022

/test connector=connectors/source-hubspot

🕑 connectors/source-hubspot https://github.com/airbytehq/airbyte/actions/runs/2229701987
✅ connectors/source-hubspot https://github.com/airbytehq/airbyte/actions/runs/2229701987
Python tests coverage:

Hi @alafanechere. I think this ran for a different connector, didn't it?

@alafanechere
Contributor

Hi @alafanechere. I think this ran for a different connector, didn't it?.

Oh yes sorry! I'm too used to your contributions to the hubspot source.

@alafanechere
Contributor

alafanechere commented Apr 27, 2022

/test connector=connectors/source-intercom

🕑 connectors/source-intercom https://github.com/airbytehq/airbyte/actions/runs/2232693290
✅ connectors/source-intercom https://github.com/airbytehq/airbyte/actions/runs/2232693290
Python tests coverage:

Name                                                 Stmts   Miss  Cover
------------------------------------------------------------------------
source_acceptance_test/utils/__init__.py                 6      0   100%
source_acceptance_test/tests/__init__.py                 4      0   100%
source_acceptance_test/__init__.py                       2      0   100%
source_acceptance_test/tests/test_full_refresh.py       52      2    96%
source_acceptance_test/utils/asserts.py                 37      2    95%
source_acceptance_test/config.py                        74      6    92%
source_acceptance_test/utils/json_schema_helper.py     105     13    88%
source_acceptance_test/utils/common.py                  80     17    79%
source_acceptance_test/utils/compare.py                 62     23    63%
source_acceptance_test/tests/test_core.py              285    106    63%
source_acceptance_test/base.py                          10      4    60%
source_acceptance_test/utils/connector_runner.py       110     48    56%
source_acceptance_test/tests/test_incremental.py        69     38    45%
------------------------------------------------------------------------
TOTAL                                                  896    259    71%
Name                          Stmts   Miss  Cover
-------------------------------------------------
source_intercom/__init__.py       2      0   100%
source_intercom/source.py       235     44    81%
-------------------------------------------------
TOTAL                           237     44    81%
Name                          Stmts   Miss  Cover
-------------------------------------------------
source_intercom/__init__.py       2      0   100%
source_intercom/source.py       235     19    92%
-------------------------------------------------
TOTAL                           237     19    92%

Contributor

@alafanechere alafanechere left a comment

Hey @lgomezm, could you please give more details on how you are solving the original issue in your PR? I'm not sure I totally grasped how your changes fix the problem.

@@ -88,6 +88,9 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
else:
yield from data

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
Contributor

From my understanding, the purpose of creating a separate get_records_field function is to enable calling this logic without the stream state argument?

Contributor

Suggested change
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
def parse_response(self, response: requests.Response, *args, **kwargs) -> Iterable[Mapping]:

I'd rather keep the logic in the same function but change its signature to allow to call it with the single response argument.

Contributor Author

Yeah, you're right, but I changed my approach in 83afd35.

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:


def get_records_field(self, response: requests.Response) -> Any:
Contributor

Suggested change
def get_records_field(self, response: requests.Response) -> Any:
def filter_record_fields(self, response: requests.Response) -> Any:

Contributor Author

I reverted this line change in 83afd35.

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
records = super().parse_response(response, stream_state, **kwargs)
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
records = super().get_records_field(response=response)
Contributor

Suggested change
records = super().get_records_field(response=response)
records = super().parse_response(response)

or if you prefer to keep a separate get_records_field method:

Suggested change
records = super().get_records_field(response=response)
records = self.get_records_field(response)

Contributor Author

Reverted these line changes in 83afd35.

Comment on lines 139 to 145
def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
parent_stream = self.parent_stream_class(authenticator=self.authenticator, start_date=self.start_date)
for slice in parent_stream.stream_slices(sync_mode=sync_mode):
for slice in parent_stream.stream_slices(sync_mode=sync_mode, stream_state=stream_state):
for item in self.parent_stream_class(
authenticator=self.authenticator, start_date=self.start_date, stream_slice=slice
).read_records(sync_mode=sync_mode):
).read_records(sync_mode=sync_mode, stream_state=stream_state):
yield {"id": item["id"]}
Contributor

I'm under the impression the fix is there. Could you elaborate a bit on why it fixes your original issue? I'd love to make sure this does not introduce regression on other streams that inherit from ChildStreamMixin (such as CompanySegments).

Contributor Author

I completely changed my approach. I had originally introduced this change in ChildStreamMixin so that it filters out conversations (from parent stream) if they were updated before the current stream state. You're right in that it might introduce regression issues.

Instead of updating ChildStreamMixin, I made ConversationParts extend HttpSubStream, which is an implementation of HttpStream that handles computing of slices. That way, I don't have to modify ChildStreamMixin and we handle filtering/parsing response correctly. Please take a look at it in 83afd35.

Comment on lines 253 to 277
requests_mock.register_uri('GET', "https://api.intercom.io/conversations", json=_conversations_response(
conversations=[
{"id":"151272900026677","updated_at":1650988600},
{"id":"151272900026666","updated_at":1650988500}
],
next_url="https://api.intercom.io/conversations?per_page=2&page=2"
))
requests_mock.register_uri('GET', "https://api.intercom.io/conversations?per_page=2&page=2", json=_conversations_response(
conversations=[
{"id":"151272900026466","updated_at":1650988450},
{"id":"151272900026680","updated_at":1650988100}, # Older than state, won't be processed
]
))
requests_mock.register_uri('GET', "https://api.intercom.io/conversations/151272900026677", json=_conversation_response(
conversation_id="151272900026677",
conversation_parts=[{"id": "13740311961","updated_at":1650988300},{"id": "13740311962","updated_at":1650988450}]
))
requests_mock.register_uri('GET', "https://api.intercom.io/conversations/151272900026666", json=_conversation_response(
conversation_id="151272900026666",
conversation_parts=[{"id": "13740311955","updated_at":1650988150},{"id": "13740312056","updated_at":1650988500}]
))
requests_mock.register_uri('GET', "https://api.intercom.io/conversations/151272900026466", json=_conversation_response(
conversation_id="151272900026466",
conversation_parts=[{"id": "13740311970","updated_at":1650988600}]
))
Contributor

nit: I would declare this request flow in a pytest fixture.

Contributor Author

Moved to fixture in b1a6402.

))

record_count = 0
stream1 = ConversationParts(authenticator=NoAuth())
Contributor

Suggested change
stream1 = ConversationParts(authenticator=NoAuth())
conversation_parts = ConversationParts(authenticator=NoAuth())

Contributor Author

Renamed variable in b1a6402.

record_count += len(list(stream1.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=state)))


assert record_count == 5
Contributor

What do you think of:

  • Creating/Generating a list of conversations / conversation part record in a pytest fixture. (test_responses)
  • Registering requests mock by looping on this test_responses fixture
  • Build a list of expected_records by filtering out records from test_responses that are older than your state.
  • Assert that the expected_records == the output for read records after iteration on stream slices?
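The suggested test structure can be sketched like this (names and data are invented here; the real test would register requests_mock endpoints by looping over the responses and compare against read_records output):

```python
# Simplified sketch of the suggested structure: generate the responses once,
# derive the expectation from them, and assert equality instead of a count.
state = {"updated_at": 1650988200}

test_responses = [
    {"conversation_id": "c1", "parts": [{"id": "p1", "updated_at": 1650988300}]},
    {"conversation_id": "c2", "parts": [{"id": "p2", "updated_at": 1650988100}]},
]

# Expected records come from the same data the mocks are built from, so the
# test cannot drift out of sync with its fixtures.
expected_records = [
    part
    for response in test_responses
    for part in response["parts"]
    if part["updated_at"] >= state["updated_at"]
]

assert [part["id"] for part in expected_records] == ["p1"]
```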

Contributor Author

Makes sense. I updated the test to reflect that approach in b1a6402.

@lgomezm
Contributor Author

lgomezm commented Apr 27, 2022

Hi @alafanechere. Thank you for reviewing! I've updated my solution and it looks simpler now. I hope it makes more sense. Please take a look at it again when you get a chance.

@alafanechere
Contributor

alafanechere commented Apr 28, 2022

/test connector=connectors/source-intercom

🕑 connectors/source-intercom https://github.com/airbytehq/airbyte/actions/runs/2237758332
✅ connectors/source-intercom https://github.com/airbytehq/airbyte/actions/runs/2237758332
Python tests coverage:

Name                                                 Stmts   Miss  Cover
------------------------------------------------------------------------
source_acceptance_test/utils/__init__.py                 6      0   100%
source_acceptance_test/tests/__init__.py                 4      0   100%
source_acceptance_test/__init__.py                       2      0   100%
source_acceptance_test/tests/test_full_refresh.py       52      2    96%
source_acceptance_test/utils/asserts.py                 37      2    95%
source_acceptance_test/config.py                        74      6    92%
source_acceptance_test/utils/json_schema_helper.py     105     13    88%
source_acceptance_test/utils/common.py                  80     17    79%
source_acceptance_test/utils/compare.py                 62     23    63%
source_acceptance_test/tests/test_core.py              285    106    63%
source_acceptance_test/base.py                          10      4    60%
source_acceptance_test/utils/connector_runner.py       110     48    56%
source_acceptance_test/tests/test_incremental.py        69     38    45%
------------------------------------------------------------------------
TOTAL                                                  896    259    71%
Name                          Stmts   Miss  Cover
-------------------------------------------------
source_intercom/__init__.py       2      0   100%
source_intercom/source.py       233     48    79%
-------------------------------------------------
TOTAL                           235     48    80%
Name                          Stmts   Miss  Cover
-------------------------------------------------
source_intercom/__init__.py       2      0   100%
source_intercom/source.py       233     19    92%
-------------------------------------------------
TOTAL                           235     19    92%

Contributor

@alafanechere alafanechere left a comment

Thank you for these changes, but to be honest I don't really get the original problem you are trying to solve and why this implementation fixes it 🤔 .

Since the ConversationParts stream is a sub-stream of Conversations, this filtering mechanism can't be done. That is because, given a list of conversations sorted by updated_at, the aggregate list of conversation parts is not guaranteed to be sorted.

Could you please elaborate a bit more on this or rephrase it?
I'm wondering if your new tests pass with the previous implementation.

When you say "fix filtering of conversation_parts", do you mean that you only want to keep parent conversation records that have conversation_parts? You could do this check in an overridden stream_slices in ConversationParts: when iterating over parent records, only yield those that have a conversation part.


Pure code feedback:
I've one concern about performance.
Using HttpSubStream.stream_slices will run read_records with full refresh on Conversations, whereas with ChildStreamMixin the same sync mode as the current ConversationParts sync is used.
What do you think about overriding stream_slices on ConversationParts to mimic the behavior of ChildStreamMixin.stream_slices? (Asking this makes me think that you will fall back on the original implementation 😆.)
I would also suggest setting Conversations.use_cache = True to leverage the internal caching mechanism of the CDK and avoid unnecessary requests to Intercom's API if Conversations was already replicated in the same sync.

Finally, to what extent do you think that CompanySegments can benefit from your current changes?

]


def _conversations_response(conversations, next_url = None):
Contributor

Suggested change
def _conversations_response(conversations, next_url = None):
def build_conversations_response(conversations, next_url = None):

Suggesting to add a verb prefix as this is a function and not an attribute.

Contributor Author

Fixed in 03f89b6.


assert record_count == 2
def _conversation_response(conversation_id, conversation_parts):
Contributor

Same comment as above

Contributor Author

Fixed in 03f89b6.

@lgomezm
Contributor Author

lgomezm commented Apr 28, 2022

Could you please elaborate a bit more on this or rephrase it? I'm wondering if your new tests pass with the previous implementation.

@alafanechere Sure. This is a sample run (from the master branch). I'm trying to get conversation_parts without passing a state, so I expect it to output all records (around 3K in my account). However, it stops returning records at some point, because the stream collects state for each record and the aggregate list of conversation_parts is not sorted. It gets the conversation_parts of the first conversation, and if the conversation_parts of the second conversation have an updated_at earlier than the currently collected stream state, they are not returned, which is what we can see:
conversation_parts.txt

Finally, to what extent do you think that CompanySegments can benefit from your current changes?

I am not sure, as that is a stream I haven't used before. I was concerned after you commented that it could be affected by my initial changes, so I preferred to keep it untouched.

@alafanechere
Contributor

So, it gets the conversation_parts of the first conversation and, if conversation_parts of the second conversation have updated_at before current collected stream state, they won't be returned, which is what we can see

Ok thank you for the explanation 🙏
I'm under the impression that the culprit of this behavior is IncrementalIntercomStream.filter_by_state. You could set a boolean state_filtering class attribute and create an if else statement in IncrementalIntercomStream.parse_response to use it or not. And set Conversation.state_filtering = False. This way you can keep the original implementation using ChildStreamMixin, and my concerns about performance will vanish 😄 .

@lgomezm
Contributor Author

lgomezm commented Apr 28, 2022

@alafanechere I've changed my approach again, as discussed in Slack, to override the stream_slices method in ConversationParts and use sync_mode accordingly. Please take a look again when you get a chance 🙏
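The final approach, as described, can be sketched like this (not the merged code: the class below uses an injected stub in place of the real Conversations parent stream, and authentication is omitted):

```python
from typing import Any, Iterable, Mapping, Optional

class ConversationPartsSketch:
    """Illustrative override of stream_slices for a conversation-parts stream."""

    def __init__(self, parent_stream) -> None:
        self.parent_stream = parent_stream  # stands in for Conversations

    def stream_slices(
        self, sync_mode, stream_state: Optional[Mapping[str, Any]] = None, **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        # Unlike HttpSubStream, propagate the current sync_mode and state so
        # the parent read stays incremental instead of forcing full refresh.
        for parent_slice in self.parent_stream.stream_slices(
            sync_mode=sync_mode, stream_state=stream_state
        ):
            for conversation in self.parent_stream.read_records(
                sync_mode=sync_mode, stream_slice=parent_slice, stream_state=stream_state
            ):
                yield {"id": conversation["id"]}
```

Each yielded slice carries a conversation id, which the child stream then uses to fetch that conversation's parts.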

@github-actions github-actions bot added the area/documentation Improvements or additions to documentation label Apr 29, 2022
@alafanechere
Contributor

alafanechere commented Apr 29, 2022

/publish connector=connectors/source-intercom auto-bump-version=false

🕑 connectors/source-intercom https://github.com/airbytehq/airbyte/actions/runs/2245624316
🚀 Successfully published connectors/source-intercom
✅ connectors/source-intercom https://github.com/airbytehq/airbyte/actions/runs/2245624316

@alafanechere alafanechere merged commit 1642630 into airbytehq:master Apr 29, 2022
@alafanechere
Contributor

Thanks @lgomezm for this contrib and your patience 😄

@lgomezm lgomezm deleted the lgomez/fix_intercom_conversation_parts_filtering branch April 29, 2022 13:59