Coalesce sources until compressed serialized bundles under API limit #26267

Merged
merged 2 commits into from
Apr 18, 2023

@Abacn Abacn commented Apr 13, 2023

Fixes #26264

The original limitNumberOfBundles logic has two problems: it always caps the result at 100 bundles whenever the split produces more than that, and it does not check the resulting response size. This can trigger the IllegalArgumentException thrown immediately below it.
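
The idea in the title (keep coalescing sources until the compressed, serialized bundles fit under the API limit) can be sketched roughly as follows. This is a minimal illustration and not the code in WorkerCustomSources: the class CoalesceSketch, all of its methods, the gzip-based size model, and the example descriptors are hypothetical stand-ins chosen so that the snippet runs on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;

// Illustrative sketch only; names and the size model are assumptions, not Beam's code.
class CoalesceSketch {

  // Stand-in for "serialize and compress one bundle": gzip the joined descriptors.
  static int serializedSize(List<String> bundle) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
      gzip.write(String.join("\n", bundle).getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
    return bytes.size();
  }

  static long totalSerializedSize(List<List<String>> bundles) {
    long total = 0;
    for (List<String> bundle : bundles) {
      total += serializedSize(bundle);
    }
    return total;
  }

  // Groups adjacent sources into at most numBundles bundles of near-equal length.
  static List<List<String>> rebundle(List<String> sources, int numBundles) {
    List<List<String>> bundles = new ArrayList<>();
    int perBundle = (sources.size() + numBundles - 1) / numBundles; // ceiling division
    for (int i = 0; i < sources.size(); i += perBundle) {
      int end = Math.min(i + perBundle, sources.size());
      bundles.add(new ArrayList<>(sources.subList(i, end)));
    }
    return bundles;
  }

  // Re-bundles with fewer and fewer bundles until the measured size fits the limit.
  static List<List<String>> coalesceUnderLimit(
      List<String> sources, int maxBundles, long byteLimit) {
    if (sources.isEmpty()) {
      return new ArrayList<>();
    }
    int numBundles = Math.min(maxBundles, sources.size());
    while (true) {
      List<List<String>> bundles = rebundle(sources, numBundles);
      long size = totalSerializedSize(bundles);
      if (size <= byteLimit) {
        return bundles; // the size is checked, not just the bundle count
      }
      if (bundles.size() <= 1) {
        throw new IllegalArgumentException(
            "Unable to coalesce under the limit: " + size + " > " + byteLimit);
      }
      // Shrink roughly in proportion to the overshoot, but always by at least one.
      int scaled = (int) (bundles.size() * byteLimit / size);
      numBundles = Math.max(1, Math.min(bundles.size() - 1, scaled));
    }
  }

  // Hypothetical source descriptors; real BoundedSource objects are opaque here.
  static List<String> makeSources(int n) {
    List<String> sources = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      sources.add(String.format(
          "projects/example-project/datasets/example_dataset/tables/example_table/"
              + "shard-%05d-of-%05d.avro", i, n));
    }
    return sources;
  }

  public static void main(String[] args) {
    List<List<String>> bundles = coalesceUnderLimit(makeSources(200), 100, 10_000L);
    System.out.println(bundles.size() + " bundles, "
        + totalSerializedSize(bundles) + " bytes");
  }
}
```

The point of the sketch is the loop condition: the bundle count is only a starting guess, and the result is accepted only once the measured serialized size is within the limit, or rejected with an exception if even a single bundle is too large.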

Abacn commented Apr 13, 2023

Verified that the unit test fails on current master and succeeds with the fix applied.

Current master:

Apr 13, 2023 4:25:34 PM org.apache.beam.runners.dataflow.worker.WorkerCustomSources performSplitTyped
WARNING: Splitting source <unknown> into bundles of estimated size 67108864 bytes produced 200 bundles, which have
total serialized size 116265 bytes. As this is too large for the Google Cloud Dataflow API, retrying splitting once with
increased desiredBundleSizeBytes 1560482414 to reduce the number of splits.
Apr 13, 2023 4:25:34 PM org.apache.beam.runners.dataflow.worker.WorkerCustomSources performSplitTyped
INFO: Splitting with desiredBundleSizeBytes 1560482414 produced 200 bundles with total serialized size 116265 bytes
Apr 13, 2023 4:25:34 PM org.apache.beam.runners.dataflow.worker.WorkerCustomSources performSplitTyped
WARNING: Splitting source <unknown> into bundles of estimated size 1560482414 bytes produced 200 bundles. Rebundling into 100 bundles.
Apr 13, 2023 4:25:34 PM org.apache.beam.runners.dataflow.worker.WorkerCustomSources performSplitTyped
INFO: Splitting source <unknown> produced 100 bundles with total serialized response size 58815

Total size of the BoundedSource objects generated by split() operation is larger than the allowable limit. When splitting
<unknown> into bundles of 1560482414 bytes it generated 200 BoundedSource objects with total serialized size of 58815
bytes which is larger than the limit 10000. For more information, please check the corresponding FAQ entry at
https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline
java.lang.IllegalArgumentException: Total size of the BoundedSource objects generated by split() operation is larger than the
allowable limit. When splitting <unknown> into bundles of 1560482414 bytes it generated 200 BoundedSource objects with
total serialized size of 58815 bytes which is larger than the limit 10000. For more information, please check the
corresponding FAQ entry at https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline
	at org.apache.beam.runners.dataflow.worker.WorkerCustomSources.performSplitTyped(WorkerCustomSources.java:286)
	at org.apache.beam.runners.dataflow.worker.WorkerCustomSources.performSplitWithApiLimit(WorkerCustomSources.java:201)
	at org.apache.beam.runners.dataflow.worker.WorkerCustomSourcesTest.testSplittingProducedResponseUnderLimit(WorkerCustomSourcesTest.java:242)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)

With the fix:

Apr 13, 2023 4:27:05 PM org.apache.beam.runners.dataflow.worker.WorkerCustomSources performSplitTyped
WARNING: Splitting source <unknown> into bundles of estimated size 67108864 bytes produced 200 bundles, which have
total serialized size 116265 bytes. As this is too large for the Google Cloud Dataflow API, retrying splitting once with
increased desiredBundleSizeBytes 1560482414 to reduce the number of splits.
Apr 13, 2023 4:27:05 PM org.apache.beam.runners.dataflow.worker.WorkerCustomSources performSplitTyped
INFO: Splitting with desiredBundleSizeBytes 1560482414 produced 200 bundles with total serialized size 116265 bytes
Apr 13, 2023 4:27:05 PM org.apache.beam.runners.dataflow.worker.WorkerCustomSources performSplitTyped
WARNING: Re-bundle source <unknown> into bundles of estimated size 13330 bytes produced 16 bundles.
Apr 13, 2023 4:27:06 PM org.apache.beam.runners.dataflow.worker.WorkerCustomSources performSplitTyped
WARNING: Re-bundle source <unknown> into bundles of estimated size 9814 bytes produced 11 bundles.
Apr 13, 2023 4:27:06 PM org.apache.beam.runners.dataflow.worker.WorkerCustomSources performSplitTyped
INFO: Splitting source <unknown> produced 11 bundles with total serialized response size 9824
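
As a back-of-the-envelope check on the two log excerpts above, the snippet below uses only figures quoted in those logs (100 bundles at 58815 bytes on master, 11 bundles at 9824 bytes with the fix, limit 10000). The class and method names are hypothetical, and the "proportional" estimate assumes serialized size scales linearly with bundle count, which is a simplification and not necessarily what the fix computes.

```java
// Arithmetic on figures quoted in the logs; names and the linear model are assumptions.
class BundleLimitArithmetic {

  // Largest bundle count expected to fit if size scaled linearly with bundle count.
  static long proportionalBundleEstimate(long bundles, long observedBytes, long byteLimit) {
    return bundles * byteLimit / observedBytes;
  }

  static double bytesPerBundle(long bundles, long totalBytes) {
    return (double) totalBytes / bundles;
  }

  public static void main(String[] args) {
    long limit = 10_000L;

    // Current master: the fixed cap of 100 bundles still serializes to 58815 bytes.
    System.out.printf("master: %.1f bytes per bundle, linear estimate: %d bundles%n",
        bytesPerBundle(100, 58_815), proportionalBundleEstimate(100, 58_815, limit));

    // With the fix: 11 bundles at 9824 bytes, which is within the limit.
    System.out.printf("fix: %.1f bytes per bundle, within limit: %b%n",
        bytesPerBundle(11, 9_824), 9_824 <= limit);
  }
}
```

The average size per bundle is higher at 11 bundles than at 100, so a purely linear estimate is optimistic. That is consistent with the fixed run logging two re-bundle passes (16 bundles, then 11) before the response fit.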

Abacn commented Apr 13, 2023

R: @chamikaramj @kennknowles per file history

internal tracker: 277995484

@github-actions

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

+ "of %d bytes is still larger than the limit %d. For more information, please "
+ "check the corresponding FAQ entry at "
+ "https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline",
source, bundles.size(), serializedSize, apiByteLimit);
Contributor

Shall we suggest the withTemplateCompatibility workaround here?

Contributor Author

This option applies only to BigQueryIO.read, while the bug is generic to the Dataflow worker. With that option the read transform expands differently and likely avoids the problematic code path here (I haven't verified this).

Contributor

SG

Contributor

The workaround will be using Runner v2. That will convert sources into SDFs that do not run into this source split limit. We should suggest that here.

Contributor Author

Hi @chamikaramj, is this code path only for Dataflow Runner v1? If so, we could also document the Runner v2 recommendation in https://cloud.google.com/dataflow/docs/guides/common-errors#boundedsource-objects-splitintobundles

Contributor

Yeah, this code path (the source split operation) is only hit by Runner v1. +1 for documenting it on the website.

String.format(
"Unable to coalesce the sources into compressed serialized bundles to satisfy the "
+ "allowable limit when splitting %s. With %d bundles, total serialized size "
+ "of %d bytes is still larger than the limit %d. For more information, please "
Contributor

Can we use https://cloud.google.com/dataflow/docs/guides/common-errors#boundedsource-objects-splitintobundles instead? Please update the previous LOG.info with this link as well.

Contributor Author

Good catch, this URL is more accurate.

Abacn commented Apr 17, 2023

Run Java PreCommit

@chamikaramj

LGTM

Abacn commented Apr 18, 2023

Thanks for the suggestions. The linked Cloud documentation is going to be updated. Merging the fix for now.

@Abacn Abacn merged commit 51d857f into apache:master Apr 18, 2023
@Abacn Abacn deleted the fixresplit branch April 18, 2023 18:58
3 participants