[ANT-745] Complete Lever Integration #2
Conversation
Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉 To merge this PR, ensure the following has been done/considered for each connector added or updated.

If the checklist is complete, but the CI check is failing, …
```python
def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
    records = response.json()["data"]
    if not records:
        # Emit a placeholder record when a slice returns no data
        records = [{}]

    for record in records:
        record["opportunity"] = stream_slice["opportunity_id"]
    yield from records
```
This is the fix for the heartbeat issues.
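A minimal sketch of the idea behind the fix (my reading of it, with a hypothetical stand-in function): if a slice returns no records, a long run of empty slices can starve the platform heartbeat, so the code yields one placeholder record to keep the stream emitting.

```python
from typing import Any, Iterable, Mapping

def parse_data(data: list, opportunity_id: str) -> Iterable[Mapping[str, Any]]:
    # Hypothetical stand-in for parse_response: if the API returned no rows,
    # emit one placeholder record so the sync still reports progress
    # (otherwise many consecutive empty slices can trip heartbeat timeouts).
    records = data or [{}]
    for record in records:
        record["opportunity"] = opportunity_id
    yield from records
```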
Can you add a comment explaining the fix?
LGTM
I'm going to take a longer look at this later, hopefully today. But can you explain why we have fully forked the repo? I am not against it, just want to understand the reasoning (as opposed to simply using the SDK in a separate repo, if such a thing is possible).

I just worry about it being a maintenance burden.
I just gave this a quick first pass. Would love to touch base on this later this evening (your morning) for some synchronous review.
```python
# 1797 opportunities
# 1 record read
# stream_params = {"confidentiality": "all", "expand": "contact", "stage_id": "e54475bb-d3ad-43ff-b8b9-76c4fc38e78c"}

# 8311 opportunities
# stream_params = {"confidentiality": "all", "expand": "contact", "stage_id": "3a255cc8-0732-4bee-92bd-62acfec3572c"}
```
Dead code?
Removed :)
```python
# TODO: Basic incremental stream
class IncrementalLeverStream(LeverStream, ABC):
    """
    TODO: fill in the details of this class to implement functionality related to incremental syncs for your connector.
    If you do not need to implement incremental sync for any streams, remove this class.
    """

    # TODO: Fill in to checkpoint stream reads after N records. This prevents re-reading of data if the stream fails for any reason.
    state_checkpoint_interval = None

    @property
    def cursor_field(self) -> str:
        """
        TODO
        Override to return the cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field. This is
        usually id or date based. This field's presence tells the framework this is an incremental stream. Required for incremental.

        :return str: The name of the cursor field.
        """
        return []

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Override to determine the latest state after reading the latest record. This typically compares the cursor_field from the latest record and
        the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental.
        """
        return {}
```
If we are not using this, should we delete it? If we want to keep it around in case we plan on implementing it later, can we improve the comment at the top?
Good catch, but I will keep it for the upcoming PR that adds incremental extracts.
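For that follow-up PR, a minimal sketch of what the `get_updated_state` override could look like (the cursor field name `updatedAt` is an assumption here; Lever's actual cursor field may differ):

```python
from typing import Any, Mapping, MutableMapping

# Hypothetical cursor field; the real field name depends on the Lever API.
CURSOR_FIELD = "updatedAt"

def get_updated_state(
    current_stream_state: MutableMapping[str, Any],
    latest_record: Mapping[str, Any],
) -> Mapping[str, Any]:
    # Keep whichever cursor value is most recent; default to 0 on first sync.
    latest_cursor = latest_record.get(CURSOR_FIELD, 0)
    current_cursor = current_stream_state.get(CURSOR_FIELD, 0)
    return {CURSOR_FIELD: max(latest_cursor, current_cursor)}
```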
```python
def path(self, stream_slice: Mapping[str, any] = None, **kwargs) -> str:
    return f"opportunities/{stream_slice['opportunity_id']}/{self.name}"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
```
Is it permitted to type `Any` as a more specific type? Or must it match `Any`?
Resolved this. It's permitted to type `Any`. Not being loose here; this is generated from the Airbyte script.
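For what it's worth, PEP 484 treats `Any` as bidirectionally compatible with every type, so narrowing an inherited `Any` parameter to a concrete type passes type checking. An illustrative sketch (not the connector's code):

```python
from typing import Any, Mapping

class Base:
    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        raise NotImplementedError

class Child(Base):
    # Narrowing Any to str in an override is accepted by type checkers,
    # because Any is considered compatible with every type (PEP 484).
    def path(self, stream_slice: Mapping[str, str] = None, **kwargs) -> str:
        return "/".join(stream_slice.values())
```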
```python
def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
    yield from response.json()["data"]

class Opportunities(LeverStream):
```
Should we add a link to the Opportunities API docs?
https://hire.lever.co/developer/documentation#list-all-opportunities
Or
https://hire.lever.co/developer/documentation#retrieve-a-single-opportunity
Yep, added.
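One natural place for the link is the class docstring; a sketch (the `LeverStream` stand-in and docstring wording are illustrative only):

```python
class LeverStream:  # stand-in for the connector's base class
    pass

class Opportunities(LeverStream):
    """
    Lever Opportunities stream.
    API docs: https://hire.lever.co/developer/documentation#list-all-opportunities
    """
```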
```python
        return {"offset": response_data["next"]}

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
```
Should `any` be `Any` here?
Yup, resolved
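For context, the pagination in question threads the `next` token back as an `offset` query parameter on the following request. A standalone sketch, assuming a Lever-style response shape with `hasNext`/`next` fields and an assumed page size:

```python
from typing import Any, Mapping, MutableMapping, Optional

PAGE_SIZE = 100  # assumed page size

def next_page_token(response_json: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
    # Assumed response shape: {"data": [...], "hasNext": bool, "next": "<token>"}
    if response_json.get("hasNext"):
        return {"offset": response_json["next"]}
    return None  # no more pages

def request_params(next_page_token: Optional[Mapping[str, Any]] = None) -> MutableMapping[str, Any]:
    params: MutableMapping[str, Any] = {"limit": PAGE_SIZE}
    if next_page_token:
        params.update(next_page_token)  # adds the "offset" cursor
    return params
```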
```python
    #     super().__init__(**kwargs)
    #     self._start_date = start_date

    def path(self, stream_slice: Mapping[str, any] = None, **kwargs) -> str:
```
Should this be `Any`?
Yeap, resolved
```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
```
Is this file actually being used?
Nope, but I'll leave it in to write tests.
```python
source = SourceLever()
config_mock = MagicMock()
streams = source.streams(config_mock)
# TODO: replace this with your streams number
```
Dead code
Resolved
```python
config_mock = MagicMock()
streams = source.streams(config_mock)
# TODO: replace this with your streams number
expected_streams_number = 2
```
Is this assertion correct?
Fixed :)
```python
def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
    yield from response.json()["data"]

"""
```
Important - need to pay special attention here @junquanlim
Requires setting up connectors with this initial state. 631152000000 ms = 1990-01-01; we are essentially initializing state for the first time, which is necessary for our current implementation of incremental sub-streams.
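The epoch arithmetic checks out; a quick verification of the millisecond timestamp:

```python
from datetime import datetime, timezone

INITIAL_STATE_MS = 631152000000  # initial cursor value from the comment above

# Convert milliseconds since the Unix epoch to a UTC date.
dt = datetime.fromtimestamp(INITIAL_STATE_MS / 1000, tz=timezone.utc)
print(dt.date())  # 1990-01-01
```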