Skip to content

Commit

Permalink
feat: Working state for conversations stream (#45)
Browse files Browse the repository at this point in the history
* Conversation state working

* [pre-commit.ci] auto fixes

* Fixing accidental removal/add; I think this came from pre-commit, but I'm not sure

* Capabilities update

* Apply suggestions from code review

Log wording update, and efficiency increase

Co-authored-by: Edgar R. M. <[email protected]>

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Edgar R. M. <[email protected]>
  • Loading branch information
3 people authored May 3, 2023
1 parent 73b8afc commit 2c2fb52
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 27 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
## Capabilities

* `catalog`
* `state`
* `discover`
* `about`
* `stream-maps`
Expand Down
3 changes: 3 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ plugins:
- discover
- about
- stream-maps
- state
settings:
- name: api_key
kind: password
- name: start_date
config:
start_date: "2023-04-30"
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ vcs = "git"
style = "semver"

[tool.ruff]
ignore = ["ANN101", "PLR2004", "N818"]
ignore = ["ANN101", "PLR2004", "N818", "G004", "EM101"]
select = ["ALL"]
target-version = "py37"

Expand Down
2 changes: 1 addition & 1 deletion scripts/generate_20_conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import requests

if not (dotenv.load_dotenv()):
raise Exception("Need dotenv to load auth key") # noqa: EM101, TRY002, TRY003
raise Exception("Need dotenv to load auth key") # noqa: TRY002, TRY003


def start_conversation_then_archive_it(
Expand Down
21 changes: 14 additions & 7 deletions tap_messagebird/schemas/conversation.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@
}
},
"createdDatetime": {
"type": "string"
"type": "string",
"format": "date-time"
},
"updatedDatetime": {
"type": [
"null",
"string"
]
],
"format": "date-time"
}
}
},
Expand All @@ -71,10 +73,12 @@
"type": "string"
},
"createdDatetime": {
"type": "string"
"type": "string",
"format": "date-time"
},
"updatedDatetime": {
"type": "string"
"type": "string",
"format": "date-time"
}
}
}
Expand All @@ -83,13 +87,16 @@
"type": "string"
},
"createdDatetime": {
"type": "string"
"type": "string",
"format": "date-time"
},
"updatedDatetime": {
"type": "string"
"type": "string",
"format": "date-time"
},
"lastReceivedDatetime": {
"type": "string"
"type": "string",
"format": "date-time"
},
"lastUsedChannelId": {
"type": "string"
Expand Down
69 changes: 51 additions & 18 deletions tap_messagebird/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable
from typing import TYPE_CHECKING, Any, Iterable, cast

import pendulum

from tap_messagebird.client import MessagebirdOffsetPaginator, MessagebirdStream

Expand Down Expand Up @@ -53,7 +55,7 @@ class ConversationsStream(MessagebirdConversations):
name = "conversation"
path = "/conversations"
primary_keys = ["id"]
replication_key = None
replication_key = "lastReceivedDatetime"
# Optionally, you may also use `schema_filepath` in place of `schema`:
schema_filepath = SCHEMAS_DIR / "conversation.json"

Expand All @@ -70,11 +72,53 @@ def get_child_context(
"_sdc_conversations_id": record["id"],
}

def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
    """Yield post-processed records until one predates the starting timestamp.

    Records are requested via ``request_records`` and passed through
    ``post_process``. Iteration stops at the first record whose replication
    key value is earlier than the stream's starting timestamp.

    Args:
        context: Stream partition or context dictionary.

    Yields:
        One dictionary per record that survived ``post_process`` and is not
        older than the starting timestamp.

    Raises:
        ValueError: If no starting timestamp is available for the stream.
    """
    bookmark = self.get_starting_timestamp(context)
    if bookmark is None:
        # Not expected in practice; the check also narrows the type for mypy.
        message = "No starting replication key value found."
        raise ValueError(message)
    cutoff = pendulum.instance(bookmark)

    for raw in self.request_records(context):
        processed = self.post_process(raw, context)
        if processed is None:
            # post_process() filtered this record out.
            continue

        last_received: pendulum.DateTime = cast(
            pendulum.DateTime,
            pendulum.parse(raw[self.replication_key]),
        )
        if not (last_received < cutoff):
            yield processed
            continue

        # Conversations come back in descending order and the API offers no
        # filtering parameter, so the first record older than the cutoff
        # means everything after it can be skipped as well.
        self.logger.info(
            "Breaking after hitting a record with replication key %s < %s",
            last_received,
            cutoff,
        )
        break


class ConversationMessagesStream(MessagebirdConversations):
"""Conversation Messages stream.
Messages stream doesn't pull all messages.
This stream pulls WhatsApp messages, while the messages stream
doesn't seem to pull them.
"""

name = "conversation_message"
Expand All @@ -84,20 +128,8 @@ class ConversationMessagesStream(MessagebirdConversations):
# Optionally, you may also use `schema_filepath` in place of `schema`:
schema_filepath = SCHEMAS_DIR / "conversation_message.json"
parent_stream_type = ConversationsStream

def get_url_params(
    self,
    context: dict | None,
    next_page_token: Any | None,
) -> dict[str, Any]:
    """Build the URL query parameters for a request.

    Starts from the parent class's parameters and, when ``from`` is absent
    or ``None``, sets it to the configured ``start_date``.

    Args:
        context: Stream partition or context dictionary.
        next_page_token: Pagination token forwarded to the parent class.

    Returns:
        The parameter dictionary, with ``from`` populated.
    """
    query = super().get_url_params(
        context=context,
        next_page_token=next_page_token,
    )
    if query.get("from") is not None:
        # A lower bound is already set; leave it as is.
        return query
    query["from"] = self.config["start_date"]
    return query
# We don't need to track state per conversation
state_partitioning_keys: list[str] = []

def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
"""Return a generator of record-type dictionary objects.
Expand Down Expand Up @@ -156,7 +188,8 @@ def get_url_params(
next_page_token=next_page_token,
)
if params.get("from") is None:
params["from"] = self.config["start_date"]
from_date = pendulum.parse(self.config["start_date"]).to_iso8601_string()
params["from"] = from_date
return params


Expand Down

0 comments on commit 2c2fb52

Please sign in to comment.