Commit

Make LazyLoader thread safe
Even when `multiprocessing` is set to `True`, there is a case where
multiple threads in the same process attempt to use the same LazyLoader
object. When using the reactor and reacting to an event that will call a
runner, `salt.utils.reactor.ReactWrap.runner` will invoke
`self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs))`
potentially multiple times, each time using a thread from
`salt.utils.process.ThreadPool`. Each thread will invoke
`salt.client.mixins.SyncClientMixin.low` which in turn will invoke its
`_low` and call `salt.utils.job.store_job`. `salt.utils.job.store_job`
will invoke the LazyLoader object for the returner.
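The fan-out looks roughly like the sketch below (illustration only: it uses the standard-library `ThreadPoolExecutor` as a stand-in for `salt.utils.process.ThreadPool`, and the `low` stub is hypothetical):

```python
# Illustration only: each reactor event queues another call into the pool,
# and every worker thread eventually reaches salt.utils.job.store_job, which
# indexes one shared LazyLoader instance to find the configured returner.
from concurrent.futures import ThreadPoolExecutor


def low(fun, kwargs):
    # Stand-in for self.client_cache['runner'].low: run the runner, then
    # store the job result, which resolves the returner through the shared
    # (and, before this change, unlocked) LazyLoader.
    pass


pool = ThreadPoolExecutor(max_workers=4)        # stand-in for ThreadPool
for _ in range(3):                              # several events arrive close together
    pool.submit(low, 'state.orchestrate', {})   # analogous to fire_async()
pool.shutdown(wait=True)
```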

Since the LazyLoader object is not thread safe, occasional failures will
occur, reducing the reliability of the overall system.

Let's examine why a function such as `LazyLoader._load` is not thread safe.
Any time the GIL is released, it allows another thread to run. There are
various types of operations that could release the GIL, but in this
particular case they are file operations that happen in both
`refresh_file_mapping` and `_load_module`. Note that if you add `print`
statements, those also release the GIL (and make the problem more
frequent). In the failure case, `refresh_file_mapping` releases the
GIL, another thread loads the module, and then when the original thread
runs again it will fail when `_inner_load` runs the second time (after
`refresh_file_mapping`). The failure is because the module is already in
`self.loaded_files`, so it is skipped over and `_inner_load` returns
`False` even though the required `key` is already in `self._dict`. Because
adding `print` statements or other logic also introduces points in the code
where a thread switch can occur, the most robust solution to such a
problem is a mutex (as opposed to rechecking whether `key` now appears
in `self._dict` at certain checkpoints).
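
A toy reproduction of that interleaving (none of this is Salt code; the class only mirrors the unlocked shape of `_load`, `_inner_load`, and `refresh_file_mapping`, with a `time.sleep()` standing in for the GIL release during file I/O):

```python
import threading
import time


class ToyLoader(object):
    """Unlocked shape of LazyLoader._load -- illustration only, not Salt code."""

    def __init__(self):
        self._dict = {}
        self.loaded_files = set()
        self.file_mapping = {}                # empty until the first "refresh"

    def refresh_file_mapping(self):
        time.sleep(0.01)                      # stand-in for file I/O releasing the GIL
        self.file_mapping['test'] = '/srv/modules/test.py'

    def _load_module(self, name):
        self.loaded_files.add(name)
        self._dict[name + '.ping'] = lambda: True
        return True

    def _inner_load(self, mod_name, key):
        for name in self.file_mapping:
            if name in self.loaded_files:
                continue                      # another thread already loaded it
            if self._load_module(name) and key in self._dict:
                return True
        return False

    def _load(self, key):
        mod_name = key.split('.', 1)[0]
        ret, reloaded = None, False
        while True:                           # re-scan the filesystem up to once
            ret = self._inner_load(mod_name, key)
            if not reloaded and ret is not True:
                self.refresh_file_mapping()   # GIL released here
                reloaded = True
                continue
            break
        return ret                            # False means the caller raises KeyError


loader, results = ToyLoader(), []
workers = [threading.Thread(target=lambda: results.append(loader._load('test.ping')))
           for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(results)    # typically [True, False]: the losing thread's second
                  # _inner_load skips the already-loaded file and reports
                  # failure even though 'test.ping' is in loader._dict
```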

This solution adds such a mutex and uses it in key places to ensure
integrity.
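
In outline, the pattern looks like the sketch below (a condensed illustration of the approach rather than the full diff that follows; an `RLock` is used rather than a plain `Lock` because locked methods call other locked methods, e.g. `reload_modules()` calls `_load_all()`):

```python
# Condensed sketch of the locking pattern this commit applies -- not the
# complete LazyLoader class.
import threading


class LockedLazyLoader(object):
    def __init__(self):
        self._dict = {}
        self.missing_modules = {}
        self.loaded_files = set()
        # Reentrant lock: a thread that already holds it may call another
        # locked method on the same instance without deadlocking.
        self._lock = threading.RLock()

    def _load(self, key):
        mod_name, _ = key.split('.', 1)
        with self._lock:
            # Double-check under the lock: another thread may have loaded
            # the module while this one was waiting to acquire the lock.
            if mod_name in self.missing_modules or key in self._dict:
                return True
            # ... scan the file mapping, load the module, and retry once,
            # exactly as before, but now serialized with other mutations.
            return False

    def _load_all(self):
        with self._lock:
            pass    # load every mapped file, then mark the loader as loaded

    def reload_modules(self):
        with self._lock:
            self.loaded_files = set()
            self._load_all()    # re-acquiring the RLock here is safe
```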

Signed-off-by: Sergey Kizunov <[email protected]>
skizunov committed Mar 23, 2018
1 parent 4e7466a commit c624aa4
Showing 1 changed file with 60 additions and 52 deletions.
112 changes: 60 additions & 52 deletions salt/loader.py
@@ -14,6 +14,7 @@
import inspect
import tempfile
import functools
import threading
import types
from collections import MutableMapping
from zipimport import zipimporter
@@ -1100,7 +1101,8 @@ def __init__(self,
self.disabled = set(self.opts.get('disable_{0}{1}'.format(
self.tag, '' if self.tag[-1] == 's' else 's'), []))

self.refresh_file_mapping()
self._lock = threading.RLock()
self._refresh_file_mapping()

super(LazyLoader, self).__init__() # late init the lazy loader
# create all of the import namespaces
@@ -1167,7 +1169,7 @@ def missing_fun_string(self, function_name):
else:
return '\'{0}\' __virtual__ returned False'.format(mod_name)

def refresh_file_mapping(self):
def _refresh_file_mapping(self):
'''
refresh the mapping of the FS on disk
'''
@@ -1285,15 +1287,16 @@ def clear(self):
'''
Clear the dict
'''
super(LazyLoader, self).clear() # clear the lazy loader
self.loaded_files = set()
self.missing_modules = {}
self.loaded_modules = {}
# if we have been loaded before, lets clear the file mapping since
# we obviously want a re-do
if hasattr(self, 'opts'):
self.refresh_file_mapping()
self.initial_load = False
with self._lock:
super(LazyLoader, self).clear() # clear the lazy loader
self.loaded_files = set()
self.missing_modules = {}
self.loaded_modules = {}
# if we have been loaded before, lets clear the file mapping since
# we obviously want a re-do
if hasattr(self, 'opts'):
self._refresh_file_mapping()
self.initial_load = False

def __prep_mod_opts(self, opts):
'''
@@ -1504,14 +1507,14 @@ def _load_module(self, name):
virtual_funcs_to_process = ['__virtual__'] + self.virtual_funcs
for virtual_func in virtual_funcs_to_process:
virtual_ret, module_name, virtual_err, virtual_aliases = \
self.process_virtual(mod, module_name)
self._process_virtual(mod, module_name)
if virtual_err is not None:
log.trace(
'Error loading %s.%s: %s',
self.tag, module_name, virtual_err
)

# if process_virtual returned a non-True value then we are
# if _process_virtual returned a non-True value then we are
# supposed to not process this module
if virtual_ret is not True and module_name not in self.missing_modules:
# If a module has information about why it could not be loaded, record it
@@ -1601,56 +1604,61 @@ def _load(self, key):
if not isinstance(key, six.string_types) or '.' not in key:
raise KeyError
mod_name, _ = key.split('.', 1)
if mod_name in self.missing_modules:
return True
# if the modulename isn't in the whitelist, don't bother
if self.whitelist and mod_name not in self.whitelist:
raise KeyError

def _inner_load(mod_name):
for name in self._iter_files(mod_name):
if name in self.loaded_files:
continue
# if we got what we wanted, we are done
if self._load_module(name) and key in self._dict:
return True
return False
with self._lock:
# It is possible that the key is in the dictionary after
# acquiring the lock due to another thread loading it.
if mod_name in self.missing_modules or key in self._dict:
return True
# if the modulename isn't in the whitelist, don't bother
if self.whitelist and mod_name not in self.whitelist:
raise KeyError

def _inner_load(mod_name):
for name in self._iter_files(mod_name):
if name in self.loaded_files:
continue
# if we got what we wanted, we are done
if self._load_module(name) and key in self._dict:
return True
return False

# try to load the module
ret = None
reloaded = False
# re-scan up to once, IOErrors or a failed load cause re-scans of the
# filesystem
while True:
try:
ret = _inner_load(mod_name)
if not reloaded and ret is not True:
self.refresh_file_mapping()
reloaded = True
# try to load the module
ret = None
reloaded = False
# re-scan up to once, IOErrors or a failed load cause re-scans of the
# filesystem
while True:
try:
ret = _inner_load(mod_name)
if not reloaded and ret is not True:
self._refresh_file_mapping()
reloaded = True
continue
break
except IOError:
if not reloaded:
self._refresh_file_mapping()
reloaded = True
continue
break
except IOError:
if not reloaded:
self.refresh_file_mapping()
reloaded = True
continue

return ret

def _load_all(self):
'''
Load all of them
'''
for name in self.file_mapping:
if name in self.loaded_files or name in self.missing_modules:
continue
self._load_module(name)
with self._lock:
for name in self.file_mapping:
if name in self.loaded_files or name in self.missing_modules:
continue
self._load_module(name)

self.loaded = True
self.loaded = True

def reload_modules(self):
self.loaded_files = set()
self._load_all()
with self._lock:
self.loaded_files = set()
self._load_all()

def _apply_outputter(self, func, mod):
'''
@@ -1661,7 +1669,7 @@ def _apply_outputter(self, func, mod):
if func.__name__ in outp:
func.__outputter__ = outp[func.__name__]

def process_virtual(self, mod, module_name, virtual_func='__virtual__'):
def _process_virtual(self, mod, module_name, virtual_func='__virtual__'):
'''
Given a loaded module and its default name determine its virtual name
