diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 706e1e61e239..53cdbad5d71d 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -69,11 +69,6 @@ def _import_beam_plugins(plugins):
 
 def create_harness(environment, dry_run=False):
   """Creates SDK Fn Harness."""
-  pipeline_options_dict = _load_pipeline_options(
-      environment.get('PIPELINE_OPTIONS'))
-  default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
-  logging.getLogger().setLevel(default_log_level)
-  _set_log_level_overrides(pipeline_options_dict)
 
   if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
     try:
@@ -94,6 +89,12 @@ def create_harness(environment, dry_run=False):
   else:
     fn_log_handler = None
 
+  pipeline_options_dict = _load_pipeline_options(
+      environment.get('PIPELINE_OPTIONS'))
+  default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
+  logging.getLogger().setLevel(default_log_level)
+  _set_log_level_overrides(pipeline_options_dict)
+
   # These are used for dataflow templates.
   RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
   sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
@@ -256,8 +257,8 @@ def _get_log_level_from_options_dict(options_dict: dict) -> int:
   """Get log level from options dict's entry `default_sdk_harness_log_level`.
   If not specified, default log level is logging.INFO.
   """
-  log_level = options_dict.get('default_sdk_harness_log_level', 'INFO')
-
+  dict_level = options_dict.get('default_sdk_harness_log_level', 'INFO')
+  log_level = dict_level
   if log_level.isdecimal():
     log_level = int(log_level)
   else:
@@ -265,7 +266,7 @@ def _get_log_level_from_options_dict(options_dict: dict) -> int:
     log_level = getattr(logging, log_level, None)
     if not isinstance(log_level, int):
       # unknown log level.
-      _LOGGER.error("Unknown log level. Use default value INFO.", exc_info=True)
+      _LOGGER.error("Unknown log level %s. Use default value INFO.", dict_level)
       log_level = logging.INFO
 
   return log_level
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
index fe15b579d3c5..1e976bc00016 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -19,6 +19,7 @@
 
 # pytype: skip-file
 
+import io
 import logging
 import unittest
 
@@ -100,6 +101,30 @@ def test_runtime_values(self):
     self.assertTrue(test_runtime_provider.is_accessible())
     self.assertEqual(test_runtime_provider.get(), 37)
 
+  def test_create_sdk_harness_log_handler_received_log(self):
+    # tests that the log handler created in create_harness() does not miss
+    # logs emitted from create_harness() itself.
+    logstream = io.StringIO()
+
+    class InMemoryHandler(logging.StreamHandler):
+      def __init__(self, *unused):
+        super().__init__(stream=logstream)
+
+    with unittest.mock.patch(
+        'apache_beam.runners.worker.sdk_worker_main.FnApiLogRecordHandler',
+        InMemoryHandler):
+      sdk_worker_main.create_harness({
+          'LOGGING_API_SERVICE_DESCRIPTOR': '',
+          'CONTROL_API_SERVICE_DESCRIPTOR': '',
+          'PIPELINE_OPTIONS': '{"default_sdk_harness_log_level":"INVALID",'
+          '"sdk_harness_log_level_overrides":"{INVALID_JSON}"}',
+      },
+                                     dry_run=True)
+    logstream.seek(0)
+    logs = logstream.read()
+    self.assertIn('Unknown log level', logs)
+    self.assertIn('Unable to parse sdk_harness_log_level_overrides', logs)
+
   def test_import_beam_plugins(self):
     sdk_worker_main._import_beam_plugins(BeamPlugin.get_all_plugin_paths())