
🎉 New source: AWS CloudTrail #4122

Merged
merged 19 commits into master from yaroslav-dudar/2418-source-aws-cloudtrail
Jun 25, 2021

Conversation

yaroslav-dudar
Contributor

@yaroslav-dudar yaroslav-dudar commented Jun 15, 2021

What

#2418 - AWS CloudTrail source

How

The connector is implemented using the Boto3 library.

The connector supports management events as a stream. There is no API to generate Insight events (these event types are produced automatically by AWS ML algorithms), so it's hard to implement acceptance tests for them.

Added a records_limit option, since an account may contain thousands of events, which would make acceptance tests slow to execute.
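The records_limit behavior described above can be sketched as a simple cap on the record iterator. This is illustrative only; the function name and shape here mirror the PR's option but are not the connector's exact code:

```python
from typing import Iterable, Iterator, Optional

def read_records(events: Iterable[dict], records_limit: Optional[int] = None) -> Iterator[dict]:
    # Stop yielding once the configured limit is reached, so acceptance
    # tests don't crawl thousands of events in a large account.
    for count, event in enumerate(events):
        if records_limit is not None and count >= records_limit:
            return
        yield event
```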

Recommended reading order

  1. x.java
  2. y.python

Pre-merge Checklist

Expand the checklist which is relevant for this PR.

Connector checklist

  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • Secrets are annotated with airbyte_secret in output spec
  • Unit & integration tests added as appropriate (and are passing)
    • Community members: please provide proof of this succeeding locally e.g: screenshot or copy-paste acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • /test connector=connectors/<name> command as documented here is passing.
    • Community members can skip this, Airbyters will run this for you.
  • Code reviews completed
  • Credentials added to GitHub CI if needed and not already present. See the instructions for injecting secrets into CI.
  • Documentation updated
    • README
    • CHANGELOG.md
    • Reference docs in the docs/integrations/ directory.
    • Build status added to build page
  • Build is successful
  • Connector version bumped like described here
  • New Connector version released on Dockerhub by running the /publish command described here
  • No major blockers
  • PR merged into master branch
  • Follow up tickets have been created
  • Associated tickets have been closed & stakeholders notified

Connector Generator checklist

  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed.

@github-actions github-actions bot added area/connectors Connector related issues area/documentation Improvements or additions to documentation labels Jun 15, 2021
@yaroslav-dudar yaroslav-dudar changed the title 🎉 Add AWS CloudTrail source connector 🎉 New source: AWS CloudTrail Jun 15, 2021
@bazarnov
Collaborator

bazarnov commented Jun 15, 2021

/test connector=connectors/source-aws-cloudtrail

🕑 connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/939566443
❌ connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/939566443

Collaborator

@bazarnov bazarnov left a comment


A small question about fetch date periods: could we incorporate an end_date for streams, to limit the amount of data if there is demand to pull data for a specific period?

Also, the /test command is failing on the integration-tests because of the credentials on the CI side.
Did you edit ci_credentials.sh with the credentials?
Did you add the test credentials to Github Secrets? If not, please contact @tuliren for this.

docs/integrations/sources/aws-cloudtrail.md (review thread resolved)
@yaroslav-dudar
Contributor Author

yaroslav-dudar commented Jun 16, 2021

/test connector=connectors/source-aws-cloudtrail

🕑 connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/941961027
✅ connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/941961027

@yaroslav-dudar yaroslav-dudar requested a review from bazarnov June 16, 2021 15:47
@yaroslav-dudar
Contributor Author

@subodh1810 you can put your thoughts about the records_limit option here

@yaroslav-dudar
Contributor Author

For example, I can remove records_limit from spec.json (so it will not be available in the UI) but still parse it if it is present in the config (--config secrets/config.json). Is that acceptable?

@tuliren
Contributor

tuliren commented Jun 17, 2021

For example, I can remove records_limit from spec.json (so it will not be available in the UI) but still parse it if it is present in the config (--config secrets/config.json). Is that acceptable?

This sounds reasonable if this parameter is only meant for testing.

@yaroslav-dudar yaroslav-dudar marked this pull request as ready for review June 22, 2021 12:19

def parse_response(self, response: dict, **kwargs) -> Iterable[Mapping]:
    for event in response[self.data_field]:
        # boto3 converts timestamps to datetime objects
Contributor

good job leaving a comment here to explain the non-obvious 👍🏼

def limit(self):
    return super().limit

# API does not support read in ascending order
Contributor

maybe we should use stream slicing to set boundaries on start/end dates, and sync one day at a time? If there is a lot of events it would make fault tolerance much better since we can save state continuously

Contributor Author

There is a problem if we fail in the middle of the operation: it may lead to data loss, since the most recent events come first and the state will always end up at the latest date.

Contributor

if we fail in the middle of syncing a particular slice, then state is not saved. State is only saved once the full slice has been synced. See Stream Slicing. WDYT?
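The slicing approach described here can be sketched as follows. This is a minimal illustration, not the connector's actual stream_slices implementation; the StartTime/EndTime keys follow the excerpts elsewhere in this PR:

```python
from datetime import datetime, timedelta

def day_slices(start: datetime, end: datetime) -> list:
    # Split [start, end) into contiguous one-day slices; state can then be
    # checkpointed after each fully synced slice, improving fault tolerance.
    slices = []
    cursor = start
    while cursor < end:
        slice_end = min(cursor + timedelta(days=1), end)
        slices.append({"StartTime": cursor, "EndTime": slice_end})
        cursor = slice_end
    return slices
```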

Contributor Author

yes, I got your idea

Contributor Author

@sherifnada added stream_slices method for the stream and some unit tests



class IncrementalAwsCloudtrailStream(AwsCloudtrailStream, ABC):
    @property
Contributor

I think this is not needed, since self.limit would already be equal to super().limit.


def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
    # Increment record time to avoid data duplication in the next syncs
    record_time = latest_record[self.time_field] + 1
Contributor

It's OK to duplicate one record: Airbyte offers an at-least-once delivery guarantee, not exactly-once. This will almost never happen, but it's better to be safe (don't add 1: potentially duplicate records, but definitely capture all of them) than to miss records.

Contributor Author

but be aware that each new incremental sync will produce at least one duplicated record
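The reviewer's suggestion (keep the raw cursor value, accept a possible duplicate) could look like this sketch; the EventTime field name follows the test excerpts later in this PR, and the rest is illustrative:

```python
def get_updated_state(current_state: dict, latest_record: dict, cursor_field: str = "EventTime") -> dict:
    # Keep the raw cursor value instead of adding 1: the next sync may
    # re-read one record (at-least-once), but no record can be missed.
    current = current_state.get(cursor_field, 0)
    return {cursor_field: max(current, latest_record[cursor_field])}
```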

except botocore.exceptions.ClientError as err:
    # returns no records if either the start time occurs after the end time
    # or the time range is outside the range of possible values.
    if err.response["Error"]["Code"] == "InvalidTimeRangeException":
Contributor

shouldn't this throw an exception? this seems like a legitimate error which would result in no records returning from the sync?


Contributor

ah I see. I think it is better that we throw an error in this case rather than potentially silently fail in production just to make a test succeed.

If you comment out the abnormal state value in the YML, then that test will be skipped. I suggest we do that.

Contributor Author

@yaroslav-dudar commented Jun 22, 2021

It seems like boto3 (or the API itself) adds the EndTime parameter by default, set to the current time.
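For reference, the error-code check under discussion operates on the error response dict that botocore's ClientError carries; a minimal sketch (the helper name is hypothetical):

```python
def is_invalid_time_range(error_response: dict) -> bool:
    # botocore's ClientError exposes the service error code under
    # response["Error"]["Code"]; CloudTrail uses InvalidTimeRangeException
    # when the requested window is empty or out of range.
    return error_response.get("Error", {}).get("Code") == "InvalidTimeRangeException"
```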

while not pagination_complete:
    params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
    try:
        response = self.send_request(**params)
Contributor

send_request should be defined as an abstract method on this class to make it clear child classes must implement it
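The suggestion can be sketched with abc; class names here are illustrative, not the PR's exact hierarchy:

```python
from abc import ABC, abstractmethod

class BaseAwsStream(ABC):
    # Declaring send_request abstract makes the contract explicit:
    # subclasses cannot be instantiated without implementing it.
    @abstractmethod
    def send_request(self, **params) -> dict:
        ...

class ManagementEventsStream(BaseAwsStream):
    def send_request(self, **params) -> dict:
        # Toy implementation for illustration only.
        return {"Events": [], "params": params}
```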

* AWS Secret access key
* AWS region name

### Setup guide
Contributor

Could you add a section for the changelog like here? Changelogs now live in the external-facing docs.


### Performance considerations

The rate of lookup requests for `events` stream is limited to two per second, per account, per region. If this limit is exceeded, a throttling error occurs.
Contributor

The connector retries it though, right? So there shouldn't be an error under normal circumstances. If so, I suggest adding: "This connector gracefully retries when encountering a throttling error. However, if the errors continue repeatedly after multiple retries (for example, if you set up many instances of this connector using the same account and region), the connector sync will fail."
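The graceful-retry behavior referenced here can be sketched as a generic exponential backoff loop. This is illustrative only; the connector's actual retry mechanism isn't shown in this excerpt, and ThrottlingError below is a stand-in exception name:

```python
import time

class ThrottlingError(Exception):
    """Stand-in for a CloudTrail throttling error."""

def call_with_backoff(fn, max_retries=5, base_delay=1.0, sleep=time.sleep):
    # Retry fn with exponentially growing delays; re-raise once the
    # retry budget is exhausted (the sync then fails, as the doc notes).
    for attempt in range(max_retries):
        try:
            return fn()
        except ThrottlingError:
            if attempt == max_retries - 1:
                raise
            sleep(base_delay * 2 ** attempt)
```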

| Feature | Supported?\(Yes/No\) | Notes |
| :--- | :--- | :--- |
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
Contributor

Incremental-append is the old wording. Incremental is the correct new wording. You might need to update the docs template.

Suggested change
| Incremental - Append Sync | Yes | |
| Incremental Sync | Yes | |


The AWS CloudTrail source supports both Full Refresh and Incremental syncs. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run.

This Source Connector is based on a [Boto3 CloudTrail](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudtrail.html).
Contributor

I think this is helpful for future reference and debugging.

Contributor

It's present in setup.py, so a developer should be able to see it. Or are you saying a user might actually be interested in it? Fair enough. Apologies for the back and forth, Yaro; mind adding it back in?

Contributor

Hmm, this is not in the setup.py in this PR. Are you referring to a file outside of this PR?

Right, I think a user may need it for reference. There are usually detailed explanations of each parameter in this kind of documentation, and we cannot replicate all of that information in spec.json.



@yaroslav-dudar
Contributor Author

yaroslav-dudar commented Jun 23, 2021

/test connector=connectors/source-aws-cloudtrail

🕑 connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/963961553
✅ connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/963961553

Contributor

@sherifnada sherifnada left a comment

could you also add the connector to docs/SUMMARY.md and airbyte-integrations/builds.md?


| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.0 | 2021-06-23 | [4122](https://github.com/airbytehq/airbyte/pull/4122) | LookupEvent API |
Contributor

Suggested change
| 0.1.0 | 2021-06-23 | [4122](https://github.com/airbytehq/airbyte/pull/4122) | LookupEvent API |
| 0.1.0 | 2021-06-23 | [4122](https://github.com/airbytehq/airbyte/pull/4122) | Initial release supporting the LookupEvent API |

@@ -0,0 +1,38 @@
{
"documentationUrl": "https://docsurl.com",
Contributor

Suggested change
"documentationUrl": "https://docsurl.com",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/aws-cloudtrail",

if next_page_token:
    params["NextToken"] = next_page_token

if not stream_slice:
Contributor

I suggest moving these lines (86-95) to the ManagementEvents class, since it is where slices are defined. Something like:

def request_params(self, stream_state=None, stream_slice=None, next_page_token=None):
    params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
    if stream_slice:
        ...  # set the slice-specific params here
    return params

stream_state=stream_state,
)

assert len(slices) < ManagementEvents.data_lifetime
Contributor

Do you think this check is useful? It will start failing if we make the slicing period smaller than 24h, which doesn't seem like an important detail.

sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field
)

assert len(slices) >= ManagementEvents.data_lifetime
Contributor

Should we care how many slices there are? Ideally we should just test for the invariants, which are that slices are contiguous and mutually exclusive.

Contributor Author

agree

"start_date": "2020-05-01",
}


Contributor

We should also add a test case asserting that if the start date is later than the state, we start from the start date.
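The requested test case could be as small as this sketch; the helper and field names are hypothetical, and the max() form mirrors the simplification suggested in another thread of this review:

```python
def compute_sync_start(start_date: int, stream_state: dict, cursor_field: str = "EventTime") -> int:
    # If the configured start_date is later than the saved cursor,
    # the sync should begin from start_date, ignoring the older state.
    return max(start_date, stream_state.get(cursor_field, 0))

def test_start_date_overrides_older_state():
    assert compute_sync_start(1_600_000_000, {"EventTime": 1_500_000_000}) == 1_600_000_000
    assert compute_sync_start(1_500_000_000, {"EventTime": 1_600_000_000}) == 1_600_000_000
```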


assert len(slices) < ManagementEvents.data_lifetime
# checks that start time is not more than 15 days before now
assert slices[0]["StartTime"] >= stream_state["EventTime"]
Contributor

shouldn't this be == not >=?

"""
cursor_data = stream_state.get(self.cursor_field) if stream_state else None
# ignores state if start_date option is higher than cursor
if cursor_data and cursor_data > self.start_date:
Contributor

It feels like we could make this block a lot simpler if we just do:

start_time = max(now - 90 days, self.start_date, cursor_data or 0)

and I think in that case we don't really need this if/else, and we don't need normalize_start_time
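Spelled out, the suggested simplification might look like the following sketch (timestamps in epoch seconds; the 90-day lookback matches the comment above, and the function and parameter names are illustrative):

```python
def compute_start_time(now_ts: int, start_date_ts: int, cursor_ts=None, lookback_days: int = 90) -> int:
    # One max() replaces the if/else: take the latest of the API's
    # earliest allowed time, the configured start_date, and the saved cursor.
    earliest_allowed = now_ts - lookback_days * 24 * 60 * 60
    return max(earliest_allowed, start_date_ts, cursor_ts or 0)
```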

{
    "StartTime": last_start_time,
    # decrement a second, as the API includes records with the specified StartTime and EndTime
    "EndTime": last_start_time + self.day_in_seconds - 1,
Contributor

WDYT about using a library like pendulum to do the date math, to avoid hardcoding constants? It would let us do something like:

last_start_time.add(days=1).int_timestamp
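For comparison, here is the same computation without the hardcoded day_in_seconds constant, using only the stdlib (pendulum's last_start_time.add(days=1).int_timestamp would read almost identically); the helper name is illustrative:

```python
from datetime import datetime, timedelta, timezone

def slice_end_time(last_start_time: datetime) -> int:
    # End one second before the next day starts, since the API treats
    # StartTime and EndTime as inclusive bounds.
    return int((last_start_time + timedelta(days=1)).timestamp()) - 1
```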

@yaroslav-dudar
Contributor Author

yaroslav-dudar commented Jun 24, 2021

/test connector=connectors/source-aws-cloudtrail

🕑 connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/968219941
❌ connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/968219941

@yaroslav-dudar
Contributor Author

yaroslav-dudar commented Jun 24, 2021

/test connector=connectors/source-aws-cloudtrail

🕑 connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/968328617
✅ connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/968328617

@yaroslav-dudar
Contributor Author

yaroslav-dudar commented Jun 24, 2021

@sherifnada FYI I fixed the test issue

@yaroslav-dudar
Contributor Author

yaroslav-dudar commented Jun 25, 2021

/publish connector=connectors/source-aws-cloudtrail

🕑 connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/970495990
✅ connectors/source-aws-cloudtrail https://github.com/airbytehq/airbyte/actions/runs/970495990

@yaroslav-dudar yaroslav-dudar merged commit 2ccb1c2 into master Jun 25, 2021
@yaroslav-dudar yaroslav-dudar deleted the yaroslav-dudar/2418-source-aws-cloudtrail branch June 25, 2021 08:10
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation connectors/source/aws-cloudtrail connectors/sources-api