Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Required wait update stage, update polling improvements, and other update changes #521

Merged
merged 5 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 163 additions & 74 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1155,8 +1155,9 @@ def id(self) -> str:

@property
def run_id(self) -> Optional[str]:
"""Run ID used for :py:meth:`signal` and :py:meth:`query` calls if
present to ensure the query or signal happen on this exact run.
"""Run ID used for :py:meth:`signal`, :py:meth:`query`, and
:py:meth:`update` calls if present to ensure the signal/query/update
happen on this exact run.

This is only created via :py:meth:`Client.get_workflow_handle`.
:py:meth:`Client.start_workflow` will not set this value.
Expand Down Expand Up @@ -1843,7 +1844,7 @@ async def execute_update(
update: Update function or name on the workflow.
arg: Single argument to the update.
args: Multiple arguments to the update. Cannot be set if arg is.
id: ID of the update. If not set, the server will set a UUID as the ID.
id: ID of the update. If not set, the default is a new UUID.
result_type: For string updates, this can set the specific result
type hint to deserialize into.
rpc_metadata: Headers used on the RPC call. Keys here override
Expand All @@ -1858,8 +1859,8 @@ async def execute_update(
update,
arg,
args=args,
wait_for_stage=WorkflowUpdateWaitStage.COMPLETED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to double-check we're sure WorkflowUpdateWaitStage is the right name for the enum here. The underlying API entity is called "Lifecycle Stage". Calling it "Wait Stage" bakes the concept of waiting into it -- is there a chance that that will be annoying in the future because we find ourselves wanting to refer to a lifecycle stage without any notion of "waiting" being involved?

Copy link
Member Author

@cretz cretz May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I have no strong opinion here and I now that I think about it, I like WorkflowUpdateLifecycleStage because, when we someday get a describe() call on WorkflowUpdateHandle, we'll want this stage to be there too.

I'll change this enum name to WorkflowUpdateLifecycleStage (but keep the parameter name as wait_for_stage) assuming there's no objection. Anyone else have an opinion on what to name this?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we should consider the future describe() call, which we can implement today, so calling the enum WorkflowUpdateLifecycleStage makes a lot more sense.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to WorkflowUpdateLifecycleStage

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems legit to me. Or just leave out Lifecycle, not sure it's carrying much weight here. Up to y'all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 Lifecycle isn't needed -- WorkflowUpdateStage is sufficient.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me, @Quinn-With-Two-Ns thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enum name updated

id=id,
wait_for_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
result_type=result_type,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
Expand All @@ -1872,6 +1873,7 @@ async def start_update(
self,
update: temporalio.workflow.UpdateMethodMultiParam[[SelfType], LocalReturnType],
*,
wait_for_stage: WorkflowUpdateWaitStage,
id: Optional[str] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
Expand All @@ -1887,6 +1889,7 @@ async def start_update(
],
arg: ParamType,
*,
wait_for_stage: WorkflowUpdateWaitStage,
id: Optional[str] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
Expand All @@ -1902,6 +1905,7 @@ async def start_update(
],
*,
args: MultiParamSpec.args,
wait_for_stage: WorkflowUpdateWaitStage,
id: Optional[str] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
Expand All @@ -1915,6 +1919,7 @@ async def start_update(
update: str,
arg: Any = temporalio.common._arg_unset,
*,
wait_for_stage: WorkflowUpdateWaitStage,
args: Sequence[Any] = [],
id: Optional[str] = None,
result_type: Optional[Type] = None,
Expand All @@ -1928,6 +1933,7 @@ async def start_update(
update: Union[str, Callable],
arg: Any = temporalio.common._arg_unset,
*,
wait_for_stage: WorkflowUpdateWaitStage,
args: Sequence[Any] = [],
id: Optional[str] = None,
result_type: Optional[Type] = None,
Expand All @@ -1950,8 +1956,10 @@ async def start_update(
Args:
update: Update function or name on the workflow.
arg: Single argument to the update.
wait_for_stage: Required stage to wait until returning. ADMITTED is

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first sentence of this comment is self-referential and adds no value beyond the name of the field.
Generally, I'd like there to be more clues in the python SDK about what these wait stages mean, in particular flagging that the worker must be present. I'd suggest adding comments to the enum and/or linking out to docs.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The headline

Send an update request to the workflow and return a handle to it
also just echoes what we already know from the name of the method and the return type.

I think there's an opportunity to headline its semantics (like a glance at what you'd find in the docs) and link out to overview docs.

Context: A natural thing I've done when not finding any good description in the code is to find docs online. But I might find python-specific docs which also don't provide a good overview of semantics. As a n00b I might not even be aware that there are overview docs, as I don't have the ontology of temporal or its docs site in my head.
Giving me the basics here and then linking out to the right docs page would save a lot of time and help me learn.

Copy link
Member Author

@cretz cretz May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first sentence of this comment is self-referential and adds no value beyond the name of the field.

This is often the case with required docs. When you are forced to document self-documenting things, the sentences become a bit redundant. But it's preferred in Python libraries over the opposite which is optional documentation.

Generally, I'd like there to be more clues in the python SDK about what these wait stages mean, in particular flagging that the worker must be present. I'd suggest adding comments to the enum and/or linking out to docs.

We try to avoid linking out to docs since those links are rarely stable (we do sometimes though). But we do encourage using the general docs which are meant to expound on the purpose of these options more than we can reasonably do in a non-stale way in every SDK. Want to take a stab at providing suggested documentation here?

Copy link

@drewhoskins-temporal drewhoskins-temporal May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think nearly as many things are self-documenting as engineers think are self-documenting. Pretend like you're newish to temporal workflows and reading the comments about update. What questions would you have? What gotchas would you like to be made aware of?

I think you go too fast for me to keep up with you and write comments, so maybe better for you to suggest and I'll take a look.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion is what's already in this PR, heh, because it's consistent with the rest of the SDK. This has been sitting for many days already so I'm guessing there's no rush. The request is a bit too subjective, so If you can provide a suggestion then we'll know it's correct instead of guess-and-check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per off-PR discussion, added link to https://docs.temporal.io/workflows#update

not currently supported.
args: Multiple arguments to the update. Cannot be set if arg is.
id: ID of the update. If not set, the server will set a UUID as the ID.
id: ID of the update. If not set, the default is a new UUID.
result_type: For string updates, this can set the specific result
type hint to deserialize into.
rpc_metadata: Headers used on the RPC call. Keys here override
Expand All @@ -1964,9 +1972,9 @@ async def start_update(
return await self._start_update(
update,
arg,
wait_for_stage=wait_for_stage,
args=args,
id=id,
wait_for_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
result_type=result_type,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
Expand All @@ -1977,13 +1985,15 @@ async def _start_update(
update: Union[str, Callable],
arg: Any = temporalio.common._arg_unset,
*,
wait_for_stage: WorkflowUpdateWaitStage,
args: Sequence[Any] = [],
id: Optional[str] = None,
wait_for_stage: temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.ValueType = temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED,
result_type: Optional[Type] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> WorkflowUpdateHandle[Any]:
if wait_for_stage == WorkflowUpdateWaitStage.ADMITTED:
raise ValueError("ADMITTED wait stage not supported")
update_name: str
ret_type = result_type
if isinstance(update, temporalio.workflow.UpdateMethodMultiParam):
Expand Down Expand Up @@ -2011,6 +2021,68 @@ async def _start_update(
)
)

def get_update_handle(
self,
id: str,
*,
workflow_run_id: Optional[str] = None,
result_type: Optional[Type] = None,
) -> WorkflowUpdateHandle[Any]:
"""Get a handle for an update. The handle can be used to wait on the
update result.

Users may prefer the more typesafe :py:meth:`get_update_handle_for`
which accepts an update definition.

.. warning::
This API is experimental

Args:
id: Update ID to get a handle to.
workflow_run_id: Run ID to tie the handle to. If this is not set,
the :py:attr:`run_id` will be used.
result_type: The result type to deserialize into if known.

Returns:
The update handle.
"""
return WorkflowUpdateHandle(
self._client,
id,
self._id,
workflow_run_id=workflow_run_id or self._run_id,
result_type=result_type,
)

def get_update_handle_for(
self,
update: temporalio.workflow.UpdateMethodMultiParam[Any, LocalReturnType],
id: str,
*,
workflow_run_id: Optional[str] = None,
) -> WorkflowUpdateHandle[LocalReturnType]:
"""Get a typed handle for an update. The handle can be used to wait on
the update result.

This is the same as :py:meth:`get_update_handle` but typed.

.. warning::
This API is experimental

Args:
update: The update method to use for typing the handle.
id: Update ID to get a handle to.
workflow_run_id: Run ID to tie the handle to. If this is not set,
the :py:attr:`run_id` will be used.
result_type: The result type to deserialize into if known.

Returns:
The update handle.
"""
return self.get_update_handle(
id, workflow_run_id=workflow_run_id, result_type=update._defn.ret_type
)


@dataclass(frozen=True)
class AsyncActivityIDReference:
Expand Down Expand Up @@ -4235,15 +4307,38 @@ async def result(
WorkflowUpdateFailedError: If the update failed
RPCError: Update result could not be fetched for some other reason.
"""
if self._known_outcome is not None:
outcome = self._known_outcome
return await _update_outcome_to_result(
outcome,
self.id,
self._client.data_converter,
self._result_type,
# Poll until outcome reached
await self._poll_until_outcome(
rpc_metadata=rpc_metadata, rpc_timeout=rpc_timeout
)

# Convert outcome to failure or value
assert self._known_outcome
if self._known_outcome.HasField("failure"):
raise WorkflowUpdateFailedError(
await self._client.data_converter.decode_failure(
self._known_outcome.failure
),
)
if not self._known_outcome.success.payloads:
return None # type: ignore
type_hints = [self._result_type] if self._result_type else None
results = await self._client.data_converter.decode(
self._known_outcome.success.payloads, type_hints
)
if not results:
return None # type: ignore
elif len(results) > 1:
warnings.warn(f"Expected single update result, got {len(results)}")
return results[0]

async def _poll_until_outcome(
self,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
if self._known_outcome:
return
req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest(
namespace=self._client.namespace,
update_ref=temporalio.api.update.v1.UpdateRef(
Expand All @@ -4259,27 +4354,33 @@ async def result(
),
)

# Continue polling as long as we have either an empty response, or an *rpc* timeout
# Continue polling as long as we have no outcome
while True:
try:
res = (
await self._client.workflow_service.poll_workflow_execution_update(
req,
retry=True,
metadata=rpc_metadata,
timeout=rpc_timeout,
)
)
if res.HasField("outcome"):
return await _update_outcome_to_result(
res.outcome,
self.id,
self._client.data_converter,
self._result_type,
)
except RPCError as err:
if err.status != RPCStatusCode.DEADLINE_EXCEEDED:
raise
res = await self._client.workflow_service.poll_workflow_execution_update(
req,
retry=True,
metadata=rpc_metadata,
timeout=rpc_timeout,
)
if res.HasField("outcome"):
self._known_outcome = res.outcome
return


class WorkflowUpdateWaitStage(IntEnum):
"""Stage to wait for workflow update to reach before returning from

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another comment that mostly restates the name of the enum.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is commonly the case with all forced-documentation docs, which includes most of the Python docs. We can't rely on self-documenting names. Similar thing exists in Go. If there is a general different posture we want to take with the Python API docs, I wonder if we can do that generally. I am totally supportive of such a general posture change, just not necessarily for this one enum in this one PR.

``start_update``.
"""

ADMITTED = int(
temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED
)
ACCEPTED = int(
temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
)
COMPLETED = int(
temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
)


class WorkflowFailureError(temporalio.exceptions.TemporalError):
Expand Down Expand Up @@ -4508,9 +4609,7 @@ class StartWorkflowUpdateInput:
update_id: Optional[str]
update: str
args: Sequence[Any]
wait_for_stage: Optional[
temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.ValueType
]
wait_for_stage: WorkflowUpdateWaitStage
headers: Mapping[str, temporalio.api.common.v1.Payload]
ret_type: Optional[Type]
rpc_metadata: Mapping[str, str]
Expand Down Expand Up @@ -5125,11 +5224,7 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
async def start_workflow_update(
self, input: StartWorkflowUpdateInput
) -> WorkflowUpdateHandle[Any]:
wait_policy = (
temporalio.api.update.v1.WaitPolicy(lifecycle_stage=input.wait_for_stage)
if input.wait_for_stage is not None
else None
)
# Build request
req = temporalio.api.workflowservice.v1.UpdateWorkflowExecutionRequest(
namespace=self._client.namespace,
workflow_execution=temporalio.api.common.v1.WorkflowExecution(
Expand All @@ -5138,14 +5233,18 @@ async def start_workflow_update(
),
request=temporalio.api.update.v1.Request(
meta=temporalio.api.update.v1.Meta(
update_id=input.update_id or "",
update_id=input.update_id or str(uuid.uuid4()),
identity=self._client.identity,
),
input=temporalio.api.update.v1.Input(
name=input.update,
),
),
wait_policy=wait_policy,
wait_policy=temporalio.api.update.v1.WaitPolicy(
lifecycle_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.ValueType(
input.wait_for_stage
)
),
)
if input.args:
req.request.input.args.payloads.extend(
Expand All @@ -5155,25 +5254,36 @@ async def start_workflow_update(
temporalio.common._apply_headers(
input.headers, req.request.input.header.fields
)
try:

# Repeatedly try to invoke start until the update reaches user-provided
# wait stage or is at least ACCEPTED (as of the time of this writing,
# the user cannot specify sooner than ACCEPTED)
resp: temporalio.api.workflowservice.v1.UpdateWorkflowExecutionResponse
while True:
resp = await self._client.workflow_service.update_workflow_execution(
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
)
except RPCError as err:
raise
if (
resp.stage >= req.wait_policy.lifecycle_stage
or resp.stage
>= temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
):
break

determined_id = resp.update_ref.update_id
update_handle: WorkflowUpdateHandle[Any] = WorkflowUpdateHandle(
# Build the handle. If the user's wait stage is COMPLETED, make sure we
# poll for result.
handle: WorkflowUpdateHandle[Any] = WorkflowUpdateHandle(
client=self._client,
id=determined_id,
id=req.request.meta.update_id,
workflow_id=input.id,
workflow_run_id=input.run_id,
result_type=input.ret_type,
)
if resp.HasField("outcome"):
update_handle._known_outcome = resp.outcome

return update_handle
handle._known_outcome = resp.outcome
if input.wait_for_stage == WorkflowUpdateWaitStage.COMPLETED:
await handle._poll_until_outcome()
return handle

### Async activity calls

Expand Down Expand Up @@ -5700,27 +5810,6 @@ def _fix_history_enum(prefix: str, parent: Dict[str, Any], *attrs: str) -> None:
_fix_history_enum(prefix, child_item, *attrs[1:])


async def _update_outcome_to_result(
outcome: temporalio.api.update.v1.Outcome,
id: str,
converter: temporalio.converter.DataConverter,
rtype: Optional[Type],
) -> Any:
if outcome.HasField("failure"):
raise WorkflowUpdateFailedError(
await converter.decode_failure(outcome.failure),
)
if not outcome.success.payloads:
return None
type_hints = [rtype] if rtype else None
results = await converter.decode(outcome.success.payloads, type_hints)
if not results:
return None
elif len(results) > 1:
warnings.warn(f"Expected single update result, got {len(results)}")
return results[0]


@dataclass(frozen=True)
class WorkerBuildIdVersionSets:
"""Represents the sets of compatible Build ID versions associated with some Task Queue, as
Expand Down
Loading
Loading