Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread safe loader 2 #48598

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions salt/cloud/clouds/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import binascii
import datetime
import base64
import msgpack
import re
import decimal

Expand All @@ -100,6 +99,7 @@
import salt.utils.files
import salt.utils.hashutils
import salt.utils.json
import salt.utils.msgpack
import salt.utils.stringutils
import salt.utils.yaml
from salt._compat import ElementTree as ET
Expand Down Expand Up @@ -4997,7 +4997,7 @@ def _parse_pricing(url, name):
__opts__['cachedir'], 'ec2-pricing-{0}.p'.format(name)
)
with salt.utils.files.fopen(outfile, 'w') as fho:
msgpack.dump(regions, fho)
salt.utils.msgpack.dump(regions, fho)

return True

Expand Down Expand Up @@ -5065,7 +5065,8 @@ def show_pricing(kwargs=None, call=None):
update_pricing({'type': name}, 'function')

with salt.utils.files.fopen(pricefile, 'r') as fhi:
ec2_price = salt.utils.stringutils.to_unicode(msgpack.load(fhi))
ec2_price = salt.utils.stringutils.to_unicode(
salt.utils.msgpack.load(fhi))

region = get_location(profile)
size = profile.get('size', None)
Expand Down
6 changes: 3 additions & 3 deletions salt/cloud/clouds/gce.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import re
import pprint
import logging
import msgpack
from ast import literal_eval
from salt.utils.versions import LooseVersion as _LooseVersion

Expand Down Expand Up @@ -91,6 +90,7 @@
import salt.utils.cloud
import salt.utils.files
import salt.utils.http
import salt.utils.msgpack
import salt.config as config
from salt.cloud.libcloudfuncs import * # pylint: disable=redefined-builtin,wildcard-import,unused-wildcard-import
from salt.exceptions import (
Expand Down Expand Up @@ -2642,7 +2642,7 @@ def update_pricing(kwargs=None, call=None):
__opts__['cachedir'], 'gce-pricing.p'
)
with salt.utils.files.fopen(outfile, 'w') as fho:
msgpack.dump(price_json['dict'], fho)
salt.utils.msgpack.dump(price_json['dict'], fho)

return True

Expand Down Expand Up @@ -2681,7 +2681,7 @@ def show_pricing(kwargs=None, call=None):
update_pricing()

with salt.utils.files.fopen(pricefile, 'r') as fho:
sizes = msgpack.load(fho)
sizes = salt.utils.msgpack.load(fho)

per_hour = float(sizes['gcp_price_list'][size][region])

Expand Down
6 changes: 3 additions & 3 deletions salt/engines/stalekey.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import salt.key
import salt.utils.files
import salt.utils.minions
import salt.utils.msgpack
import salt.wheel

# Import 3rd-party libs
from salt.ext import six
import msgpack

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -60,7 +60,7 @@ def start(interval=3600, expire=604800):
if os.path.exists(presence_file):
try:
with salt.utils.files.fopen(presence_file, 'r') as f:
minions = msgpack.load(f)
minions = salt.utils.msgpack.load(f)
except IOError as e:
log.error('Could not open presence file %s: %s', presence_file, e)
time.sleep(interval)
Expand Down Expand Up @@ -95,7 +95,7 @@ def start(interval=3600, expire=604800):

try:
with salt.utils.files.fopen(presence_file, 'w') as f:
msgpack.dump(minions, f)
salt.utils.msgpack.dump(minions, f)
except IOError as e:
log.error('Could not write to presence file %s: %s', presence_file, e)
time.sleep(interval)
7 changes: 7 additions & 0 deletions salt/key.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
from salt.ext.six.moves import input, zip_longest
# pylint: enable=import-error,no-name-in-module,redefined-builtin

# We do not always need msgpack, so we do not want to fail here if msgpack is
# not available.
try:
import salt.utils.msgpack
except ImportError:
pass

log = logging.getLogger(__name__)


Expand Down
74 changes: 73 additions & 1 deletion salt/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging
import inspect
import tempfile
import threading
import functools
import threading
import traceback
Expand All @@ -33,6 +34,7 @@
import salt.utils.lazy
import salt.utils.odict
import salt.utils.platform
import salt.utils.thread_local_proxy
import salt.utils.versions
import salt.utils.stringutils
from salt.exceptions import LoaderError
Expand Down Expand Up @@ -1064,6 +1066,76 @@ def _mod_type(module_path):
return 'ext'


def _inject_into_mod(mod, name, value, force_lock=False):
'''
Inject a variable into a module. This is used to inject "globals" like
``__salt__``, ``__pillar``, or ``grains``.

Instead of injecting the value directly, a ``ThreadLocalProxy`` is created.
If such a proxy is already present under the specified name, it is updated
with the new value. This update only affects the current thread, so that
the same name can refer to different values depending on the thread of
execution.

This is important for data that is not truly global. For example, pillar
data might be dynamically overriden through function parameters and thus
the actual values available in pillar might depend on the thread that is
calling a module.

mod:
module object into which the value is going to be injected.

name:
name of the variable that is injected into the module.

value:
value that is injected into the variable. The value is not injected
directly, but instead set as the new reference of the proxy that has
been created for the variable.

force_lock:
whether the lock should be acquired before checking whether a proxy
object for the specified name has already been injected into the
module. If ``False`` (the default), this function checks for the
module's variable without acquiring the lock and only acquires the lock
if a new proxy has to be created and injected.
'''
from salt.utils.thread_local_proxy import ThreadLocalProxy
old_value = getattr(mod, name, None)
# We use a double-checked locking scheme in order to avoid taking the lock
# when a proxy object has already been injected.
# In most programming languages, double-checked locking is considered
# unsafe when used without explicit memory barriers because one might read
# an uninitialized value. In CPython it is safe due to the global
# interpreter lock (GIL). In Python implementations that do not have the
# GIL, it could be unsafe, but at least Jython also guarantees that (for
# Python objects) memory is not corrupted when writing and reading without
# explicit synchronization
# (http://www.jython.org/jythonbook/en/1.0/Concurrency.html).
# Please note that in order to make this code safe in a runtime environment
# that does not make this guarantees, it is not sufficient. The
# ThreadLocalProxy must also be created with fallback_to_shared set to
# False or a lock must be added to the ThreadLocalProxy.
if force_lock:
with _inject_into_mod.lock:
if isinstance(old_value, ThreadLocalProxy):
ThreadLocalProxy.set_reference(old_value, value)
else:
setattr(mod, name, ThreadLocalProxy(value, True))
else:
if isinstance(old_value, ThreadLocalProxy):
ThreadLocalProxy.set_reference(old_value, value)
else:
_inject_into_mod(mod, name, value, True)


# Lock used when injecting globals. This is needed to avoid a race condition
# when two threads try to load the same module concurrently. This must be
# outside the loader because there might be more than one loader for the same
# namespace.
_inject_into_mod.lock = threading.RLock()


# TODO: move somewhere else?
class FilterDictWrapper(MutableMapping):
'''
Expand Down Expand Up @@ -1617,7 +1689,7 @@ def _load_module(self, name):

# pack whatever other globals we were asked to
for p_name, p_value in six.iteritems(self.pack):
setattr(mod, p_name, p_value)
_inject_into_mod(mod, p_name, p_value)

module_name = mod.__name__.rsplit('.', 1)[-1]

Expand Down
5 changes: 4 additions & 1 deletion salt/log/handlers/fluent_mod.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,17 @@
try:
# Attempt to import msgpack
import msgpack
import salt.utils.msgpack
# There is a serialization issue on ARM and potentially other platforms
# for some msgpack bindings, check for it
if msgpack.loads(msgpack.dumps([1, 2, 3]), use_list=True) is None:
raise ImportError
import salt.utils.msgpack
except ImportError:
# Fall back to msgpack_pure
try:
import msgpack_pure as msgpack
import salt.utils.msgpack
except ImportError:
# TODO: Come up with a sane way to get a configured logfile
# and write to the logfile when this error is hit also
Expand Down Expand Up @@ -455,7 +458,7 @@ def _make_packet(self, label, timestamp, data):
packet = (tag, timestamp, data)
if self.verbose:
print(packet)
return msgpack.packb(packet)
return salt.utils.msgpack.packb(packet, _msgpack_module=msgpack)

def _send(self, bytes_):
self.lock.acquire()
Expand Down
2 changes: 1 addition & 1 deletion salt/modules/saltcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@
import logging
import os
import time
from json import loads, dumps
from salt.utils.json import loads, dumps

# Import Salt libs
import salt.utils.files
Expand Down
10 changes: 5 additions & 5 deletions salt/modules/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import salt.utils.hashutils
import salt.utils.jid
import salt.utils.json
import salt.utils.msgpack
import salt.utils.platform
import salt.utils.state
import salt.utils.stringutils
Expand All @@ -45,7 +46,6 @@

# Import 3rd-party libs
from salt.ext import six
import msgpack

__proxyenabled__ = ['*']

Expand Down Expand Up @@ -185,7 +185,7 @@ def _get_pause(jid, state_id=None):
data[state_id] = {}
if os.path.exists(pause_path):
with salt.utils.files.fopen(pause_path, 'rb') as fp_:
data = msgpack.loads(fp_.read())
data = salt.utils.msgpack.loads(fp_.read())
return data, pause_path


Expand Down Expand Up @@ -256,7 +256,7 @@ def soft_kill(jid, state_id=None):
data, pause_path = _get_pause(jid, state_id)
data[state_id]['kill'] = True
with salt.utils.files.fopen(pause_path, 'wb') as fp_:
fp_.write(msgpack.dumps(data))
fp_.write(salt.utils.msgpack.dumps(data))


def pause(jid, state_id=None, duration=None):
Expand Down Expand Up @@ -291,7 +291,7 @@ def pause(jid, state_id=None, duration=None):
if duration:
data[state_id]['duration'] = int(duration)
with salt.utils.files.fopen(pause_path, 'wb') as fp_:
fp_.write(msgpack.dumps(data))
fp_.write(salt.utils.msgpack.dumps(data))


def resume(jid, state_id=None):
Expand Down Expand Up @@ -325,7 +325,7 @@ def resume(jid, state_id=None):
if state_id == '__all__':
data = {}
with salt.utils.files.fopen(pause_path, 'wb') as fp_:
fp_.write(msgpack.dumps(data))
fp_.write(salt.utils.msgpack.dumps(data))


def orchestrate(mods,
Expand Down
5 changes: 1 addition & 4 deletions salt/modules/win_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@
GLOBAL_ONLY
)
from salt.ext import six
try:
import msgpack
except ImportError:
import msgpack_pure as msgpack # pylint: disable=import-error
import salt.utils.gitfs
import salt.utils.msgpack
# pylint: enable=unused-import

log = logging.getLogger(__name__)
Expand Down
36 changes: 27 additions & 9 deletions salt/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
#sys.exit(salt.defaults.exitcodes.EX_GENERIC)


if HAS_MSGPACK:
import salt.utils.msgpack


if HAS_MSGPACK and not hasattr(msgpack, 'exceptions'):
class PackValueError(Exception):
'''
Expand All @@ -79,14 +83,15 @@ def package(payload):
This method for now just wraps msgpack.dumps, but it is here so that
we can make the serialization a custom option in the future with ease.
'''
return msgpack.dumps(payload)
return salt.utils.msgpack.dumps(payload, _msgpack_module=msgpack)


def unpackage(package_):
'''
Unpackages a payload
'''
return msgpack.loads(package_, use_list=True)
return salt.utils.msgpack.loads(package_, use_list=True,
_msgpack_module=msgpack)


def format_payload(enc, **kwargs):
Expand Down Expand Up @@ -146,12 +151,19 @@ def ext_type_decoder(code, data):
# that under Python 2 we can still work with older versions
# of msgpack.
try:
ret = msgpack.loads(msg, use_list=True, ext_hook=ext_type_decoder, encoding=encoding)
ret = salt.utils.msgpack.loads(msg, use_list=True,
ext_hook=ext_type_decoder,
encoding=encoding,
_msgpack_module=msgpack)
except UnicodeDecodeError:
# msg contains binary data
ret = msgpack.loads(msg, use_list=True, ext_hook=ext_type_decoder)
ret = salt.utils.msgpack.loads(msg, use_list=True,
ext_hook=ext_type_decoder,
_msgpack_module=msgpack)
else:
ret = msgpack.loads(msg, use_list=True, ext_hook=ext_type_decoder)
ret = salt.utils.msgpack.loads(msg, use_list=True,
ext_hook=ext_type_decoder,
_msgpack_module=msgpack)
if six.PY3 and encoding is None and not raw:
ret = salt.transport.frame.decode_embedded_strs(ret)
except Exception as exc:
Expand Down Expand Up @@ -218,9 +230,12 @@ def ext_type_encoder(obj):
# Due to this, if we don't need it, don't pass it at all so
# that under Python 2 we can still work with older versions
# of msgpack.
return msgpack.dumps(msg, default=ext_type_encoder, use_bin_type=use_bin_type)
return salt.utils.msgpack.dumps(msg, default=ext_type_encoder,
use_bin_type=use_bin_type,
_msgpack_module=msgpack)
else:
return msgpack.dumps(msg, default=ext_type_encoder)
return salt.utils.msgpack.dumps(msg, default=ext_type_encoder,
_msgpack_module=msgpack)
except (OverflowError, msgpack.exceptions.PackValueError):
# msgpack<=0.4.6 don't call ext encoder on very long integers raising the error instead.
# Convert any very long longs to strings and call dumps again.
Expand All @@ -243,9 +258,12 @@ def verylong_encoder(obj):

msg = verylong_encoder(msg)
if msgpack.version >= (0, 4, 0):
return msgpack.dumps(msg, default=ext_type_encoder, use_bin_type=use_bin_type)
return salt.utils.msgpack.dumps(msg, default=ext_type_encoder,
use_bin_type=use_bin_type,
_msgpack_module=msgpack)
else:
return msgpack.dumps(msg, default=ext_type_encoder)
return salt.utils.msgpack.dumps(msg, default=ext_type_encoder,
_msgpack_module=msgpack)

def dump(self, msg, fn_):
'''
Expand Down
Loading