[#32167][Prism] Use the worker-id gRPC metadata #33438

Merged: damondouglas merged 18 commits into apache:master from prism-multiplex-fnapi on Jan 3, 2025

damondouglas
Contributor

@damondouglas damondouglas commented Dec 23, 2024

This PR closes #32167 by implementing a worker.MultiplexW that forwards each FnAPI gRPC request to the *W stored under the id that matches the worker-id gRPC context metadata. In addition to the usual go test workflow, I validated this PR by running ./gradlew :runners:portability:java:ulrLoopbackValidatesRunnerTests -PjobEndpoint=localhost:8073 on a few initial tests. I checked idle_shutdown_timeout by visual inspection to confirm that the additional service does not block the executable from shutting down.
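To make the forwarding idea concrete, here is a minimal sketch of dispatch by worker id. It is not the code from this PR: MD is a local stand-in with the same shape as grpc's metadata.MD (map[string][]string), W is a stub for *worker.W, and the Handle methods and error texts are hypothetical, chosen only so the example runs with the standard library.

```go
package main

import (
	"errors"
	"fmt"
)

// MD mirrors the shape of grpc's metadata.MD so the sketch needs no
// third-party imports. It is a stand-in, not the real type.
type MD map[string][]string

// W is a stub for the per-worker handler (*worker.W in the PR).
type W struct{ ID string }

// Handle is a hypothetical method standing in for any FnAPI service method.
func (w *W) Handle(req string) string {
	return fmt.Sprintf("worker %s handled %s", w.ID, req)
}

// MultiplexW forwards each request to the W registered under the
// worker id carried in the request metadata.
type MultiplexW struct {
	workers map[string]*W
}

// Handle looks up the target worker from the metadata and delegates to it.
func (mw *MultiplexW) Handle(md MD, req string) (string, error) {
	ids := md["worker_id"]
	if len(ids) != 1 {
		return "", errors.New("expected exactly one worker_id in metadata")
	}
	w, ok := mw.workers[ids[0]]
	if !ok {
		return "", fmt.Errorf("no worker registered with id %q", ids[0])
	}
	return w.Handle(req), nil
}

func main() {
	mw := &MultiplexW{workers: map[string]*W{
		"job-1_env-a": {ID: "job-1_env-a"},
		"job-2_env-b": {ID: "job-2_env-b"},
	}}
	out, err := mw.Handle(MD{"worker_id": {"job-2_env-b"}}, "ProcessBundle")
	fmt.Println(out, err)
}
```

Because the target is chosen per request, a single service endpoint can serve every worker.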

Depends on PR #33453


@damondouglas
Contributor Author

damondouglas commented Dec 24, 2024

See #33450 and its resolving PR #33453:

At 3636a3c, Python sends an empty string for the worker_id key in the gRPC metadata. Debugging the workerFromMetadataCtx method, I see a worker_id sent over when running python apache_beam/examples/wordcount.py in all but a single instance.

The error "worker id in ctx metadata is an empty string" is specific to id == "". That condition is checked only after grpcx.ReadWorkerID has passed all of its own checks: metadata.FromIncomingContext returns metadata, the metadata.MD has the worker_id key, and the length of metadata.MD[worker_id] is 1.

worker id in ctx metadata is an empty string

```
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "worker id in ctx metadata is an empty string"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"worker id in ctx metadata is an empty string", grpc_status:2, created_time:"2024-12-23T16:08:00.205761402-08:00"}"
>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
response = task()
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 657, in do_instruction
return getattr(self, request_type)(
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "sdks/python/apache_beam/runners/worker/bundle_processor.py", line 1274, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "sdks/python/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 567, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 260, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 1159, in apache_beam.runners.worker.operations.CombineOperation.process
File "apache_beam/runners/worker/operations.py", line 1163, in apache_beam.runners.worker.operations.CombineOperation.process
File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 260, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 1159, in apache_beam.runners.worker.operations.CombineOperation.process
File "apache_beam/runners/worker/operations.py", line 1163, in apache_beam.runners.worker.operations.CombineOperation.process
File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 260, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1591, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 689, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1686, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1799, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1591, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 917, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 1059, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1686, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1799, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1612, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 917, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 1000, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "sdks/python/apache_beam/runners/worker/bundle_processor.py", line 499, in __getitem__
self._cache[target_window] = self._side_input_data.view_fn(raw_view)
File "sdks/python/apache_beam/pvalue.py", line 389, in <lambda>
lambda iterable: from_runtime_iterable(iterable, view_options))
File "sdks/python/apache_beam/pvalue.py", line 509, in _from_runtime_iterable
head = list(itertools.islice(it, 2))
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1289, in _lazy_iterator
input_stream, continuation_token = self._get_raw(
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1307, in _get_raw
self._underlying.get_raw(state_key, continuation_token))
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1094, in get_raw
response = self._blocking_request(
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1134, in _blocking_request
raise self._exception
File ".pyenv/versions/3.10.15/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File ".pyenv/versions/3.10.15/lib/python3.10/threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1069, in pull_responses
for response in responses:
File "sdks/python/.venv/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
return self._next()
File "sdks/python/.venv/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
raise self
RuntimeError: grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "worker id in ctx metadata is an empty string"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"worker id in ctx metadata is an empty string", grpc_status:2, created_time:"2024-12-23T16:08:00.205761402-08:00"}"

[while running 'Write/Write/WriteImpl/WriteBundles']
```


codecov bot commented Dec 30, 2024

Codecov Report

Attention: Patch coverage is 0% with 1 line in your changes missing coverage. Please review.

Project coverage is 59.02%. Comparing base (18ec331) to head (8680593).
Report is 10 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...go/pkg/beam/runners/prism/internal/environments.go | 0.00% | 1 Missing ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #33438      +/-   ##
============================================
- Coverage     59.02%   59.02%   -0.01%     
  Complexity     3185     3185              
============================================
  Files          1146     1147       +1     
  Lines        176085   176108      +23     
  Branches       3368     3368              
============================================
+ Hits         103942   103944       +2     
- Misses        68787    68804      +17     
- Partials       3356     3360       +4     

| Flag | Coverage Δ |
|---|---|
| python | 81.23% <ø> (+<0.01%) ⬆️ |


@damondouglas damondouglas marked this pull request as ready for review December 30, 2024 21:15
@damondouglas
Contributor Author

R: @lostluck

Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@lostluck lostluck changed the title [Prism] Use the worker-id gRPC metadata [#32167][Prism] Use the worker-id gRPC metadata Dec 30, 2024
Contributor

@lostluck lostluck left a comment

Close but we can improve it! This top comment is a summary of what the more pointed comments are mentioning.

As it stands, we should not introduce a package global (the worker pool). That will only make things harder for later maintainers, so getting the plumbing right now is a good idea.

The main adjustment would be to avoid adding 4 new locations that make net.Listen calls. This work shouldn't require more of those; in principle, we should end up with fewer of them.

First, make the worker pool a struct. You already have this type in the form of the MultiplexW type. Move the map and the lock onto it, and out of the package namespace.

Then we add a method that registers it onto a provided gRPC server, so it stops making its own. This lets us reuse an existing gRPC server, such as the one for JobManagement.
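A rough sketch of those two steps, assuming nothing about the final code: the map and lock live on the MultiplexW struct rather than in package scope, and a Register method attaches the multiplexer to a server owned by the caller. The registrar interface and fakeServer are simplified, hypothetical stand-ins for a gRPC server, and the "fnapi" service name, Delete, and Len are illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// W is a stub for *worker.W.
type W struct{ ID string }

// registrar is a simplified, hypothetical stand-in for a gRPC server
// that services can be attached to.
type registrar interface {
	RegisterService(name string, impl any)
}

// fakeServer records registrations so the sketch can run without gRPC.
type fakeServer struct{ services map[string]any }

func (s *fakeServer) RegisterService(name string, impl any) { s.services[name] = impl }

// MultiplexW owns the pool and its lock; nothing lives in package scope.
type MultiplexW struct {
	mu   sync.Mutex
	pool map[string]*W
}

func NewMultiplexW() *MultiplexW { return &MultiplexW{pool: map[string]*W{}} }

// Register attaches mw to a server owned by the caller instead of
// opening its own listener.
func (mw *MultiplexW) Register(s registrar) {
	s.RegisterService("fnapi", mw) // service name is illustrative
}

// MakeWorker creates a W and stores it under id.
func (mw *MultiplexW) MakeWorker(id string) *W {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	w := &W{ID: id}
	mw.pool[id] = w
	return w
}

// Delete removes the worker stored under id, if any.
func (mw *MultiplexW) Delete(id string) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	delete(mw.pool, id)
}

// Len reports how many workers are currently registered.
func (mw *MultiplexW) Len() int {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	return len(mw.pool)
}

func main() {
	srv := &fakeServer{services: map[string]any{}} // e.g. the job management server
	mw := NewMultiplexW()
	mw.Register(srv)
	mw.MakeWorker("job-1_env-a")
	fmt.Println(len(srv.services), mw.Len())
}
```

With this shape, the number of listeners depends on how many servers the caller creates, not on how many workers exist.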

prism.CreateJobServer can be updated to create the pool and pass it to jobservices.NewServer. For its part, the jobservices package can have an interface for accepting this worker factory, without even referring directly to any of the types from the worker package. This lets the Job spit out prefixed workers.

The main gap is that the worker production method would need to be part of the interface, so it can be exposed via the Job type. That method can return an any, which we can then cast to a *worker.W to do the remaining worker setup. I don't love this, but it's not the end of the world. If that type is wrong, everything breaks very early, which is good.
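One way to picture the interface arrangement, as a single-file sketch with the package boundaries marked in comments. Everything here is an assumption for illustration: the WorkerFactory name, the MakeWorker signature, the id prefix format, and the helper setUpWorker do not come from the PR.

```go
package main

import "fmt"

// --- would live in the jobservices package ---

// WorkerFactory is a hypothetical interface; it names no worker types.
type WorkerFactory interface {
	MakeWorker(id, env string) any
}

// Job exposes worker creation and prefixes ids with its own key.
type Job struct {
	key     string
	factory WorkerFactory
}

func (j *Job) MakeWorker(env string) any {
	return j.factory.MakeWorker(j.key+"_"+env, env)
}

// --- would live in the worker package ---

// W is a stub for *worker.W.
type W struct{ ID, Env string }

type pool struct{}

func (pool) MakeWorker(id, env string) any { return &W{ID: id, Env: env} }

// wrongFactory shows the failure mode: it returns an unexpected type.
type wrongFactory struct{}

func (wrongFactory) MakeWorker(id, env string) any { return "not a worker" }

// --- caller that finishes worker setup ---

// setUpWorker casts the any back to *W and fails fast on a mismatch.
func setUpWorker(j *Job, env string) (*W, error) {
	v := j.MakeWorker(env)
	w, ok := v.(*W)
	if !ok {
		return nil, fmt.Errorf("worker factory returned %T, want *W", v)
	}
	return w, nil
}

func main() {
	j := &Job{key: "job-1", factory: pool{}}
	w, err := setUpWorker(j, "go")
	fmt.Println(w.ID, err)

	bad := &Job{key: "job-2", factory: wrongFactory{}}
	_, err = setUpWorker(bad, "go")
	fmt.Println(err)
}
```

The type assertion runs before any worker setup begins, so a wrong factory fails on the first worker request rather than partway through a job.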

sdks/go/cmd/prism/prism.go (outdated, resolved)
sdks/go/cmd/prism/prism.go (outdated, resolved)
sdks/go/pkg/beam/runners/prism/internal/execute.go (outdated, resolved)
sdks/go/pkg/beam/runners/prism/internal/execute.go (outdated, resolved)
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go (outdated, resolved)
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go (outdated, resolved)
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go (outdated, resolved)
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go (outdated, resolved)
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go (outdated, resolved)
@damondouglas damondouglas requested a review from lostluck January 1, 2025 17:11
Contributor

@lostluck lostluck left a comment

Some largely minor comments that I trust you can address without an additional review gate. Please address them before merging.

Please let me know if you have questions or want a subsequent final review.

@github-actions github-actions bot removed the python label Jan 3, 2025
@damondouglas damondouglas merged commit f27547d into apache:master Jan 3, 2025
10 checks passed
@damondouglas damondouglas deleted the prism-multiplex-fnapi branch January 3, 2025 18:12
stankiewicz pushed a commit to stankiewicz/beam that referenced this pull request Jan 16, 2025
* Implement MultiplexW and Pool

* Add missing license header

* Add multiplex worker to prism execute

* remove unused props

* Fix Prism python precommit

* Handle worker_id is empty string error

* Fix python worker id interceptor

* default empty _worker_id

* Revert defaulting worker id

* Fix worker_id in docker env

* Update per PR comments

* Add lock/unlock to MultiplexW

* Delegate W deletion via MW

* Remove unnecessary guard

* Small fixes after PR review

* Add code comment to MakeWorker

* clean up commented out code

* Revert portable/common changes