-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extend wait_for on workflow context to take multiple futures and a condition block #111
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @jeffschoner-stripe, this is a really cool one!
I left a few inline comments. Couple bigger points to think about:
- I understand that
await
matches Java & Go SDKs, but it doesn't match ourwait_for
andwait_for_all
. Not necessarily a problem, but wonder how you think about it - We can probably add other argument types to
await
(just a straight boolean and a future), it doesn't always have to be a block and can then replacewait_for
. Doesn't have to be this PR, but let's make sure APIs are compatible - We need to make the example a bit clearer, maybe we can break it down into smaller methods? Plus a few inline comments on it
- Dispatcher changes
Please let me know if all that makes sense
examples/workflows/await_workflow.rb
Outdated
|
||
future = EchoActivity.execute("hi #{i}") | ||
future.done do | ||
activity_futures.delete(i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While it's never going to happen, this is technically a race condition — a callback depends on a statement executed on line 45
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't sure what to do about this. It is a race condition in threaded code, but it's not based on how workflow code is currently executed. Ideas on how to make this more proper? I don't have much expertise with multi-threaded Ruby code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just move the line activity_futures[I] = future
to happen before you're setting the callback :) The reason it's not causing a race is because EchoActivity.execute
doesn't start the activity straight away, but rather at the next blocking point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it now. I think as written, it would fail if run in unit test mode, although it does in practice work fine running against a Temporal server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it works against the Temporal server because the commands are never dispatched before a blocking call (in this case it would be the next iteration's .await
). But still a good idea to keep these things free from races
examples/workflows/await_workflow.rb
Outdated
10.times do |i| | ||
workflow.await do | ||
workflow.logger.info("Activities in flight #{activity_futures.length}") | ||
activity_futures.length < 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a comment on what this does? Wasn't obvious at first glance that it's a concurrency limiter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, 2
deserves to be a constant or an argument :)
examples/workflows/await_workflow.rb
Outdated
activity_futures = {} | ||
echos_completed = 0 | ||
|
||
10.times do |i| |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is referenced at least 2, let's move it into a constant or a input argument
lib/temporal/workflow/dispatcher.rb
Outdated
end | ||
|
||
def register_handler(target, event_name, &handler) | ||
handlers[target] << [event_name, handler] | ||
end | ||
|
||
def register_await_handler(&handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a testing-only code and should not be on a call path in production. We can either move it to one of the Testing extensions and apply when require 'temporal/testing'
or figure out a way when it's not needed (similar to production execution, but might require a bigger rewrite)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function is used by the workflow context to register a global, non-target handler. I could refactor this into register_handler
where, say, if target
is nil
we treat it as a wildcard that affects all targets. What do you think about that approach?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see, I think I mixed local and main contexts. The approach for using a wildcard handler target works 👍
lib/temporal/workflow/dispatcher.rb
Outdated
def register_await_handler(&handler) | ||
# This should only happen in situations where multithreading is being used | ||
# in workflow code which is unsupported. | ||
raise 'Await handler already active' unless await_handler.nil? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can happen if an await
call is placed in a callback to a future/signal/timer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add a test for this. I originally had an array of await handlers, but convinced myself there should only be one active at a time. It'll be easy to restore that original approach, but I need to make sure the behavior is as expected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I know what you mean. When programming a reactor you either need to disable certain behaviours or be prepared for any amount of context entanglement. I found the latter to be the simplest and most elegant solution
lib/temporal/workflow/dispatcher.rb
Outdated
# Invoking the await handler needs to be done after target-specific handlers | ||
# because an activity, sleep, or signal may have completed that affects the state | ||
# evaluated in an await condition. | ||
await_handler.call(unset_await_handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like this code is slightly misplaced. Dispatcher is a generic event dispatching mechanism and should not be aware of more specific logic. This implementation suggests that an await call is coupled to the dispatch
, which may not be the case.
I have a sense that await actually might not require a dispatch at all — it's always called synchronously (from the current Fiber) and is not itself a callback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell, I need to check all await conditions whenever something potentially being waited on has completed (signal, activity, timer). I'm not sure how to do this without hooking into the dispatcher, as this is the central place where those completions trigger callbacks.
@@ -126,4 +126,23 @@ def execute | |||
# Heartbeat doesn't do anything in local mode, but at least it can be called. | |||
workflow_context.execute_activity!(TestHeartbeatingActivity) | |||
end | |||
|
|||
it 'can await with false condition' do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this needs to be in describe '#await' do
block for clarify and to match our current testing structure
Would you prefer that I merge this logic into
These make sense. I'll work on some revisions. |
@antstorm I still have more to do, but take a look at how I've refactored the dispatcher to be more generic, as I think this is the most fundamental change requested. I've tried testing an await within a signal block, and the fiber seems to just end up never being resumed. I still need to experiment more with this. I've seen similar problems synchronously starting activities (i.e., waiting on a future) within a signal handler. Folks at Temporal have recommended against doing this generally, but I need to figure out if I can fix this, or at least have confidence I'm making this no worse than the current state. |
@jeffschoner-stripe thank you for the changes, I really like how it's shaping up. Please let me know if you need help with figuring out the issues you've mentioned. There's definitely some overlap between |
@antstorm, I believe I've addressed all your feedback now I merged |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great, just a few more wrinkles to iron out, but it's almost there 👍
else | ||
should_yield = true | ||
|
||
dispatcher.register_handler(Dispatcher::WILDCARD, Dispatcher::WILDCARD) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a slight problem here — we're not de-registering the handler afterwards, which means that it will be called for every event until the end of a workflow. It won't be doing anything because of that blocked
variable, but something we might want to address. It wasn't an issue previously because each target was expected to fire each event only once, but still keeps these references in memory.
This can be done in a follow-up, I might have a look at it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose once it's unblocked, it can be unregistered. The interplay on this is messy because it needs a callback. I'll see if I can do this cleanly or if it's best to revise later.
else | ||
should_yield = true | ||
futures.each do |future| | ||
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we actually need to separate between the futures and unblock condition cases here? There are probably arguments to both approaches:
- Only using a single wildcard handler for both cases means that it might get called more than needed, but it simplifies the code (only need to register one handler)
- Using future-specific and a wildcard handlers is a bit of an optimisation for when only futures are passed, but is actually less optimal when both are passed
I wonder what do you think about this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this code path is used when invoking activities synchronously or sleeps, which are almost certainly the most common cases. I could register separate handlers here, rather than handling the separate cases inside the dispatcher. I'll give this alternative a try to see how it looks once implemented, especially with the unregistration in place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good 👍 But also happy to rearrange things in later PRs to keep this one contained and ship the feature. As long as public-facing APIs are stable, which I believe they are
lib/temporal/workflow/context.rb
Outdated
blocked = true | ||
|
||
if futures.any? | ||
if futures.all?(&:finished?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be futures.any?(&:finished?)
, right? At least judging by the example and a test implementation it waits for a single future before proceeding
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found another similar issue here where if the condition already evaluates to true, we should not block on the futures, yield, or register any dispatch handlers for them
31a6c8f
to
1a00f11
Compare
I've fixed the |
I've also tidied my commits for a cleaner merge |
1a00f11
to
0779681
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great, thank you @jeffschoner-stripe, this is a big step forward!
* Pass config to the error handler instead of using the global config * Fix example tests * Make the config property in the workflow context publicly readable * [Fix] Retryer GRPC error lookup (coinbase#109) * Fix issue with GRPC error lookup in Retryer * Rename spec file for retryer to contain _spec * [Feature] Add id and domain to workflow context's metadata (coinbase#110) * Start syncing id and domain on workflow context metadata * Fixed tests Co-authored-by: DeRauk Gibble <[email protected]> * Explicit docker-compose project name (coinbase#114) * Add YARD documentation for Temporal::Client (coinbase#113) * Add YARD documentation for Temporal::Client * Add yard gem * Fix @option tag * Typo fix * Add signal arguments to start_workflow (support for signal_with_start) (coinbase#112) * Add signal arguments to start_workflow (to support signal_with_start) * Move signal arguments to the options hash * PR feedback * Fix merge error * Extend #wait_for to take multiple futures and a condition block (coinbase#111) * Differentiate TARGET_WILDCARD and WILDCARD, allow comparison with EventTarget objects (coinbase#118) * Turn off schedule_to_start activity timeout by default (coinbase#119) * Separate options from keyword args in #start_workflow (coinbase#117) * Separate options from keyword args in #start_workflow * fixup! Separate options from keyword args in #start_workflow * Surface additional workflow metadata on workflow context (coinbase#120) * Refactor metadata generation * Make task queue available on workflow metadata, add example test * Expose workflow start time metadata * Add memos (coinbase#121) * Add describe_namespace (coinbase#122) * Add describe_namespace * Feedback * Improve header serialization and propagation (coinbase#124) * [Fix] Non-started activity cancellation (coinbase#125) * Fix event target map entry for ACTIVITY_CANCELED event * Fix cancellation of a non-started activity * fixup! Fix event target map entry for ACTIVITY_CANCELED event Co-authored-by: DeRauk Gibble <[email protected]> Co-authored-by: DeRauk Gibble <[email protected]> Co-authored-by: Anthony Dmitriyev <[email protected]> Co-authored-by: nagl-stripe <[email protected]> Co-authored-by: jeffschoner-stripe <[email protected]> Co-authored-by: Drew Hoskins <[email protected]>
…ter-bryton-task-queue Add task queue tag to workflow task failure metrics & task queue time metrics
…ter-bryton-task-queue Add task queue tag to workflow task failure metrics & task queue time metrics
Summary
Extends the
wait_for
method on workflow context to take multiple futures and a condition block. This provides the equivalent toWorkflow.await
in Java andworkflow.Await
in Go and a "wait for any" function. This function blocks workflow progress until one of the futures is finished or the block returns true. It can be used for a variety of cases where a workflow needs to wait for the completion of any combination of activities or timer completion, or receipt of a signal.Also included:
WaitForWorkflow
example that demonstrates all three of these cases, along with an integration test to run it and verify its behaviorTesting
New new dispatcher unit tests:
rspec spec/unit/lib/temporal/workflow/dispatcher_spec.rb:78
New integration test:
pushd examples && rspec spec/integration/wait_for_workflow_spec.rb && popd
(need to runpushd examples && bin/worker
in another terminal)