Skip to content

Commit

Permalink
Override log levels after log handler is created (#22191)
Browse files Browse the repository at this point in the history
Co-authored-by: tvalentyn <[email protected]>
  • Loading branch information
Abacn and tvalentyn authored Jul 11, 2022
1 parent 6e16941 commit eb071fa
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
17 changes: 9 additions & 8 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ def _import_beam_plugins(plugins):

def create_harness(environment, dry_run=False):
"""Creates SDK Fn Harness."""
pipeline_options_dict = _load_pipeline_options(
environment.get('PIPELINE_OPTIONS'))
default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
logging.getLogger().setLevel(default_log_level)
_set_log_level_overrides(pipeline_options_dict)

if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
try:
Expand All @@ -94,6 +89,12 @@ def create_harness(environment, dry_run=False):
else:
fn_log_handler = None

pipeline_options_dict = _load_pipeline_options(
environment.get('PIPELINE_OPTIONS'))
default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
logging.getLogger().setLevel(default_log_level)
_set_log_level_overrides(pipeline_options_dict)

# These are used for dataflow templates.
RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
Expand Down Expand Up @@ -256,16 +257,16 @@ def _get_log_level_from_options_dict(options_dict: dict) -> int:
"""Get log level from options dict's entry `default_sdk_harness_log_level`.
If not specified, default log level is logging.INFO.
"""
log_level = options_dict.get('default_sdk_harness_log_level', 'INFO')

dict_level = options_dict.get('default_sdk_harness_log_level', 'INFO')
log_level = dict_level
if log_level.isdecimal():
log_level = int(log_level)
else:
# labeled log level
log_level = getattr(logging, log_level, None)
if not isinstance(log_level, int):
# unknown log level.
_LOGGER.error("Unknown log level. Use default value INFO.", exc_info=True)
_LOGGER.error("Unknown log level %s. Use default value INFO.", dict_level)
log_level = logging.INFO

return log_level
Expand Down
25 changes: 25 additions & 0 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

# pytype: skip-file

import io
import logging
import unittest

Expand Down Expand Up @@ -100,6 +101,30 @@ def test_runtime_values(self):
self.assertTrue(test_runtime_provider.is_accessible())
self.assertEqual(test_runtime_provider.get(), 37)

def test_create_sdk_harness_log_handler_received_log(self):
# tests that the log handler created in create_harness() does not miss
# logs emitted from create_harness() itself.
logstream = io.StringIO()

class InMemoryHandler(logging.StreamHandler):
def __init__(self, *unused):
super().__init__(stream=logstream)

with unittest.mock.patch(
'apache_beam.runners.worker.sdk_worker_main.FnApiLogRecordHandler',
InMemoryHandler):
sdk_worker_main.create_harness({
'LOGGING_API_SERVICE_DESCRIPTOR': '',
'CONTROL_API_SERVICE_DESCRIPTOR': '',
'PIPELINE_OPTIONS': '{"default_sdk_harness_log_level":"INVALID",'
'"sdk_harness_log_level_overrides":"{INVALID_JSON}"}',
},
dry_run=True)
logstream.seek(0)
logs = logstream.read()
self.assertIn('Unknown log level', logs)
self.assertIn('Unable to parse sdk_harness_log_level_overrides', logs)

def test_import_beam_plugins(self):
sdk_worker_main._import_beam_plugins(BeamPlugin.get_all_plugin_paths())

Expand Down

0 comments on commit eb071fa

Please sign in to comment.