🎉 New source: AWS CloudTrail #4122
Conversation
…udar/2418-source-aws-cloudtrail
/test connector=connectors/source-aws-cloudtrail
A small question about fetch date ranges: could we incorporate an end_date for streams to limit the amount of data, in case there is demand to pull data for a specific period?
Also, the /test command is failing on the integration tests because of the credentials on the CI side. Did you edit ci_credentials.sh with the credentials? Did you add the test credentials to GitHub Secrets? If not, please contact @tuliren for this.
/test connector=connectors/source-aws-cloudtrail
…udar/2418-source-aws-cloudtrail
@subodh1810 you can put your thoughts about this here.
For example, I can remove it.
This sounds reasonable if this parameter is only meant for testing.
airbyte-integrations/connectors/source-aws-cloudtrail/integration_tests/acceptance.py
def parse_response(self, response: dict, **kwargs) -> Iterable[Mapping]:
    for event in response[self.data_field]:
        # boto3 converts timestamps to datetime objects
good job leaving a comment here to explain the non-obvious 👍🏼
def limit(self):
    return super().limit
# API does not support reading in ascending order
Maybe we should use stream slicing to set boundaries on start/end dates and sync one day at a time? If there are a lot of events, it would make fault tolerance much better, since we could save state continuously.
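For illustration, daily slicing along these lines might look like the sketch below (time_field and start_date follow names visible in this PR; the rest is an assumption, not the final implementation):

import time

def stream_slices(self, stream_state=None, **kwargs):
    # resume from the saved cursor if present, otherwise from the configured start date
    start_time = (stream_state or {}).get(self.time_field, self.start_date)
    now = int(time.time())
    while start_time < now:
        # one-day windows; StartTime/EndTime are inclusive, so each slice ends
        # one second before the next one starts
        end_time = min(start_time + 24 * 3600 - 1, now)
        yield {"StartTime": start_time, "EndTime": end_time}
        start_time = end_time + 1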
There is a problem if we fail in the middle of the operation: it may lead to data loss, since the most recent events come first and the state will always hold the latest date.
if we fail in the middle of syncing a particular slice, then state is not saved. State is only saved once the full slice has been synced. See Stream Slicing. WDYT?
yes, I got your idea
@sherifnada added a stream_slices method for the stream and some unit tests.
class IncrementalAwsCloudtrailStream(AwsCloudtrailStream, ABC):
    @property
I think this is not needed, since self.limit would already be equal to super().limit.
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
    # Increment record time to avoid data duplication in the next syncs
    record_time = latest_record[self.time_field] + 1
It's OK to duplicate one record. Airbyte offers an at-least-once delivery guarantee, not exactly-once. This will almost never happen, but it is better to be safe (not add 1, potentially duplicating a record, but definitely capturing all of them) than to miss records.
but be aware that each new incremental sync will produce at least one duplicated record
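A hedged sketch of the suggested change, keeping the maximum cursor value seen instead of adding 1, so a boundary record may be re-emitted on the next sync (attribute names follow this PR; the exact shape is an assumption):

def get_updated_state(self, current_stream_state, latest_record):
    # do not add 1: prefer a possible duplicate over a possibly missed record
    current_cursor = (current_stream_state or {}).get(self.cursor_field, 0)
    return {self.cursor_field: max(current_cursor, latest_record[self.time_field])}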
except botocore.exceptions.ClientError as err:
    # returns no records if either the start time occurs after the end time
    # or the time range is outside the range of possible values.
    if err.response["Error"]["Code"] == "InvalidTimeRangeException":
Shouldn't this throw an exception? This seems like a legitimate error that would result in no records being returned from the sync.
Yes, but the abnormal state test will fail (https://docs.airbyte.io/contributing-to-airbyte/building-new-connector/source-acceptance-tests#teststatewithabnormallylargevalues) if we throw an exception in this case.
Ah, I see. I think it is better to throw an error in this case rather than potentially fail silently in production just to make a test succeed. If you comment out the abnormal state value in the YML, that test will be skipped; I suggest we do that.
It seems like boto3 (or the API itself) adds the EndTime parameter by default, set to the current time.
while not pagination_complete:
    params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
    try:
        response = self.send_request(**params)
send_request should be defined as an abstract method on this class to make it clear that child classes must implement it.
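For example, the base class could declare it abstract; a minimal sketch, assuming the base stream derives from the Airbyte CDK Stream class (the exact base class and signature are assumptions):

from abc import ABC, abstractmethod

from airbyte_cdk.sources.streams import Stream  # base class assumed

class AwsCloudtrailStream(Stream, ABC):
    @abstractmethod
    def send_request(self, **kwargs):
        """Issue a single API call; every concrete child stream must implement this."""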
* AWS Secret access key
* AWS region name
### Setup guide |
Could you add a section for the changelog, like here? Changelogs now live in the external-facing docs.
### Performance considerations
The rate of lookup requests for the `events` stream is limited to two per second, per account, per region. If this limit is exceeded, a throttling error occurs.
The connector retries it though, right? So there shouldn't be an error under normal circumstances. If so, I suggest adding: This connector gracefully retries when encountering a throttling error. However, if the errors continue repeatedly after multiple retries (for example, if you set up many instances of this connector using the same account and region), the connector sync will fail.
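For reference, a hedged sketch of what graceful retry on throttling could look like around the boto3 call; the error code check, helper name, and backoff policy are illustrative assumptions, not the connector's actual implementation:

import time
import botocore.exceptions

def send_with_retry(send_request, max_retries=5, **params):
    # retry LookupEvents calls that hit the rate limit, backing off exponentially
    for attempt in range(max_retries):
        try:
            return send_request(**params)
        except botocore.exceptions.ClientError as err:
            # "ThrottlingException" is assumed here as the throttling error code
            if err.response["Error"]["Code"] != "ThrottlingException" or attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)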
| Feature | Supported?\(Yes/No\) | Notes |
| :--- | :--- | :--- |
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
Incremental-append is the old wording. Incremental is the correct new wording. You might need to update the docs template.
| Incremental - Append Sync | Yes | |
| Incremental Sync | Yes | |
The AWS CloudTrail source supports both Full Refresh and Incremental syncs. You can choose whether this connector copies only new or updated data, or all rows in the tables and columns you set up for replication, every time a sync runs.

This Source Connector is based on the [Boto3 CloudTrail client](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudtrail.html).
I think this is helpful for future reference and debugging.
It's present in setup.py, so a developer should be able to see it. Or are you saying a user might actually be interested in it? Fair enough. Apologies for the back and forth, Yaro; mind adding it back in?
Hmm, this is not in the setup.py in this PR. Are you referring to a file outside of this PR?
Right, I think a user may need it for reference. There are usually detailed explanations of each parameter in this kind of documentation, and we cannot replicate all of that information in spec.json.
/test connector=connectors/source-aws-cloudtrail
Could you also add the connector to docs/SUMMARY.md and airbyte-integrations/builds.md?
| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.0 | 2021-06-23 | [4122](https://github.com/airbytehq/airbyte/pull/4122) | LookupEvent API |
| 0.1.0 | 2021-06-23 | [4122](https://github.com/airbytehq/airbyte/pull/4122) | LookupEvent API |
| 0.1.0 | 2021-06-23 | [4122](https://github.com/airbytehq/airbyte/pull/4122) | Initial release supporting the LookupEvent API |
@@ -0,0 +1,38 @@
{
  "documentationUrl": "https://docsurl.com",
"documentationUrl": "https://docsurl.com", | |
"documentationUrl": "https://docs.airbyte.io/integrations/sources/aws-cloudtrail", |
if next_page_token:
    params["NextToken"] = next_page_token

if not stream_slice:
I suggest moving these lines (86-95) to the ManagementEvents class, since that is where slices are defined. Something like:
def request_params(self, stream_slice=None, **kwargs):
    params = super().request_params(stream_slice=stream_slice, **kwargs)
    if stream_slice:
        ...
    return params
    stream_state=stream_state,
)

assert len(slices) < ManagementEvents.data_lifetime
Do you think this check is useful? It will start failing if we make the slicing period smaller than 24 hours, which doesn't seem like an important detail.
    sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field
)

assert len(slices) >= ManagementEvents.data_lifetime
Should we care how many slices there are? We should ideally just test for the invariants, which are that slices are contiguous and mutually exclusive.
agree
"start_date": "2020-05-01", | ||
} | ||
|
||
|
We should also add a test case to assert that if the start date is later than the state, then we start from the start date.
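A hedged sketch of such a test; the config_stream fixture and the exact field names are hypothetical, following the style of the surrounding tests:

def test_start_date_overrides_older_state(config_stream):
    # config_stream is assumed to be a ManagementEvents stream whose start_date
    # is later than the stale cursor below
    stale_state = {"EventTime": 0}
    slices = list(config_stream.stream_slices(
        sync_mode=SyncMode.incremental,
        cursor_field=config_stream.cursor_field,
        stream_state=stale_state,
    ))
    # slicing should begin from start_date, not from the older state value
    assert slices[0]["StartTime"] >= config_stream.start_date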
assert len(slices) < ManagementEvents.data_lifetime
# checks that the start time is not more than 15 days before now
assert slices[0]["StartTime"] >= stream_state["EventTime"]
Shouldn't this be == rather than >=?
""" | ||
cursor_data = stream_state.get(self.cursor_field) if stream_state else None | ||
# ignores state if start_date option is higher than cursor | ||
if cursor_data and cursor_data > self.start_date: |
It feels like we can make this block a lot simpler if we just do:
start_time = max(now - 90 days, self.start_date, cursor_data or 0)
In that case I think we don't really need this if/else, and we don't need normalize_start_time.
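A minimal sketch of that simplification; the 90-day lookback follows the comment above, and the helper name and timestamp-based attributes are assumptions:

import time

def compute_start_time(self, stream_state=None):
    # CloudTrail only retains events for a limited window, so clamp to that,
    # then take the latest of the configured start_date and the saved cursor
    cursor_data = (stream_state or {}).get(self.cursor_field, 0)
    oldest_available = int(time.time()) - 90 * 24 * 3600
    return max(oldest_available, self.start_date, cursor_data)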
{
    "StartTime": last_start_time,
    # subtract one second, as the API includes records with the specified StartTime and EndTime
    "EndTime": last_start_time + self.day_in_seconds - 1,
WDYT about using a library like pendulum to do the date math and avoid hardcoding constants? It would let us do something like last_start_time.add(days=1).int_timestamp.
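For instance, a sketch with pendulum, assuming last_start_time is a pendulum datetime rather than a raw integer timestamp:

import pendulum

last_start_time = pendulum.parse("2021-06-01")
slice_boundaries = {
    "StartTime": last_start_time.int_timestamp,
    # the API includes records matching both StartTime and EndTime,
    # so end the slice one second before the next day begins
    "EndTime": last_start_time.add(days=1).int_timestamp - 1,
}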
/test connector=connectors/source-aws-cloudtrail
/test connector=connectors/source-aws-cloudtrail
@sherifnada FYI I fixed the test issue.
…udar/2418-source-aws-cloudtrail
/publish connector=connectors/source-aws-cloudtrail
What
#2418 - AWS CloudTrail source
How
Connector implemented using the Boto3 library.
Connector supports management events as a stream. There is no API to generate Insight events (these types of events are generated automatically by AWS ML algorithms), so it's hard to implement acceptance tests for them.
Added a records_limit option, as there may be thousands of events in an account and this would lead to slow execution of the acceptance tests.
Recommended reading order
x.java
y.python
Pre-merge Checklist
Expand the checklist which is relevant for this PR.
Connector checklist
* airbyte_secret in output spec
* ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
* /test connector=connectors/<name> command as documented here is passing
* docs/integrations/ directory
* /publish command described here
command described hereConnector Generator checklist
* Connectors with -scaffold in their name have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates, then checking in your changes