Skip to content

Commit

Permalink
[Python] Add the "no_wait_until_finish" option without waiting fo the…
Browse files Browse the repository at this point in the history
… job completion
  • Loading branch information
liferoad authored and reeba212 committed Dec 4, 2024
1 parent b5aa5c3 commit a1e7a10
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,14 @@ def _add_argparse_args(cls, parser):
'updating a pipeline or reloading the job state. '
'This is not recommended for streaming jobs.')

parser.add_argument(
'--no_wait_until_finish',
default=False,
action='store_true',
help='By default, the "with" statement waits for the job to '
'complete. Set this flag to bypass this behavior and continue '
'execution immediately')


class StreamingOptions(PipelineOptions):
@classmethod
Expand Down
8 changes: 7 additions & 1 deletion sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,13 @@ def __exit__(
try:
if not exc_type:
self.result = self.run()
self.result.wait_until_finish()
if not self._options.view_as(StandardOptions).no_wait_until_finish:
self.result.wait_until_finish()
else:
logging.info(
'Job execution continues without waiting for completion.'
' Use "wait_until_finish" in PipelineResult to block'
' until finished.')
finally:
self._extra_context.__exit__(exc_type, exc_val, exc_tb)

Expand Down
12 changes: 11 additions & 1 deletion sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ def test_reuse_custom_transform_instance(self):
'reloading the job state. This is not recommended for '
'streaming jobs.')

@mock.patch('logging.info') # Mock the logging.info function
def test_no_wait_until_finish(self, mock_info):
with Pipeline(runner='DirectRunner',
options=PipelineOptions(["--no_wait_until_finish"])) as p:
_ = p | beam.Create(['test'])
mock_info.assert_called_once_with(
'Job execution continues without waiting for completion. '
'Use "wait_until_finish" in PipelineResult to block until finished.')
p.result.wait_until_finish()

def test_auto_unique_labels(self):

opts = PipelineOptions(["--auto_unique_labels"])
Expand Down Expand Up @@ -773,7 +783,7 @@ def test_windowed_value_param(self):
| Map(lambda _, wv=DoFn.WindowedValueParam: (wv.value, wv.windows)))
assert_that(
pcoll,
equal_to([(1, [IntervalWindow(0, 5)]), (7, [IntervalWindow(5, 10)])]))
equal_to([(1, [IntervalWindow(0, 5)]), (7, [IntervalWindow(5, 10)])])) # pylint: disable=too-many-function-args

def test_timestamp_param(self):
class TestDoFn(DoFn):
Expand Down

0 comments on commit a1e7a10

Please sign in to comment.