
Extend wait_for on workflow context to take multiple futures and a condition block #111

Merged
1 commit merged into coinbase:master on Nov 12, 2021

Conversation

jeffschoner-stripe
Contributor

@jeffschoner-stripe jeffschoner-stripe commented Oct 29, 2021

Summary

Extends the wait_for method on workflow context to take multiple futures and a condition block. This provides the equivalent of Workflow.await in Java and workflow.Await in Go, as well as a "wait for any" capability. The call blocks workflow progress until one of the futures finishes or the block returns true. It can be used wherever a workflow needs to wait on any combination of activity completion, timer firing, or signal receipt.
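The semantics above can be sketched in plain Ruby, with no Temporal gem required. `SketchFuture` and `wait_for_any` are illustrative stand-ins for the SDK's future and the extended `wait_for`, not the real classes:

```ruby
# Minimal sketch of "wait for any": progress is blocked until any future
# finishes or the condition block returns true. Events (signal, timer,
# activity completion) are simulated as a list of callables.

class SketchFuture
  def initialize
    @finished = false
  end

  def finished?
    @finished
  end

  def finish!
    @finished = true
  end
end

# Re-check the unblock criteria after every simulated event, mirroring
# how the dispatcher re-evaluates wait_for conditions on each dispatch.
def wait_for_any(futures, events, &condition)
  events.each do |event|
    event.call
    return true if futures.any?(&:finished?) || (condition && condition.call)
  end
  false
end

activity = SketchFuture.new
timer = SketchFuture.new
events = [-> {}, -> { timer.finish! }]
wait_for_any([activity, timer], events)  # => true, unblocked by the timer
```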

Also included:

  • A new WaitForWorkflow example that demonstrates all three of these cases, along with an integration test to run it and verify its behavior
  • Support in local workflow context testing mode
  • Unit tests for changes to the dispatcher

Testing

New dispatcher unit tests: rspec spec/unit/lib/temporal/workflow/dispatcher_spec.rb:78
New integration test: pushd examples && rspec spec/integration/wait_for_workflow_spec.rb && popd (need to run pushd examples && bin/worker in another terminal)

Contributor

@antstorm antstorm left a comment

Thanks for the PR, @jeffschoner-stripe, this is a really cool one!

I left a few inline comments. A couple of bigger points to think about:

  1. I understand that await matches Java & Go SDKs, but it doesn't match our wait_for and wait_for_all. Not necessarily a problem, but wonder how you think about it
  2. We can probably add other argument types to await (just a straight boolean and a future), it doesn't always have to be a block and can then replace wait_for. Doesn't have to be this PR, but let's make sure APIs are compatible
  3. We need to make the example a bit clearer, maybe we can break it down into smaller methods? Plus a few inline comments on it
  4. Dispatcher changes

Please let me know if all that makes sense


future = EchoActivity.execute("hi #{i}")
future.done do
activity_futures.delete(i)
Contributor

While it's never going to happen, this is technically a race condition — a callback depends on a statement executed on line 45

Contributor Author

I wasn't sure what to do about this. It is a race condition in threaded code, but it's not based on how workflow code is currently executed. Ideas on how to make this more proper? I don't have much expertise with multi-threaded Ruby code.

Contributor

Just move the line activity_futures[i] = future to happen before you set the callback :) The reason it's not causing a race is that EchoActivity.execute doesn't start the activity straight away, but rather at the next blocking point

Contributor Author

Got it now. As written, I think it would fail in unit test mode, although in practice it works fine against a Temporal server.

Contributor

Yes, it works against the Temporal server because the commands are never dispatched before a blocking call (in this case it would be the next iteration's .await). But still a good idea to keep these things free from races
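The race-free ordering suggested above can be shown with a tiny stub: record the future in the tracking hash before attaching its completion callback, so the callback can never observe state that hasn't been written yet. `StubFuture` stands in for the SDK's future class and is not the real API:

```ruby
# Stub future that simply collects callbacks and fires them on finish!.
class StubFuture
  def initialize
    @callbacks = []
  end

  def done(&blk)
    @callbacks << blk
  end

  def finish!
    @callbacks.each(&:call)
  end
end

activity_futures = {}
i = 0

future = StubFuture.new
activity_futures[i] = future                 # 1. store first...
future.done { activity_futures.delete(i) }   # 2. ...then register the callback

future.finish!
activity_futures.empty?  # => true
```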

10.times do |i|
workflow.await do
workflow.logger.info("Activities in flight #{activity_futures.length}")
activity_futures.length < 2
Contributor

Can you please add a comment on what this does? Wasn't obvious at first glance that it's a concurrency limiter

Contributor

Also, 2 deserves to be a constant or an argument :)
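A hypothetical reshaping of the example along the lines of these two comments: the magic number 2 becomes a named constant and the limiter gets a comment. Plain Ruby stands in for `workflow.wait_for` here, so the completion of the oldest activity is simulated inline; the constant names are assumed, not from the PR:

```ruby
MAX_IN_FLIGHT = 2   # assumed constant name, per the review suggestion
TOTAL_ECHOS = 10

in_flight = []
completed = 0

TOTAL_ECHOS.times do |i|
  # Concurrency limiter: in the real workflow this would be
  # `workflow.wait_for { activity_futures.length < MAX_IN_FLIGHT }`.
  while in_flight.length >= MAX_IN_FLIGHT
    in_flight.shift   # simulate the oldest activity completing
    completed += 1
  end
  in_flight << i
end

completed + in_flight.length  # => 10
```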

activity_futures = {}
echos_completed = 0

10.times do |i|
Contributor

Since this is referenced at least twice, let's move it into a constant or an input argument

end

def register_handler(target, event_name, &handler)
handlers[target] << [event_name, handler]
end

def register_await_handler(&handler)
Contributor

This is testing-only code and should not be on a call path in production. We can either move it to one of the Testing extensions and apply it when require 'temporal/testing' is used, or figure out a way to avoid needing it at all (similar to production execution, but that might require a bigger rewrite)

Contributor Author

The function is used by the workflow context to register a global, non-target handler. I could refactor this into register_handler where, say, if target is nil we treat it as a wildcard that affects all targets. What do you think about that approach?

Contributor

Oh, I see, I think I mixed local and main contexts. The approach for using a wildcard handler target works 👍
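The wildcard-target approach agreed here can be sketched in a few lines. `MiniDispatcher` is a simplified illustration, not the gem's actual Dispatcher; the ordering (target-specific handlers first, wildcard handlers after) mirrors the reasoning in the dispatcher changes elsewhere in this PR:

```ruby
# Handlers registered under the WILDCARD target fire for events on any
# target; they run after target-specific handlers so a wait_for condition
# sees state already updated by the specific callbacks.
class MiniDispatcher
  WILDCARD = :wildcard

  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  def register_handler(target, event_name, &handler)
    @handlers[target] << [event_name, handler]
  end

  def dispatch(target, event_name)
    (@handlers[target] + @handlers[WILDCARD]).each do |(name, handler)|
      handler.call if name == event_name || name == WILDCARD
    end
  end
end

log = []
dispatcher = MiniDispatcher.new
dispatcher.register_handler('activity-1', :completed) { log << :specific }
dispatcher.register_handler(MiniDispatcher::WILDCARD, MiniDispatcher::WILDCARD) { log << :wildcard }

dispatcher.dispatch('activity-1', :completed)
log  # => [:specific, :wildcard]
```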

def register_await_handler(&handler)
# This should only happen in situations where multithreading is being used
# in workflow code which is unsupported.
raise 'Await handler already active' unless await_handler.nil?
Contributor

This can happen if an await call is placed in a callback to a future/signal/timer

Contributor Author

I'll add a test for this. I originally had an array of await handlers, but convinced myself there should only be one active at a time. It'll be easy to restore that original approach, but I need to make sure the behavior is as expected.

Contributor

Yeah, I know what you mean. When programming a reactor you either need to disable certain behaviours or be prepared for any amount of context entanglement. I found the latter to be the simplest and most elegant solution

# Invoking the await handler needs to be done after target-specific handlers
# because an activity, sleep, or signal may have completed that affects the state
# evaluated in an await condition.
await_handler.call(unset_await_handler)
Contributor

It feels like this code is slightly misplaced. Dispatcher is a generic event dispatching mechanism and should not be aware of more specific logic. This implementation suggests that an await call is coupled to the dispatch, which may not be the case.

I have a sense that await actually might not require a dispatch at all — it's always called synchronously (from the current Fiber) and is not itself a callback

Contributor Author

As far as I can tell, I need to check all await conditions whenever something potentially being waited on has completed (signal, activity, timer). I'm not sure how to do this without hooking into the dispatcher, as this is the central place where those completions trigger callbacks.

@@ -126,4 +126,23 @@ def execute
# Heartbeat doesn't do anything in local mode, but at least it can be called.
workflow_context.execute_activity!(TestHeartbeatingActivity)
end

it 'can await with false condition' do
Contributor

nit: this needs to be in a describe '#await' do block for clarity and to match our current testing structure

@jeffschoner-stripe
Contributor Author

Thanks for the PR, @jeffschoner-stripe, this is a really cool one!

I left a few inline comments. A couple of bigger points to think about:

  1. I understand that await matches Java & Go SDKs, but it doesn't match our wait_for and wait_for_all. Not necessarily a problem, but wonder how you think about it

Would you prefer that I merge this logic into wait_for, where it behaves this way if a block is passed? Or maybe naming it something like wait_until?

  1. We can probably add other argument types to await (just a straight boolean and a future), it doesn't always have to be a block and can then replace wait_for. Doesn't have to be this PR, but let's make sure APIs are compatible
  2. We need to make the example a bit clearer, maybe we can break it down into smaller methods? Plus a few inline comments on it
  3. Dispatcher changes

Please let me know if all that makes sense

These make sense. I'll work on some revisions.

@jeffschoner-stripe
Contributor Author

@antstorm I still have more to do, but take a look at how I've refactored the dispatcher to be more generic, as I think this is the most fundamental change requested.

I've tried testing an await within a signal block, and the fiber seems to just end up never being resumed. I still need to experiment more with this. I've seen similar problems synchronously starting activities (i.e., waiting on a future) within a signal handler. Folks at Temporal have recommended against doing this generally, but I need to figure out if I can fix this, or at least have confidence I'm making this no worse than the current state.

@antstorm
Contributor

antstorm commented Nov 2, 2021

@jeffschoner-stripe thank you for the changes, I really like how it's shaping up. Please let me know if you need help with figuring out the issues you've mentioned.

There's definitely some overlap between wait_for and await. I'll think a bit more about this one, but I don't want to block your PR too much by this.

@jeffschoner-stripe
Contributor Author

@antstorm, I believe I've addressed all your feedback now

I merged await into wait_for. wait_for can now take any number of futures or a condition block. This does need to be a block returning a boolean value since it must be evaluated each time through.
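The call shapes of the merged API can be illustrated with a stubbed context. `FakeContext` and `DoneFuture` are stand-ins so this runs standalone; real workflow code would make these calls on the actual Temporal workflow context, which yields the Fiber instead of returning a symbol:

```ruby
DoneFuture = Struct.new(:done) do
  def finished?
    done
  end
end

class FakeContext
  # Takes any number of futures and/or a condition block. The condition
  # stays a block (not a precomputed boolean) because it must be
  # re-evaluated every time an event is dispatched.
  def wait_for(*futures, &condition)
    return :future_finished if futures.any?(&:finished?)
    return :condition_true if condition && condition.call
    :would_block  # the real SDK yields the Fiber here until the next event
  end
end

ctx = FakeContext.new
ctx.wait_for(DoneFuture.new(true))    # => :future_finished
signaled = true
ctx.wait_for { signaled }             # => :condition_true
ctx.wait_for(DoneFuture.new(false))   # => :would_block
```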

Contributor

@antstorm antstorm left a comment

Looks great, just a few more wrinkles to iron out, but it's almost there 👍

else
should_yield = true

dispatcher.register_handler(Dispatcher::WILDCARD, Dispatcher::WILDCARD) do
Contributor

There's a slight problem here: we're not de-registering the handler afterwards, which means it will be called for every event until the end of the workflow. It won't do anything because of that blocked variable, but it's something we might want to address. It wasn't an issue previously because each target was expected to fire each event only once, but it still keeps these references in memory.

This can be done in a follow-up, I might have a look at it

Contributor Author

I suppose once it's unblocked, it can be unregistered. The interplay on this is messy because it needs a callback. I'll see if I can do this cleanly or if it's best to revise later.

else
should_yield = true
futures.each do |future|
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
Contributor

I wonder if we actually need to separate the futures and unblock-condition cases here? There are arguments for both approaches:

  1. Only using a single wildcard handler for both cases means it might get called more often than needed, but it simplifies the code (only one handler to register)
  2. Using future-specific handlers plus a wildcard handler is a bit of an optimisation for when only futures are passed, but is actually less optimal when both are passed

What do you think about this one?

Contributor Author

I believe this code path is used when invoking activities synchronously or sleeps, which are almost certainly the most common cases. I could register separate handlers here, rather than handling the separate cases inside the dispatcher. I'll give this alternative a try to see how it looks once implemented, especially with the unregistration in place.

Contributor

@antstorm antstorm Nov 9, 2021

Sounds good 👍 But also happy to rearrange things in later PRs to keep this one contained and ship the feature. As long as public-facing APIs are stable, which I believe they are

blocked = true

if futures.any?
if futures.all?(&:finished?)
Contributor

This should probably be futures.any?(&:finished?), right? At least judging by the example and a test implementation it waits for a single future before proceeding

Contributor Author

Found another similar issue here: if the condition already evaluates to true, we should not block on the futures, yield, or register any dispatch handlers for them
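The any?/all? mix-up discussed in this thread comes down to one predicate: with "wait for any" semantics, the unblock check must pass as soon as a single future has finished. `DemoFuture` is an illustrative stub, not the SDK class:

```ruby
DemoFuture = Struct.new(:state) do
  def finished?
    state == :finished
  end
end

futures = [DemoFuture.new(:finished), DemoFuture.new(:running)]

futures.all?(&:finished?)  # => false -- would wrongly keep the workflow blocked
futures.any?(&:finished?)  # => true  -- unblocks after the first completion
```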

@jeffschoner-stripe
Contributor Author

I've fixed the any?/all? mix-up, as well as a related case, and covered both in the integration test. Agree saving unregistration for a subsequent PR is worthwhile to improve performance.

@jeffschoner-stripe
Contributor Author

I've also tidied my commits for a cleaner merge

@jeffschoner-stripe jeffschoner-stripe changed the title Workflow await Extend wait_for on workflow context to take multiple futures and a condition block Nov 9, 2021
Contributor

@antstorm antstorm left a comment

Looks great, thank you @jeffschoner-stripe, this is a big step forward!

@antstorm antstorm merged commit 8c36d90 into coinbase:master Nov 12, 2021
@antstorm antstorm added the sync pending Needs to be ported to cadence-ruby label Nov 12, 2021
gvieira pushed a commit to clearbit/temporal-ruby that referenced this pull request Dec 21, 2021
* Extend #wait_for to take multiple futures and a condition block (coinbase#111)
bryton-stripe added a commit to bryton-stripe/temporal-ruby that referenced this pull request Dec 8, 2022
…ter-bryton-task-queue

Add task queue tag to workflow task failure metrics & task queue time metrics
christopherb-stripe pushed a commit to christopherb-stripe/temporal-ruby that referenced this pull request Jan 9, 2023
…ter-bryton-task-queue

Add task queue tag to workflow task failure metrics & task queue time metrics