From 2c2fb5218c468c6f9a068223d9f8f6bf8edb92d3 Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Wed, 3 May 2023 11:41:18 -0400 Subject: [PATCH] feat: Working state for `conversations` stream (#45) * Conversation state working * [pre-commit.ci] auto fixes * Fixing accidental removal/add I think this came from precommit but I'm not sure * Capabilities update * Apply suggestions from code review Log wording update, and efficiency increase Co-authored-by: Edgar R. M. --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Edgar R. M. --- README.md | 1 + meltano.yml | 3 + pyproject.toml | 2 +- scripts/generate_20_conversations.py | 2 +- tap_messagebird/schemas/conversation.json | 21 ++++--- tap_messagebird/streams.py | 69 +++++++++++++++++------ 6 files changed, 71 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 752a26c..67a8107 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com). 
## Capabilities * `catalog` +* `state` * `discover` * `about` * `stream-maps` diff --git a/meltano.yml b/meltano.yml index f297c99..2841deb 100644 --- a/meltano.yml +++ b/meltano.yml @@ -14,10 +14,13 @@ plugins: - discover - about - stream-maps + - state settings: - name: api_key kind: password - name: start_date + config: + start_date: "2023-04-30" loaders: - name: target-jsonl variant: andyh1203 diff --git a/pyproject.toml b/pyproject.toml index e0f86b7..77c4e52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ vcs = "git" style = "semver" [tool.ruff] -ignore = ["ANN101", "PLR2004", "N818"] +ignore = ["ANN101", "PLR2004", "N818", "G004", "EM101"] select = ["ALL"] target-version = "py37" diff --git a/scripts/generate_20_conversations.py b/scripts/generate_20_conversations.py index 27ed64b..df2c092 100644 --- a/scripts/generate_20_conversations.py +++ b/scripts/generate_20_conversations.py @@ -5,7 +5,7 @@ import requests if not (dotenv.load_dotenv()): - raise Exception("Need dotenv to load auth key") # noqa: EM101, TRY002, TRY003 + raise Exception("Need dotenv to load auth key") # noqa: TRY002, TRY003 def start_conversation_then_archive_it( diff --git a/tap_messagebird/schemas/conversation.json b/tap_messagebird/schemas/conversation.json index 8321ac0..59e828d 100644 --- a/tap_messagebird/schemas/conversation.json +++ b/tap_messagebird/schemas/conversation.json @@ -43,13 +43,15 @@ } }, "createdDatetime": { - "type": "string" + "type": "string", + "format": "date-time" }, "updatedDatetime": { "type": [ "null", "string" - ] + ], + "format": "date-time" } } }, @@ -71,10 +73,12 @@ "type": "string" }, "createdDatetime": { - "type": "string" + "type": "string", + "format": "date-time" }, "updatedDatetime": { - "type": "string" + "type": "string", + "format": "date-time" } } } @@ -83,13 +87,16 @@ "type": "string" }, "createdDatetime": { - "type": "string" + "type": "string", + "format": "date-time" }, "updatedDatetime": { - "type": "string" + "type": 
"string", + "format": "date-time" }, "lastReceivedDatetime": { - "type": "string" + "type": "string", + "format": "date-time" }, "lastUsedChannelId": { "type": "string" diff --git a/tap_messagebird/streams.py b/tap_messagebird/streams.py index 340494a..2746021 100644 --- a/tap_messagebird/streams.py +++ b/tap_messagebird/streams.py @@ -3,7 +3,9 @@ from __future__ import annotations from pathlib import Path -from typing import TYPE_CHECKING, Any, Iterable +from typing import TYPE_CHECKING, Any, Iterable, cast + +import pendulum from tap_messagebird.client import MessagebirdOffsetPaginator, MessagebirdStream @@ -53,7 +55,7 @@ class ConversationsStream(MessagebirdConversations): name = "conversation" path = "/conversations" primary_keys = ["id"] - replication_key = None + replication_key = "lastReceivedDatetime" # Optionally, you may also use `schema_filepath` in place of `schema`: schema_filepath = SCHEMAS_DIR / "conversation.json" @@ -70,11 +72,53 @@ def get_child_context( "_sdc_conversations_id": record["id"], } + def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: + """Return a generator of record-type dictionary objects. + + Each record emitted should be a dictionary of property names to their values. + + Args: + context: Stream partition or context dictionary. + + Yields: + One item per (possibly processed) record in the API. 
+ """ + starting_replication_key_value = self.get_starting_timestamp(context) + if starting_replication_key_value is None: + # Shouldn't be possible, but mypy complains + raise ValueError( # noqa: TRY003 + "No starting replication key value found.", + ) + starting_replication_key_value = pendulum.instance( + starting_replication_key_value, + ) + + for record in self.request_records(context): + transformed_record = self.post_process(record, context) + if transformed_record is None: + # Record filtered out during post_process() + continue + record_last_received_datetime: pendulum.DateTime = cast( + pendulum.DateTime, + pendulum.parse(record[self.replication_key]), + ) + # Conversations are returned in descending order, so we can stop + # There's no filtering parameter, just this default ordering + if record_last_received_datetime < starting_replication_key_value: + self.logger.info( + "Breaking after hitting a record with replication key %s < %s", + record_last_received_datetime, + starting_replication_key_value, + ) + break + yield transformed_record + class ConversationMessagesStream(MessagebirdConversations): """Conversation Messages stream. - Messages stream doesn't pull all messages. + This stream pulls WhatsApp messages, while the messages stream + doesn't seem to pull them. 
""" name = "conversation_message" @@ -84,20 +128,8 @@ class ConversationMessagesStream(MessagebirdConversations): # Optionally, you may also use `schema_filepath` in place of `schema`: schema_filepath = SCHEMAS_DIR / "conversation_message.json" parent_stream_type = ConversationsStream - - def get_url_params( - self, - context: dict | None, - next_page_token: Any | None, - ) -> dict[str, Any]: - """Return a dictionary of values to be used in URL parameterization.""" - params = super().get_url_params( - context=context, - next_page_token=next_page_token, - ) - if params.get("from") is None: - params["from"] = self.config["start_date"] - return params + # We don't need to track state per conversation + state_partitioning_keys: list[str] = [] def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: """Return a generator of record-type dictionary objects. @@ -156,7 +188,8 @@ def get_url_params( next_page_token=next_page_token, ) if params.get("from") is None: - params["from"] = self.config["start_date"] + from_date = pendulum.parse(self.config["start_date"]).to_iso8601_string() + params["from"] = from_date return params