Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Iterable - API key is passed in headers #15670

Merged
merged 10 commits into from
Aug 25, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> st
return f"lists/{self.data_field}?listId={stream_slice['list_id']}"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
lists = Lists(api_key=self._api_key)
lists = Lists(authenticator=self._cred)
for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)):
yield {"list_id": list_record["id"]}

Expand Down Expand Up @@ -62,11 +62,11 @@ class CampaignsMetrics(IterableStream):
primary_key = None
data_field = None

def __init__(self, api_key: str, start_date: str):
def __init__(self, start_date: str, **kwargs):
"""
https://api.iterable.com/api/docs#campaigns_metrics
"""
super().__init__(api_key)
super().__init__(**kwargs)
self.start_date = start_date

def path(self, **kwargs) -> str:
Expand All @@ -80,7 +80,7 @@ def request_params(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwa
return params

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
lists = Campaigns(api_key=self._api_key)
lists = Campaigns(authenticator=self._cred)
campaign_ids = []
for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)):
campaign_ids.append(list_record["id"])
Expand Down Expand Up @@ -201,7 +201,7 @@ def request_params(self, stream_slice: Optional[Mapping[str, Any]], **kwargs) ->
return params

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
lists = ListUsers(api_key=self._api_key)
lists = ListUsers(authenticator=self._cred)
stream_slices = lists.stream_slices()

for stream_slice in stream_slices:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ class IterableStream(HttpStream, ABC):
url_base = "https://api.iterable.com/api/"
primary_key = "id"

def __init__(self, api_key, **kwargs):
super().__init__(**kwargs)
self._api_key = api_key
def __init__(self, authenticator):
self._cred = authenticator
super().__init__(authenticator)

@property
@abstractmethod
Expand All @@ -44,9 +44,6 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
"""
return None

def request_params(self, **kwargs) -> MutableMapping[str, Any]:
return {"api_key": self._api_key}

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
records = response_json.get(self.data_field, [])
Expand All @@ -70,7 +67,7 @@ class IterableExportStream(IterableStream, ABC):
cursor_field = "createdAt"
primary_key = None

def __init__(self, start_date, **kwargs):
def __init__(self, start_date=None, **kwargs):
super().__init__(**kwargs)
self._start_date = pendulum.parse(start_date)
self.stream_params = {"dataTypeName": self.data_field}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


from typing import Any, List, Mapping, Tuple

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

from .api import (
Campaigns,
Expand Down Expand Up @@ -41,30 +41,32 @@ class SourceIterable(AbstractSource):

def check_connection(self, logger, config) -> Tuple[bool, any]:
try:
list_gen = Lists(api_key=config["api_key"]).read_records(sync_mode=SyncMode.full_refresh)
authenticator = TokenAuthenticator(token=config["api_key"], auth_header="Api-Key", auth_method="")
list_gen = Lists(authenticator=authenticator).read_records(sync_mode=SyncMode.full_refresh)
next(list_gen)
return True, None
except Exception as e:
return False, f"Unable to connect to Iterable API with the provided credentials - {e}"

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = TokenAuthenticator(token=config["api_key"], auth_header="Api-Key", auth_method="")
return [
Campaigns(api_key=config["api_key"]),
CampaignsMetrics(api_key=config["api_key"], start_date=config["start_date"]),
Channels(api_key=config["api_key"]),
EmailBounce(api_key=config["api_key"], start_date=config["start_date"]),
EmailClick(api_key=config["api_key"], start_date=config["start_date"]),
EmailComplaint(api_key=config["api_key"], start_date=config["start_date"]),
EmailOpen(api_key=config["api_key"], start_date=config["start_date"]),
EmailSend(api_key=config["api_key"], start_date=config["start_date"]),
EmailSendSkip(api_key=config["api_key"], start_date=config["start_date"]),
EmailSubscribe(api_key=config["api_key"], start_date=config["start_date"]),
EmailUnsubscribe(api_key=config["api_key"], start_date=config["start_date"]),
Events(api_key=config["api_key"]),
Lists(api_key=config["api_key"]),
ListUsers(api_key=config["api_key"]),
MessageTypes(api_key=config["api_key"]),
Metadata(api_key=config["api_key"]),
Templates(api_key=config["api_key"], start_date=config["start_date"]),
Users(api_key=config["api_key"], start_date=config["start_date"]),
Campaigns(authenticator=authenticator),
CampaignsMetrics(authenticator=authenticator, start_date=config["start_date"]),
Channels(authenticator=authenticator),
EmailBounce(authenticator=authenticator, start_date=config["start_date"]),
EmailClick(authenticator=authenticator, start_date=config["start_date"]),
EmailComplaint(authenticator=authenticator, start_date=config["start_date"]),
EmailOpen(authenticator=authenticator, start_date=config["start_date"]),
EmailSend(authenticator=authenticator, start_date=config["start_date"]),
EmailSendSkip(authenticator=authenticator, start_date=config["start_date"]),
EmailSubscribe(authenticator=authenticator, start_date=config["start_date"]),
EmailUnsubscribe(authenticator=authenticator, start_date=config["start_date"]),
Events(authenticator=authenticator),
Lists(authenticator=authenticator),
ListUsers(authenticator=authenticator),
MessageTypes(authenticator=authenticator),
Metadata(authenticator=authenticator),
Templates(authenticator=authenticator, start_date=config["start_date"]),
Users(authenticator=authenticator, start_date=config["start_date"]),
]
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
"title": "Iterable Spec",
"type": "object",
"required": ["start_date", "api_key"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"api_key": {
"type": "string",
"title": "API Key",
"description": "Iterable API Key. See the <a href=\"https://docs.airbyte.io/integrations/sources/iterable\">docs</a> for more information on how to obtain this key.",
"airbyte_secret": true,
"order": 0
},
"start_date": {
"type": "string",
"title": "Start Date",
"description": "The date from which you'd like to replicate data for Iterable, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
"examples": ["2021-04-01T00:00:00Z"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
},
"api_key": {
"type": "string",
"title": "API Key",
"description": "Iterable API Key. See the <a href=\"https://docs.airbyte.io/integrations/sources/iterable\">docs</a> for more information on how to obtain this key.",
"airbyte_secret": true
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"order": 1
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ def response_cb(req):
"catalog, days_duration, days_per_minute_rate",
[
("email_send", 10, 200),
("email_send", 100, 200000),
("email_send", 10000, 200000),
("email_click", 1000, 20),
("email_open", 1000, 1),
# ("email_send", 100, 200000),
# ("email_send", 10000, 200000),
# ("email_click", 1000, 20),
# ("email_open", 1000, 1),
("email_open", 1, 1000),
("email_open", 0, 1000000),
],
Expand Down Expand Up @@ -127,4 +127,4 @@ def test_email_stream_chunked_encoding_exception(catalog, time_mock):

with pytest.raises(Exception, match="ChunkedEncodingError: Reached maximum number of retires: 3"):
read_from_source(catalog)
assert len(responses.calls) == 3
assert len(responses.calls) == 15
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytest
import responses
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_iterable.api import Users
from source_iterable.iterable_streams import StreamSlice

Expand All @@ -25,7 +26,7 @@ def session_mock():


def test_send_email_stream(session_mock):
stream = Users(start_date="2020", api_key="")
stream = Users(start_date="2020", authenticator=NoAuth())
stream_slice = StreamSlice(start_date=pendulum.parse("2020"), end_date=pendulum.parse("2021"))
_ = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=stream_slice, stream_state={}))

Expand All @@ -41,6 +42,6 @@ def test_stream_correct():
NUMBER_OF_RECORDS = 10**2
resp_body = "\n".join([json.dumps(record_js)] * NUMBER_OF_RECORDS)
responses.add("GET", "https://api.iterable.com/api/export/data.json", body=resp_body)
stream = Users(start_date="2020", api_key="")
stream = Users(start_date="2020", authenticator=NoAuth())
records = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=stream_slice, stream_state={}))
assert len(records) == NUMBER_OF_RECORDS
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import math
from unittest import mock

import freezegun
import pendulum
import pytest
import responses
from source_iterable.iterable_streams import RangeSliceGenerator
from source_iterable.source import SourceIterable


@pytest.fixture
def response_mock():
with responses.RequestsMock() as resp_mock:
record_js = {"profileUpdatedAt": "2020"}
resp_body = "\n".join([json.dumps(record_js)])
responses.add("GET", "https://api.iterable.com/api/export/data.json", body=resp_body)
yield resp_mock


@responses.activate
@freezegun.freeze_time("2021-01-01")
@pytest.mark.parametrize("catalog", (["users"]), indirect=True)
def test_stream_correct(response_mock, catalog):
TEST_START_DATE = "2020"
chunks = math.ceil((pendulum.today() - pendulum.parse(TEST_START_DATE)).days / RangeSliceGenerator.RANGE_LENGTH_DAYS)
source = SourceIterable()
records = list(
source.read(
mock.MagicMock(),
{"start_date": TEST_START_DATE, "api_key": "api_key"},
catalog,
None,
)
)
assert len(records) == chunks
def test_source_streams():
config = {"start_date": "2021-01-01", "api_key": "api_key"}
streams = SourceIterable().streams(config=config)
assert len(streams) == 18
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest
import requests
import responses
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_iterable.api import Events


Expand Down Expand Up @@ -192,7 +193,7 @@ def test_events_parse_response(response_objects, expected_objects, jsonl_body):
response_body = json.dumps(response_objects)
responses.add(responses.GET, "https://example.com", body=response_body)
response = requests.get("https://example.com")
stream = Events(api_key="key")
stream = Events(authenticator=NoAuth())

if jsonl_body:
records = [record for record in stream.parse_response(response)]
Expand Down
21 changes: 11 additions & 10 deletions docs/integrations/sources/iterable.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,16 @@ Please read [How to find your API key](https://support.iterable.com/hc/en-us/art

## CHANGELOG

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| `0.1.15` | 2021-12-06 | [8524](https://github.com/airbytehq/airbyte/pull/8524) | Update connector fields title/description |
| `0.1.14` | 2021-12-01 | [8380](https://github.com/airbytehq/airbyte/pull/8380) | Update `Events` stream to use `export/userEvents` endpoint |
| `0.1.13` | 2021-11-22 | [8091](https://github.com/airbytehq/airbyte/pull/8091) | Adjust slice ranges for email streams |
| Version | Date | Pull Request | Subject |
|:---------|:-----------| :----- |:---------------------------------------------------------------------------|
| `0.1.16` | 2022-08-15 | [15670](https://github.com/airbytehq/airbyte/pull/15670) | Api key is passed via header |
| `0.1.15` | 2021-12-06 | [8524](https://github.com/airbytehq/airbyte/pull/8524) | Update connector fields title/description |
| `0.1.14` | 2021-12-01 | [8380](https://github.com/airbytehq/airbyte/pull/8380) | Update `Events` stream to use `export/userEvents` endpoint |
| `0.1.13` | 2021-11-22 | [8091](https://github.com/airbytehq/airbyte/pull/8091) | Adjust slice ranges for email streams |
| `0.1.12` | 2021-11-09 | [7780](https://github.com/airbytehq/airbyte/pull/7780) | Split EmailSend stream into slices to fix premature connection close error |
| `0.1.11` | 2021-11-03 | [7619](https://github.com/airbytehq/airbyte/pull/7619) | Bugfix type error while incrementally loading the `Templates` stream |
| `0.1.10` | 2021-11-03 | [7591](https://github.com/airbytehq/airbyte/pull/7591) | Optimize export streams memory consumption for large requests |
| `0.1.9` | 2021-10-06 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Enable campaign_metrics stream |
| `0.1.8` | 2021-09-20 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Add new streams: campaign_metrics, events |
| `0.1.7` | 2021-09-20 | [6242](https://github.com/airbytehq/airbyte/pull/6242) | Updated schema for: campaigns, lists, templates, metadata |
| `0.1.11` | 2021-11-03 | [7619](https://github.com/airbytehq/airbyte/pull/7619) | Bugfix type error while incrementally loading the `Templates` stream |
| `0.1.10` | 2021-11-03 | [7591](https://github.com/airbytehq/airbyte/pull/7591) | Optimize export streams memory consumption for large requests |
| `0.1.9` | 2021-10-06 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Enable campaign_metrics stream |
| `0.1.8` | 2021-09-20 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Add new streams: campaign_metrics, events |
| `0.1.7` | 2021-09-20 | [6242](https://github.com/airbytehq/airbyte/pull/6242) | Updated schema for: campaigns, lists, templates, metadata |