From a1810537c454ea8bec51d83274b08311d2a5048f Mon Sep 17 00:00:00 2001
From: andoni-guzman <89603073+andoni-guzman@users.noreply.github.com>
Date: Wed, 11 May 2022 08:29:32 -0500
Subject: [PATCH] [BEAM-5492] Python Dataflow integration tests should export
 the pipeline console output to Jenkins Test Result section (#17530)

---
 .../apache_beam/runners/dataflow/dataflow_runner.py      | 9 +++++++--
 .../apache_beam/runners/dataflow/test_dataflow_runner.py | 8 +++++++-
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 13cbec6dc022..49f7251c0559 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -1640,7 +1640,10 @@ def wait_until_finish(self, duration=None):
     if not self.is_in_terminal_state():
       if not self.has_job:
         raise IOError('Failed to get the Dataflow job id.')
-
+      consoleUrl = (
+          "Console URL: https://console.cloud.google.com/"
+          f"dataflow/jobs//{self.job_id()}"
+          "?project=")
       thread = threading.Thread(
           target=DataflowRunner.poll_for_job_completion,
           args=(self._runner, self, duration))
@@ -1657,13 +1660,15 @@ def wait_until_finish(self, duration=None):
         # is_in_terminal_state.
         terminated = self.is_in_terminal_state()
         assert duration or terminated, (
-            'Job did not reach to a terminal state after waiting indefinitely.')
+            'Job did not reach to a terminal state after waiting indefinitely. '
+            '{}'.format(consoleUrl))
 
       # TODO(BEAM-14291): Also run this check if wait_until_finish was called
       # after the pipeline completed.
       if terminated and self.state != PipelineState.DONE:
         # TODO(BEAM-1290): Consider converting this to an error log based on
         # theresolution of the issue.
+        _LOGGER.error(consoleUrl)
         raise DataflowRuntimeException(
             'Dataflow pipeline failed. State: %s, Error:\n%s' %
             (self.state, getattr(self._runner, 'last_error_msg', None)),
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index d4743a558f3e..58bc05c39509 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -55,6 +55,8 @@ def run_pipeline(self, pipeline, options):
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
       print('Worker logs: %s' % self.build_console_url(options))
+      _LOGGER.info('Console log: ')
+      _LOGGER.info(self.build_console_url(options))
 
     try:
       self.wait_until_in_state(PipelineState.RUNNING)
@@ -84,7 +86,11 @@ def build_console_url(self, options):
 
   def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT):
     """Wait until Dataflow pipeline enters a certain state."""
+    consoleUrl = (
+        "Console URL: https://console.cloud.google.com/dataflow/"
+        f"/{self.result.job_id()}?project=")
     if not self.result.has_job:
+      _LOGGER.error(consoleUrl)
       raise IOError('Failed to get the Dataflow job id.')
 
     start_time = time.time()
@@ -93,7 +99,7 @@ def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT):
       if self.result.is_in_terminal_state() or job_state == expected_state:
        return job_state
       time.sleep(5)
-
+    _LOGGER.error(consoleUrl)
     raise RuntimeError(
         'Timeout after %d seconds while waiting for job %s '
         'enters expected state %s. Current state is %s.' %
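
Note: the console link logged by this patch follows the usual Dataflow job URL layout,
https://console.cloud.google.com/dataflow/jobs/<region>/<job_id>?project=<project>
(in the dataflow_runner.py change above, the region segment and the ?project= value are
left empty). The snippet below is a minimal, self-contained sketch of how such a URL could
be assembled and logged on a failure path so it appears in captured test output; the helper
name make_console_url and the sample job id, region, and project values are illustrative
assumptions, not the Beam implementation.

import logging

_LOGGER = logging.getLogger(__name__)


def make_console_url(job_id, region='', project=''):
  # Hypothetical sketch, not the Beam implementation: build a Dataflow console
  # URL of the same shape as the message logged by the patch above. Empty
  # region/project mirror the dataflow_runner.py message, which leaves both
  # fields blank.
  return ('https://console.cloud.google.com/dataflow/jobs/'
          '%s/%s?project=%s' % (region, job_id, project))


# Example failure path: log the console URL before raising, as the patch does.
# The job id, region, and project below are placeholder values.
_LOGGER.error(
    'Console URL: %s',
    make_console_url('2022-05-11_00_00_00-123', 'us-central1', 'my-project'))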