On start, update git_pillar on second loop #56316

Merged: 2 commits, Sep 9, 2020
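
Summary: before this change, Maintenance.run() seeded last_git_pillar_update with the process start time, so a freshly started master would not refresh git_pillar until a full git_pillar_update_interval had elapsed. Seeding it with 0 instead makes the interval check succeed on the first pass through the maintenance loop (per the new in-code comment, "update git_pillar on first loop"). A minimal sketch of the timing logic, for illustration only — the loop body is not part of the hunks below, and the update callback name here is assumed:

    import time

    def maintenance_loop(opts, update_git_pillar, loop_interval=60):
        # Simplified model of Maintenance.run()'s git_pillar scheduling.
        # Old behavior: last_git_pillar_update = int(time.time()) at startup,
        # which delayed the first refresh by one full interval.
        # New behavior: start at 0 so the first iteration triggers a refresh.
        last_git_pillar_update = 0
        git_pillar_update_interval = opts.get("git_pillar_update_interval", 0)
        while True:
            now = int(time.time())
            if (now - last_git_pillar_update) >= git_pillar_update_interval:
                last_git_pillar_update = now
                update_git_pillar()  # assumed hook; the real method is not shown in this diff
            time.sleep(loop_interval)

The practical effect is that a restarted master serves fresh git_pillar data from its first maintenance pass instead of waiting up to one full update interval.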
salt/master.py: 99 changes (46 additions, 53 deletions)
@@ -1,11 +1,9 @@
# -*- coding: utf-8 -*-
"""
This module contains all of the routines needed to set up a master server, this
involves preparing the three listeners and the workers needed by the master.
"""

-# Import python libs
-from __future__ import absolute_import, print_function, unicode_literals, with_statement

import collections
import copy
@@ -103,7 +101,7 @@
log = logging.getLogger(__name__)


-class SMaster(object):
+class SMaster:
"""
Create a simple salt-master, this will generate the top-level master
"""
@@ -160,7 +158,7 @@ def __init__(self, opts, **kwargs):

:param dict opts: The salt options
"""
-super(Maintenance, self).__init__(**kwargs)
+super().__init__(**kwargs)
self.opts = opts
# How often do we perform the maintenance tasks
self.loop_interval = int(self.opts["loop_interval"])
@@ -244,7 +242,8 @@ def run(self):

# Make Start Times
last = int(time.time())
-last_git_pillar_update = last
+# update git_pillar on first loop
+last_git_pillar_update = 0

git_pillar_update_interval = self.opts.get("git_pillar_update_interval", 0)
old_present = set()
@@ -285,16 +284,10 @@ def handle_key_cache(self):
keys.append(fn_)
log.debug("Writing master key cache")
# Write a temporary file securely
-if six.PY2:
-with salt.utils.atomicfile.atomic_open(
-os.path.join(self.opts["pki_dir"], acc, ".key_cache")
-) as cache_file:
-self.serial.dump(keys, cache_file)
-else:
-with salt.utils.atomicfile.atomic_open(
-os.path.join(self.opts["pki_dir"], acc, ".key_cache"), mode="wb"
-) as cache_file:
-self.serial.dump(keys, cache_file)
+with salt.utils.atomicfile.atomic_open(
+os.path.join(self.opts["pki_dir"], acc, ".key_cache"), mode="wb"
+) as cache_file:
+self.serial.dump(keys, cache_file)

def handle_key_rotate(self, now):
"""
@@ -324,14 +317,14 @@ def handle_key_rotate(self, now):

if to_rotate:
log.info("Rotating master AES key")
-for secret_key, secret_map in six.iteritems(SMaster.secrets):
+for secret_key, secret_map in SMaster.secrets.items():
# should be unnecessary-- since no one else should be modifying
with secret_map["secret"].get_lock():
secret_map["secret"].value = salt.utils.stringutils.to_bytes(
secret_map["reload"]()
)
self.event.fire_event(
{"rotate_{0}_key".format(secret_key): True}, tag="key"
{"rotate_{}_key".format(secret_key): True}, tag="key"
)
self.rotate = now
if self.opts.get("ping_on_rotate"):
@@ -389,7 +382,7 @@ class FileserverUpdate(salt.utils.process.SignalHandlingProcess):
"""

def __init__(self, opts, **kwargs):
-super(FileserverUpdate, self).__init__(**kwargs)
+super().__init__(**kwargs)
self.opts = opts
self.update_threads = {}
# Avoid circular import
@@ -421,15 +414,15 @@ def fill_buckets(self):
update_intervals = self.fileserver.update_intervals()
self.buckets = {}
for backend in self.fileserver.backends():
fstr = "{0}.update".format(backend)
fstr = "{}.update".format(backend)
try:
update_func = self.fileserver.servers[fstr]
except KeyError:
log.debug("No update function for the %s filserver backend", backend)
continue
if backend in update_intervals:
# Variable intervals are supported for this backend
-for id_, interval in six.iteritems(update_intervals[backend]):
+for id_, interval in update_intervals[backend].items():
if not interval:
# Don't allow an interval of 0
interval = DEFAULT_INTERVAL
@@ -451,7 +444,7 @@ def fill_buckets(self):
# nothing to pass to the backend's update func, so we'll just
# set the value to None.
try:
interval_key = "{0}_update_interval".format(backend)
interval_key = "{}_update_interval".format(backend)
interval = self.opts[interval_key]
except KeyError:
interval = DEFAULT_INTERVAL
@@ -476,7 +469,7 @@ def _do_update():
"interval of %d",
interval,
)
-for backend, update_args in six.iteritems(backends):
+for backend, update_args in backends.items():
backend_name, update_func = backend
try:
if update_args:
@@ -622,7 +615,7 @@ def _pre_flight(self):
try:
os.chdir("/")
except OSError as err:
errors.append("Cannot change to root directory ({0})".format(err))
errors.append("Cannot change to root directory ({})".format(err))

if self.opts.get("fileserver_verify_config", True):
# Avoid circular import
@@ -632,15 +625,15 @@ def _pre_flight(self):
if not fileserver.servers:
errors.append(
"Failed to load fileserver backends, the configured backends "
"are: {0}".format(", ".join(self.opts["fileserver_backend"]))
"are: {}".format(", ".join(self.opts["fileserver_backend"]))
)
else:
# Run init() for all backends which support the function, to
# double-check configuration
try:
fileserver.init()
except salt.exceptions.FileserverConfigError as exc:
critical_errors.append("{0}".format(exc))
critical_errors.append("{}".format(exc))

if not self.opts["fileserver_backend"]:
errors.append("No fileserver backends are configured")
@@ -660,7 +653,7 @@ def _pre_flight(self):
git_pillars = [
x
for x in self.opts.get("ext_pillar", [])
if "git" in x and not isinstance(x["git"], six.string_types)
if "git" in x and not isinstance(x["git"], str)
]
except TypeError:
git_pillars = []
@@ -862,7 +855,7 @@ def __init__(self, hopts, **kwargs):

:param dict hopts: The halite options
"""
-super(Halite, self).__init__(**kwargs)
+super().__init__(**kwargs)
self.hopts = hopts

# __setstate__ and __getstate__ are only used on Windows.
@@ -907,7 +900,7 @@ def __init__(self, opts, key, mkey, secrets=None, **kwargs):
:rtype: ReqServer
:returns: Request server
"""
-super(ReqServer, self).__init__(**kwargs)
+super().__init__(**kwargs)
self.opts = opts
self.master_key = mkey
# Prepare the AES key
@@ -939,7 +932,7 @@ def __getstate__(self):

def _handle_signals(self, signum, sigframe): # pylint: disable=unused-argument
self.destroy(signum)
-super(ReqServer, self)._handle_signals(signum, sigframe)
+super()._handle_signals(signum, sigframe)

def __bind(self):
"""
@@ -1003,7 +996,7 @@ def __bind(self):
# signal handlers
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
for ind in range(int(self.opts["worker_threads"])):
name = "MWorker-{0}".format(ind)
name = "MWorker-{}".format(ind)
self.process_manager.add_process(
MWorker,
args=(self.opts, self.master_key, self.key, req_channels, name),
@@ -1050,7 +1043,7 @@ def __init__(self, opts, mkey, key, req_channels, name, **kwargs):
"""
kwargs["name"] = name
self.name = name
-super(MWorker, self).__init__(**kwargs)
+super().__init__(**kwargs)
self.opts = opts
self.req_channels = req_channels

@@ -1066,7 +1059,7 @@ def __init__(self, opts, mkey, key, req_channels, name, **kwargs):
# These methods are only used when pickling so will not be used on
# non-Windows platforms.
def __setstate__(self, state):
-super(MWorker, self).__init__(
+super().__init__(
log_queue=state["log_queue"], log_queue_level=state["log_queue_level"]
)
self.opts = state["opts"]
@@ -1091,7 +1084,7 @@ def __getstate__(self):
def _handle_signals(self, signum, sigframe):
for channel in getattr(self, "req_channels", ()):
channel.close()
-super(MWorker, self)._handle_signals(signum, sigframe)
+super()._handle_signals(signum, sigframe)

def __bind(self):
"""
@@ -1251,7 +1244,7 @@ def run(self):
self.__bind()


-class TransportMethods(object):
+class TransportMethods:
"""
Expose methods to the transport layer, methods with their names found in
the class attribute 'expose_methods' will be exposed to the transport layer
@@ -1369,7 +1362,7 @@ def __verify_minion(self, id_, token):

try:
pub = salt.crypt.get_rsa_pub_key(pub_path)
-except (IOError, OSError):
+except OSError:
log.warning(
"Salt minion claiming to be %s attempted to communicate with "
"master, but key could not be read and verification was denied.",
@@ -1715,7 +1708,7 @@ def _pillar(self, load):
self.fs_.update_opts()
if self.opts.get("minion_data_cache", False):
self.masterapi.cache.store(
"minions/{0}".format(load["id"]),
"minions/{}".format(load["id"]),
"data",
{"grains": load["grains"], "pillar": data},
)
@@ -1790,7 +1783,7 @@ def _return(self, load):
log.trace("Verifying signed event publish from minion")
sig = load.pop("sig")
this_minion_pubkey = os.path.join(
self.opts["pki_dir"], "minions/{0}".format(load["id"])
self.opts["pki_dir"], "minions/{}".format(load["id"])
)
serialized_load = salt.serializers.msgpack.serialize(load)
if not salt.crypt.verify_signature(
@@ -1833,7 +1826,7 @@ def _syndic_return(self, load):
continue
# if we have a load, save it
if load.get("load"):
fstr = "{0}.save_load".format(self.opts["master_job_cache"])
fstr = "{}.save_load".format(self.opts["master_job_cache"])
self.mminion.returners[fstr](load["jid"], load["load"])

# Register the syndic
@@ -1848,7 +1841,7 @@ def _syndic_return(self, load):
wfh.write("")

# Format individual return loads
for key, item in six.iteritems(load["return"]):
for key, item in load["return"].items():
ret = {"jid": load["jid"], "id": key}
ret.update(item)
if "master_id" in load:
@@ -1895,7 +1888,7 @@ def pub_ret(self, load):
auth_cache = os.path.join(self.opts["cachedir"], "publish_auth")
if not os.path.isdir(auth_cache):
os.makedirs(auth_cache)
jid_fn = os.path.join(auth_cache, six.text_type(load["jid"]))
jid_fn = os.path.join(auth_cache, str(load["jid"]))
with salt.utils.files.fopen(jid_fn, "r") as fp_:
if not load["id"] == fp_.read():
return {}
@@ -2111,8 +2104,8 @@ def runner(self, clear_load):
return {
"error": {
"name": err_name,
"message": 'Authentication failure of type "{0}" occurred for '
"user {1}.".format(auth_type, username),
"message": 'Authentication failure of type "{}" occurred for '
"user {}.".format(auth_type, username),
}
}
elif isinstance(runner_check, dict) and "error" in runner_check:
@@ -2143,7 +2136,7 @@ def runner(self, clear_load):
"error": {
"name": exc.__class__.__name__,
"args": exc.args,
"message": six.text_type(exc),
"message": str(exc),
}
}

@@ -2174,8 +2167,8 @@ def wheel(self, clear_load):
return {
"error": {
"name": err_name,
"message": 'Authentication failure of type "{0}" occurred for '
"user {1}.".format(auth_type, username),
"message": 'Authentication failure of type "{}" occurred for '
"user {}.".format(auth_type, username),
}
}
elif isinstance(wheel_check, dict) and "error" in wheel_check:
@@ -2199,7 +2192,7 @@ def wheel(self, clear_load):
fun = clear_load.pop("fun")
tag = tagify(jid, prefix="wheel")
data = {
"fun": "wheel.{0}".format(fun),
"fun": "wheel.{}".format(fun),
"jid": jid,
"tag": tag,
"user": username,
@@ -2213,7 +2206,7 @@ def wheel(self, clear_load):
return {"tag": tag, "data": data}
except Exception as exc: # pylint: disable=broad-except
log.error("Exception occurred while introspecting %s: %s", fun, exc)
data["return"] = "Exception occurred in wheel {0}: {1}: {2}".format(
data["return"] = "Exception occurred in wheel {}: {}: {}".format(
fun, exc.__class__.__name__, exc,
)
data["success"] = False
@@ -2285,7 +2278,7 @@ def publish(self, clear_load):

# Setup authorization list variable and error information
auth_list = auth_check.get("auth_list", [])
err_msg = 'Authentication failure of type "{0}" occurred.'.format(auth_type)
err_msg = 'Authentication failure of type "{}" occurred.'.format(auth_type)

if auth_check.get("error"):
# Authentication error occurred: do not continue.
@@ -2351,7 +2344,7 @@ def publish(self, clear_load):
"load": {
"jid": None,
"minions": minions,
"error": "Master could not resolve minions for target {0}".format(
"error": "Master could not resolve minions for target {}".format(
clear_load["tgt"]
),
},
@@ -2398,14 +2391,14 @@ def _prep_jid(self, clear_load, extra):
nocache = extra.get("nocache", False)

# Retrieve the jid
fstr = "{0}.prep_jid".format(self.opts["master_job_cache"])
fstr = "{}.prep_jid".format(self.opts["master_job_cache"])
try:
# Retrieve the jid
jid = self.mminion.returners[fstr](nocache=nocache, passed_jid=passed_jid)
except (KeyError, TypeError):
# The returner is not present
msg = (
"Failed to allocate a jid. The requested returner '{0}' "
"Failed to allocate a jid. The requested returner '{}' "
"could not be loaded.".format(fstr.split(".")[0])
)
log.error(msg)
@@ -2462,7 +2455,7 @@ def _prep_pub(self, minions, jid, clear_load, extra, missing):
self.event.fire_event(new_job_load, tagify([clear_load["jid"], "new"], "job"))

if self.opts["ext_job_cache"]:
fstr = "{0}.save_load".format(self.opts["ext_job_cache"])
fstr = "{}.save_load".format(self.opts["ext_job_cache"])
save_load_func = True

# Get the returner's save_load arg_spec.
@@ -2500,7 +2493,7 @@ def _prep_pub(self, minions, jid, clear_load, extra, missing):

# always write out to the master job caches
try:
fstr = "{0}.save_load".format(self.opts["master_job_cache"])
fstr = "{}.save_load".format(self.opts["master_job_cache"])
self.mminion.returners[fstr](clear_load["jid"], clear_load, minions)
except KeyError:
log.critical(