Skip to content

Commit

Permalink
Fail when main session cannot be loaded. (#25617)
Browse files Browse the repository at this point in the history
  • Loading branch information
tvalentyn authored Mar 7, 2023
1 parent b3a9aa7 commit 2011cfc
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
14 changes: 1 addition & 13 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@
## New Features / Improvements

* The Flink runner now supports Flink 1.16.x ([#25046](https://github.com/apache/beam/issues/25046)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* If a main session fails to load, the pipeline will now fail at worker startup ([#25401](https://github.com/apache/beam/issues/25401)).

## Deprecations

Expand Down Expand Up @@ -97,7 +96,6 @@

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added in JmsIO a retry policy for failed publications (Java) ([#24971](https://github.com/apache/beam/issues/24971)).
* Support for `LZMA` compression/decompression of text files added to the Python SDK ([#25316](https://github.com/apache/beam/issues/25316))
* Added ReadFrom/WriteTo Csv/Json as top-level transforms to the Python SDK.
Expand Down Expand Up @@ -139,11 +137,6 @@

# [2.45.0] - 2023-02-15

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down Expand Up @@ -174,11 +167,6 @@

# [2.44.0] - 2023-01-12

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).

## I/Os

* Support for Bigtable sink (Write and WriteBatch) added (Go) ([#23324](https://github.com/apache/beam/issues/23324)).
Expand Down
7 changes: 6 additions & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
_LOGGER = logging.getLogger(__name__)

DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60

# The number of ProcessBundleRequest instruction ids the BundleProcessorCache
# will remember for not running instructions.
MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS = 1000
Expand Down Expand Up @@ -172,12 +171,16 @@ def __init__(
# Heap dump through status api is disabled by default
enable_heap_dump=False, # type: bool
data_sampler=None, # type: Optional[data_sampler.DataSampler]
# Unrecoverable SDK harness initialization error (if any)
# that should be reported to the runner when processing the first bundle.
deferred_exception=None, # type: Optional[Exception]
):
# type: (...) -> None
self._alive = True
self._worker_index = 0
self._worker_id = worker_id
self._state_cache = StateCache(state_cache_size)
self._deferred_exception = deferred_exception
options = [('grpc.max_receive_message_length', -1),
('grpc.max_send_message_length', -1)]
if credentials is None:
Expand Down Expand Up @@ -308,6 +311,8 @@ def _request_register(self, request):

def _request_process_bundle(self, request):
# type: (beam_fn_api_pb2.InstructionRequest) -> None
if self._deferred_exception:
raise self._deferred_exception
self._bundle_processor_cache.activate(request.instruction_id)
self._request_execute(request)

Expand Down
26 changes: 18 additions & 8 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def _import_beam_plugins(plugins):
def create_harness(environment, dry_run=False):
"""Creates SDK Fn Harness."""

deferred_exception = None
if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
try:
logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
Expand Down Expand Up @@ -114,15 +115,23 @@ def create_harness(environment, dry_run=False):
if pickle_library != pickler.USE_CLOUDPICKLE:
try:
_load_main_session(semi_persistent_directory)
except CorruptMainSessionException:
except LoadMainSessionException:
exception_details = traceback.format_exc()
_LOGGER.error(
'Could not load main session: %s', exception_details, exc_info=True)
raise
except Exception: # pylint: disable=broad-except
summary = (
"Could not load main session. Inspect which external dependencies "
"are used in the main module of your pipeline. Verify that "
"corresponding packages are installed in the pipeline runtime "
"environment and their installed versions match the versions used in "
"pipeline submission environment. For more information, see: https://"
"beam.apache.org/documentation/sdks/python-pipeline-dependencies/")
_LOGGER.error(summary, exc_info=True)
exception_details = traceback.format_exc()
_LOGGER.error(
'Could not load main session: %s', exception_details, exc_info=True)
deferred_exception = LoadMainSessionException(
f"{summary} {exception_details}")

_LOGGER.info(
'Pipeline_options: %s',
Expand Down Expand Up @@ -159,7 +168,8 @@ def create_harness(environment, dry_run=False):
profiler_factory=profiler.Profile.factory_from_options(
sdk_pipeline_options.view_as(ProfilingOptions)),
enable_heap_dump=enable_heap_dump,
data_sampler=data_sampler)
data_sampler=data_sampler,
deferred_exception=deferred_exception)
return fn_log_handler, sdk_harness, sdk_pipeline_options


Expand Down Expand Up @@ -306,10 +316,9 @@ def _set_log_level_overrides(options_dict: dict) -> None:
"Error occurred when setting log level for %s: %s", module_name, e)


class CorruptMainSessionException(Exception):
class LoadMainSessionException(Exception):
"""
Used to crash this worker if a main session file was provided but
is not valid.
Used to crash this worker if a main session file failed to load.
"""
pass

Expand All @@ -325,7 +334,8 @@ def _load_main_session(semi_persistent_directory):
# This can happen if the worker fails to download the main session.
# Raise a fatal error and crash this worker, forcing a restart.
if os.path.getsize(session_file) == 0:
raise CorruptMainSessionException(
# Potentially transient error, unclear if still happening.
raise LoadMainSessionException(
'Session file found, but empty: %s. Functions defined in __main__ '
'(interactive session) will almost certainly fail.' %
(session_file, ))
Expand Down

0 comments on commit 2011cfc

Please sign in to comment.