Skip to content

Commit

Permalink
Fix custom_converter and infrequent_polling samples (#95)
Browse files Browse the repository at this point in the history
Fixes #93
Fixes #90
  • Loading branch information
cretz authored Nov 14, 2023
1 parent ba5a87f commit 68ba233
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 88 deletions.
66 changes: 66 additions & 0 deletions custom_converter/shared.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import dataclasses
from typing import Any, Optional, Type

import temporalio.converter
from temporalio.api.common.v1 import Payload
from temporalio.converter import (
CompositePayloadConverter,
DefaultPayloadConverter,
EncodingPayloadConverter,
)


class GreetingInput:
def __init__(self, name: str) -> None:
self.name = name


class GreetingOutput:
def __init__(self, result: str) -> None:
self.result = result


class GreetingEncodingPayloadConverter(EncodingPayloadConverter):
@property
def encoding(self) -> str:
return "text/my-greeting-encoding"

def to_payload(self, value: Any) -> Optional[Payload]:
if isinstance(value, GreetingInput):
return Payload(
metadata={"encoding": self.encoding.encode(), "is_input": b"true"},
data=value.name.encode(),
)
elif isinstance(value, GreetingOutput):
return Payload(
metadata={"encoding": self.encoding.encode()},
data=value.result.encode(),
)
else:
return None

def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
if payload.metadata.get("is_input") == b"true":
# Confirm proper type hint if present
assert not type_hint or type_hint is GreetingInput
return GreetingInput(payload.data.decode())
else:
assert not type_hint or type_hint is GreetingOutput
return GreetingOutput(payload.data.decode())


class GreetingPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Just add ours as first before the defaults
super().__init__(
GreetingEncodingPayloadConverter(),
# TODO(cretz): Make this list available without instantiation - https://github.com/temporalio/sdk-python/issues/139
*DefaultPayloadConverter().converters.values(),
)


# Use the default data converter, but change the payload converter.
greeting_data_converter = dataclasses.replace(
temporalio.converter.default(),
payload_converter_class=GreetingPayloadConverter,
)
14 changes: 4 additions & 10 deletions custom_converter/starter.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
import asyncio
import dataclasses

import temporalio.converter
from temporalio.client import Client

from custom_converter.worker import (
from custom_converter.shared import (
GreetingInput,
GreetingOutput,
GreetingPayloadConverter,
GreetingWorkflow,
greeting_data_converter,
)
from custom_converter.workflow import GreetingWorkflow


async def main():
# Connect client
client = await Client.connect(
"localhost:7233",
# Use the default data converter, but change the payload converter.
# Without this we get:
# TypeError: Object of type GreetingInput is not JSON serializable
data_converter=dataclasses.replace(
temporalio.converter.default(),
payload_converter_class=GreetingPayloadConverter,
),
data_converter=greeting_data_converter,
)

# Run workflow
Expand Down
74 changes: 3 additions & 71 deletions custom_converter/worker.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,10 @@
import asyncio
import dataclasses
from typing import Any, Optional, Type

import temporalio.converter
from temporalio import workflow
from temporalio.api.common.v1 import Payload
from temporalio.client import Client
from temporalio.converter import (
CompositePayloadConverter,
DefaultPayloadConverter,
EncodingPayloadConverter,
)
from temporalio.worker import Worker


class GreetingInput:
def __init__(self, name: str) -> None:
self.name = name


class GreetingOutput:
def __init__(self, result: str) -> None:
self.result = result


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, input: GreetingInput) -> GreetingOutput:
return GreetingOutput(f"Hello, {input.name}")


class GreetingEncodingPayloadConverter(EncodingPayloadConverter):
@property
def encoding(self) -> str:
return "text/my-greeting-encoding"

def to_payload(self, value: Any) -> Optional[Payload]:
if isinstance(value, GreetingInput):
return Payload(
metadata={"encoding": self.encoding.encode(), "is_input": b"true"},
data=value.name.encode(),
)
elif isinstance(value, GreetingOutput):
return Payload(
metadata={"encoding": self.encoding.encode()},
data=value.result.encode(),
)
else:
return None

def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
if payload.metadata.get("is_input") == b"true":
# Confirm proper type hint if present
assert not type_hint or type_hint is GreetingInput
return GreetingInput(payload.data.decode())
else:
assert not type_hint or type_hint is GreetingOutput
return GreetingOutput(payload.data.decode())


class GreetingPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Just add ours as first before the defaults
super().__init__(
GreetingEncodingPayloadConverter(),
# TODO(cretz): Make this list available without instantiation - https://github.com/temporalio/sdk-python/issues/139
*DefaultPayloadConverter().converters.values(),
)

from custom_converter.shared import greeting_data_converter
from custom_converter.workflow import GreetingWorkflow

interrupt_event = asyncio.Event()

Expand All @@ -77,13 +13,9 @@ async def main():
# Connect client
client = await Client.connect(
"localhost:7233",
# Use the default data converter, but change the payload converter.
# Without this, when trying to run a workflow, we get:
# KeyError: 'Unknown payload encoding my-greeting-encoding
data_converter=dataclasses.replace(
temporalio.converter.default(),
payload_converter_class=GreetingPayloadConverter,
),
data_converter=greeting_data_converter,
)

# Run a worker for the workflow
Expand Down
11 changes: 11 additions & 0 deletions custom_converter/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from custom_converter.shared import GreetingInput, GreetingOutput


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, input: GreetingInput) -> GreetingOutput:
return GreetingOutput(f"Hello, {input.name}")
10 changes: 3 additions & 7 deletions polling/infrequent/activities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from dataclasses import dataclass

from temporalio import activity
Expand All @@ -15,9 +14,6 @@ class ComposeGreetingInput:
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
test_service = TestService()
while True:
try:
result = test_service.get_service_result(input)
return result
except Exception:
activity.heartbeat("Invoking activity")
# If this raises an exception because it's not done yet, the activity will
# continually be scheduled for retry
return await test_service.get_service_result(input)
Empty file.
28 changes: 28 additions & 0 deletions tests/custom_converter/workflow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import uuid

from temporalio.client import Client
from temporalio.worker import Worker

from custom_converter.shared import (
GreetingInput,
GreetingOutput,
greeting_data_converter,
)
from custom_converter.workflow import GreetingWorkflow


async def test_workflow_with_custom_converter(client: Client):
# Replace data converter in client
new_config = client.config()
new_config["data_converter"] = greeting_data_converter
client = Client(**new_config)
task_queue = f"tq-{uuid.uuid4()}"
async with Worker(client, task_queue=task_queue, workflows=[GreetingWorkflow]):
result = await client.execute_workflow(
GreetingWorkflow.run,
GreetingInput("Temporal"),
id=f"wf-{uuid.uuid4()}",
task_queue=task_queue,
)
assert isinstance(result, GreetingOutput)
assert result.result == "Hello, Temporal"

0 comments on commit 68ba233

Please sign in to comment.