-
Notifications
You must be signed in to change notification settings - Fork 81
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Required wait update stage, update polling improvements, and other update changes #521
Changes from 2 commits
e6efd5c
b19370e
1a88641
1a2acd5
1b2d411
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1155,8 +1155,9 @@ def id(self) -> str: | |
|
||
@property | ||
def run_id(self) -> Optional[str]: | ||
"""Run ID used for :py:meth:`signal` and :py:meth:`query` calls if | ||
present to ensure the query or signal happen on this exact run. | ||
"""Run ID used for :py:meth:`signal`, :py:meth:`query`, and | ||
:py:meth:`update` calls if present to ensure the signal/query/update | ||
happen on this exact run. | ||
|
||
This is only created via :py:meth:`Client.get_workflow_handle`. | ||
:py:meth:`Client.start_workflow` will not set this value. | ||
|
@@ -1843,7 +1844,7 @@ async def execute_update( | |
update: Update function or name on the workflow. | ||
arg: Single argument to the update. | ||
args: Multiple arguments to the update. Cannot be set if arg is. | ||
id: ID of the update. If not set, the server will set a UUID as the ID. | ||
id: ID of the update. If not set, the default is a new UUID. | ||
result_type: For string updates, this can set the specific result | ||
type hint to deserialize into. | ||
rpc_metadata: Headers used on the RPC call. Keys here override | ||
|
@@ -1858,8 +1859,8 @@ async def execute_update( | |
update, | ||
arg, | ||
args=args, | ||
wait_for_stage=WorkflowUpdateWaitStage.COMPLETED, | ||
id=id, | ||
wait_for_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED, | ||
result_type=result_type, | ||
rpc_metadata=rpc_metadata, | ||
rpc_timeout=rpc_timeout, | ||
|
@@ -1872,6 +1873,7 @@ async def start_update( | |
self, | ||
update: temporalio.workflow.UpdateMethodMultiParam[[SelfType], LocalReturnType], | ||
*, | ||
wait_for_stage: WorkflowUpdateWaitStage, | ||
id: Optional[str] = None, | ||
rpc_metadata: Mapping[str, str] = {}, | ||
rpc_timeout: Optional[timedelta] = None, | ||
|
@@ -1887,6 +1889,7 @@ async def start_update( | |
], | ||
arg: ParamType, | ||
*, | ||
wait_for_stage: WorkflowUpdateWaitStage, | ||
id: Optional[str] = None, | ||
rpc_metadata: Mapping[str, str] = {}, | ||
rpc_timeout: Optional[timedelta] = None, | ||
|
@@ -1902,6 +1905,7 @@ async def start_update( | |
], | ||
*, | ||
args: MultiParamSpec.args, | ||
wait_for_stage: WorkflowUpdateWaitStage, | ||
id: Optional[str] = None, | ||
rpc_metadata: Mapping[str, str] = {}, | ||
rpc_timeout: Optional[timedelta] = None, | ||
|
@@ -1915,6 +1919,7 @@ async def start_update( | |
update: str, | ||
arg: Any = temporalio.common._arg_unset, | ||
*, | ||
wait_for_stage: WorkflowUpdateWaitStage, | ||
args: Sequence[Any] = [], | ||
id: Optional[str] = None, | ||
result_type: Optional[Type] = None, | ||
|
@@ -1928,6 +1933,7 @@ async def start_update( | |
update: Union[str, Callable], | ||
arg: Any = temporalio.common._arg_unset, | ||
*, | ||
wait_for_stage: WorkflowUpdateWaitStage, | ||
args: Sequence[Any] = [], | ||
id: Optional[str] = None, | ||
result_type: Optional[Type] = None, | ||
|
@@ -1950,8 +1956,10 @@ async def start_update( | |
Args: | ||
update: Update function or name on the workflow. | ||
arg: Single argument to the update. | ||
wait_for_stage: Required stage to wait until returning. ADMITTED is | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first sentence of this comment is self-referential and adds no value beyond the name of the field. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The headline
I think there's an opportunity to headline its semantics (like a glance at what you'd find in the docs) and link out to overview docs. Context: A natural thing I've done when not finding any good description in the code is to find docs online. But I might find python-specific docs which also don't provide a good overview of semantics. As a n00b I might not even be aware that there are overview docs, as I don't have the ontology of temporal or its docs site in my head. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is often the case with required docs. When you are forced to document self-documenting things, the sentences become a bit redundant. But it's preferred in Python libraries over the opposite which is optional documentation.
We try to avoid linking out to docs since those links are rarely stable (we do sometimes though). But we do encourage using the general docs which are meant to expound on the purpose of these options more than we can reasonably do in a non-stale way in every SDK. Want to take a stab at providing suggested documentation here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think nearly as many things are self-documenting as engineers think are self-documenting. Pretend like you're newish to temporal workflows and reading the comments about update. What questions would you have? What gotchas would you like to be made aware of? I think you go too fast for me to keep up with you and write comments, so maybe better for you to suggest and I'll take a look. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My suggestion is what's already in this PR, heh, because it's consistent with the rest of the SDK. This has been sitting for many days already so I'm guessing there's no rush. The request is a bit too subjective, so If you can provide a suggestion then we'll know it's correct instead of guess-and-check. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Per off-PR discussion, added link to https://docs.temporal.io/workflows#update |
||
not currently supported. | ||
args: Multiple arguments to the update. Cannot be set if arg is. | ||
id: ID of the update. If not set, the server will set a UUID as the ID. | ||
id: ID of the update. If not set, the default is a new UUID. | ||
result_type: For string updates, this can set the specific result | ||
type hint to deserialize into. | ||
rpc_metadata: Headers used on the RPC call. Keys here override | ||
|
@@ -1964,9 +1972,9 @@ async def start_update( | |
return await self._start_update( | ||
update, | ||
arg, | ||
wait_for_stage=wait_for_stage, | ||
args=args, | ||
id=id, | ||
wait_for_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED, | ||
result_type=result_type, | ||
rpc_metadata=rpc_metadata, | ||
rpc_timeout=rpc_timeout, | ||
|
@@ -1977,13 +1985,15 @@ async def _start_update( | |
update: Union[str, Callable], | ||
arg: Any = temporalio.common._arg_unset, | ||
*, | ||
wait_for_stage: WorkflowUpdateWaitStage, | ||
args: Sequence[Any] = [], | ||
id: Optional[str] = None, | ||
wait_for_stage: temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.ValueType = temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED, | ||
result_type: Optional[Type] = None, | ||
rpc_metadata: Mapping[str, str] = {}, | ||
rpc_timeout: Optional[timedelta] = None, | ||
) -> WorkflowUpdateHandle[Any]: | ||
if wait_for_stage == WorkflowUpdateWaitStage.ADMITTED: | ||
raise ValueError("ADMITTED wait stage not supported") | ||
update_name: str | ||
ret_type = result_type | ||
if isinstance(update, temporalio.workflow.UpdateMethodMultiParam): | ||
|
@@ -2011,6 +2021,68 @@ async def _start_update( | |
) | ||
) | ||
|
||
def get_update_handle( | ||
self, | ||
id: str, | ||
*, | ||
workflow_run_id: Optional[str] = None, | ||
result_type: Optional[Type] = None, | ||
) -> WorkflowUpdateHandle[Any]: | ||
"""Get a handle for an update. The handle can be used to wait on the | ||
update result. | ||
|
||
Users may prefer the more typesafe :py:meth:`get_update_handle_for` | ||
which accepts an update definition. | ||
|
||
.. warning:: | ||
This API is experimental | ||
|
||
Args: | ||
id: Update ID to get a handle to. | ||
workflow_run_id: Run ID to tie the handle to. If this is not set, | ||
the :py:attr:`run_id` will be used. | ||
result_type: The result type to deserialize into if known. | ||
|
||
Returns: | ||
The update handle. | ||
""" | ||
return WorkflowUpdateHandle( | ||
self._client, | ||
id, | ||
self._id, | ||
workflow_run_id=workflow_run_id or self._run_id, | ||
result_type=result_type, | ||
) | ||
|
||
def get_update_handle_for( | ||
self, | ||
update: temporalio.workflow.UpdateMethodMultiParam[Any, LocalReturnType], | ||
id: str, | ||
*, | ||
workflow_run_id: Optional[str] = None, | ||
) -> WorkflowUpdateHandle[LocalReturnType]: | ||
"""Get a typed handle for an update. The handle can be used to wait on | ||
the update result. | ||
|
||
This is the same as :py:meth:`get_update_handle` but typed. | ||
|
||
.. warning:: | ||
This API is experimental | ||
|
||
Args: | ||
update: The update method to use for typing the handle. | ||
id: Update ID to get a handle to. | ||
workflow_run_id: Run ID to tie the handle to. If this is not set, | ||
the :py:attr:`run_id` will be used. | ||
result_type: The result type to deserialize into if known. | ||
|
||
Returns: | ||
The update handle. | ||
""" | ||
return self.get_update_handle( | ||
id, workflow_run_id=workflow_run_id, result_type=update._defn.ret_type | ||
) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class AsyncActivityIDReference: | ||
|
@@ -4235,15 +4307,38 @@ async def result( | |
WorkflowUpdateFailedError: If the update failed | ||
RPCError: Update result could not be fetched for some other reason. | ||
""" | ||
if self._known_outcome is not None: | ||
outcome = self._known_outcome | ||
return await _update_outcome_to_result( | ||
outcome, | ||
self.id, | ||
self._client.data_converter, | ||
self._result_type, | ||
# Poll until outcome reached | ||
await self._poll_until_outcome( | ||
rpc_metadata=rpc_metadata, rpc_timeout=rpc_timeout | ||
) | ||
|
||
# Convert outcome to failure or value | ||
assert self._known_outcome | ||
if self._known_outcome.HasField("failure"): | ||
raise WorkflowUpdateFailedError( | ||
await self._client.data_converter.decode_failure( | ||
self._known_outcome.failure | ||
), | ||
) | ||
if not self._known_outcome.success.payloads: | ||
return None # type: ignore | ||
type_hints = [self._result_type] if self._result_type else None | ||
results = await self._client.data_converter.decode( | ||
self._known_outcome.success.payloads, type_hints | ||
) | ||
if not results: | ||
return None # type: ignore | ||
elif len(results) > 1: | ||
warnings.warn(f"Expected single update result, got {len(results)}") | ||
return results[0] | ||
|
||
async def _poll_until_outcome( | ||
self, | ||
rpc_metadata: Mapping[str, str] = {}, | ||
rpc_timeout: Optional[timedelta] = None, | ||
) -> None: | ||
if self._known_outcome: | ||
return | ||
req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest( | ||
namespace=self._client.namespace, | ||
update_ref=temporalio.api.update.v1.UpdateRef( | ||
|
@@ -4259,27 +4354,33 @@ async def result( | |
), | ||
) | ||
|
||
# Continue polling as long as we have either an empty response, or an *rpc* timeout | ||
# Continue polling as long as we have no outcome | ||
while True: | ||
try: | ||
res = ( | ||
await self._client.workflow_service.poll_workflow_execution_update( | ||
req, | ||
retry=True, | ||
metadata=rpc_metadata, | ||
timeout=rpc_timeout, | ||
) | ||
) | ||
if res.HasField("outcome"): | ||
return await _update_outcome_to_result( | ||
res.outcome, | ||
self.id, | ||
self._client.data_converter, | ||
self._result_type, | ||
) | ||
except RPCError as err: | ||
if err.status != RPCStatusCode.DEADLINE_EXCEEDED: | ||
raise | ||
res = await self._client.workflow_service.poll_workflow_execution_update( | ||
req, | ||
retry=True, | ||
metadata=rpc_metadata, | ||
timeout=rpc_timeout, | ||
) | ||
if res.HasField("outcome"): | ||
self._known_outcome = res.outcome | ||
return | ||
|
||
|
||
class WorkflowUpdateWaitStage(IntEnum): | ||
"""Stage to wait for workflow update to reach before returning from | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another comment that mostly restates the name of the enum. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is commonly the case with all forced-documentation docs, which includes most of the Python docs. We can't rely on self-documenting names. Similar thing exists in Go. If there is a general different posture we want to take with the Python API docs, I wonder if we can do that generally. I am totally supportive of such a general posture change, just not necessarily for this one enum in this one PR. |
||
``start_update``. | ||
""" | ||
|
||
ADMITTED = int( | ||
temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED | ||
) | ||
ACCEPTED = int( | ||
temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED | ||
) | ||
COMPLETED = int( | ||
temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED | ||
) | ||
|
||
|
||
class WorkflowFailureError(temporalio.exceptions.TemporalError): | ||
|
@@ -4508,9 +4609,7 @@ class StartWorkflowUpdateInput: | |
update_id: Optional[str] | ||
update: str | ||
args: Sequence[Any] | ||
wait_for_stage: Optional[ | ||
temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.ValueType | ||
] | ||
wait_for_stage: WorkflowUpdateWaitStage | ||
headers: Mapping[str, temporalio.api.common.v1.Payload] | ||
ret_type: Optional[Type] | ||
rpc_metadata: Mapping[str, str] | ||
|
@@ -5125,11 +5224,7 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None: | |
async def start_workflow_update( | ||
self, input: StartWorkflowUpdateInput | ||
) -> WorkflowUpdateHandle[Any]: | ||
wait_policy = ( | ||
temporalio.api.update.v1.WaitPolicy(lifecycle_stage=input.wait_for_stage) | ||
if input.wait_for_stage is not None | ||
else None | ||
) | ||
# Build request | ||
req = temporalio.api.workflowservice.v1.UpdateWorkflowExecutionRequest( | ||
namespace=self._client.namespace, | ||
workflow_execution=temporalio.api.common.v1.WorkflowExecution( | ||
|
@@ -5138,14 +5233,18 @@ async def start_workflow_update( | |
), | ||
request=temporalio.api.update.v1.Request( | ||
meta=temporalio.api.update.v1.Meta( | ||
update_id=input.update_id or "", | ||
update_id=input.update_id or str(uuid.uuid4()), | ||
identity=self._client.identity, | ||
), | ||
input=temporalio.api.update.v1.Input( | ||
name=input.update, | ||
), | ||
), | ||
wait_policy=wait_policy, | ||
wait_policy=temporalio.api.update.v1.WaitPolicy( | ||
lifecycle_stage=temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.ValueType( | ||
input.wait_for_stage | ||
) | ||
), | ||
) | ||
if input.args: | ||
req.request.input.args.payloads.extend( | ||
|
@@ -5155,25 +5254,36 @@ async def start_workflow_update( | |
temporalio.common._apply_headers( | ||
input.headers, req.request.input.header.fields | ||
) | ||
try: | ||
|
||
# Repeatedly try to invoke start until the update reaches user-provided | ||
# wait stage or is at least ACCEPTED (as of the time of this writing, | ||
# the user cannot specify sooner than ACCEPTED) | ||
resp: temporalio.api.workflowservice.v1.UpdateWorkflowExecutionResponse | ||
while True: | ||
resp = await self._client.workflow_service.update_workflow_execution( | ||
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout | ||
) | ||
except RPCError as err: | ||
raise | ||
if ( | ||
resp.stage >= req.wait_policy.lifecycle_stage | ||
or resp.stage | ||
>= temporalio.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED | ||
): | ||
break | ||
|
||
determined_id = resp.update_ref.update_id | ||
update_handle: WorkflowUpdateHandle[Any] = WorkflowUpdateHandle( | ||
# Build the handle. If the user's wait stage is COMPLETED, make sure we | ||
# poll for result. | ||
handle: WorkflowUpdateHandle[Any] = WorkflowUpdateHandle( | ||
client=self._client, | ||
id=determined_id, | ||
id=req.request.meta.update_id, | ||
workflow_id=input.id, | ||
workflow_run_id=input.run_id, | ||
result_type=input.ret_type, | ||
) | ||
if resp.HasField("outcome"): | ||
update_handle._known_outcome = resp.outcome | ||
|
||
return update_handle | ||
handle._known_outcome = resp.outcome | ||
if input.wait_for_stage == WorkflowUpdateWaitStage.COMPLETED: | ||
await handle._poll_until_outcome() | ||
return handle | ||
|
||
### Async activity calls | ||
|
||
|
@@ -5700,27 +5810,6 @@ def _fix_history_enum(prefix: str, parent: Dict[str, Any], *attrs: str) -> None: | |
_fix_history_enum(prefix, child_item, *attrs[1:]) | ||
|
||
|
||
async def _update_outcome_to_result( | ||
outcome: temporalio.api.update.v1.Outcome, | ||
id: str, | ||
converter: temporalio.converter.DataConverter, | ||
rtype: Optional[Type], | ||
) -> Any: | ||
if outcome.HasField("failure"): | ||
raise WorkflowUpdateFailedError( | ||
await converter.decode_failure(outcome.failure), | ||
) | ||
if not outcome.success.payloads: | ||
return None | ||
type_hints = [rtype] if rtype else None | ||
results = await converter.decode(outcome.success.payloads, type_hints) | ||
if not results: | ||
return None | ||
elif len(results) > 1: | ||
warnings.warn(f"Expected single update result, got {len(results)}") | ||
return results[0] | ||
|
||
|
||
@dataclass(frozen=True) | ||
class WorkerBuildIdVersionSets: | ||
"""Represents the sets of compatible Build ID versions associated with some Task Queue, as | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to double-check we're sure
WorkflowUpdateWaitStage
is the right name for the enum here. The underlying API entity is called "Lifecycle Stage". Calling it "Wait Stage" bakes the concept of waiting into it -- is there a chance that that will be annoying in the future because we find ourselves wanting to refer to a lifecycle stage without any notion of "waiting" being involved?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I have no strong opinion here and I now that I think about it, I like
WorkflowUpdateLifecycleStage
because, when we someday get adescribe()
call onWorkflowUpdateHandle
, we'll want this stage to be there too.I'll change this enum name to
WorkflowUpdateLifecycleStage
(but keep the parameter name aswait_for_stage
) assuming there's no objection. Anyone else have an opinion on what to name this?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah we should consider the future
describe()
call, which we can implement today, so calling the enumWorkflowUpdateLifecycleStage
makes a lot more sense.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to
WorkflowUpdateLifecycleStage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems legit to me. Or just leave out
Lifecycle
, not sure it's carrying much weight here. Up to y'all.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Lifecycle
isn't needed --WorkflowUpdateStage
is sufficient.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works for me, @Quinn-With-Two-Ns thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enum name updated