diff --git a/salt/returners/etcd_return.py b/salt/returners/etcd_return.py index 9afcd335dcd2..ad4d68465cbc 100644 --- a/salt/returners/etcd_return.py +++ b/salt/returners/etcd_return.py @@ -38,7 +38,7 @@ CLI Example: - salt '*' test.ping --return etcd + salt "*" test.ping --return etcd A username and password can be set: @@ -47,13 +47,7 @@ etcd.username: larry # Optional; requires etcd.password to be set etcd.password: 123pass # Optional; requires etcd.username to be set -You can also set a TTL (time to live) value for the returner: - -.. code-block:: yaml - - etcd.ttl: 5 - -Authentication with username and password, and ttl, currently requires the +Authentication with username and password, currently requires the ``master`` branch of ``python-etcd``. You may also specify different roles for read and write operations. First, @@ -63,26 +57,99 @@ etcd.returner_read_profile: my_etcd_read etcd.returner_write_profile: my_etcd_write + +Etcd Returner Schema +-------------------- +The etcd returner has the following schema underneath the path set in the profile: + +job ++++ +The job key contains the jid of each job that has been returned. Underneath this +job are two special keys. One of them is ".load.p" which contains information +about the job when it was created. The other key is ".lock.p" which is responsible +for whether the job is still valid or it is scheduled to be cleaned up. + +The contents if ".lock.p" contains the modifiedIndex of the of the ".load.p" +key and when configured via the "etcd.ttl" or "keep_jobs" will have the ttl +applied to it. When this file is expired via the ttl or explicitly removed by +the administrator, the job will then be scheduled for removal. + +A third key was introduced called ".endtime". This is written alongside both the +load and the lock when the "update_endtime" function is used to store the time +when a job has completed. Similar to the local_cache returner, this job-completion +time can be fetched using the "get_endtime" function. + +event ++++++ +This key is essentially a namespace for all of the events (packages) that are +submitted to the returner. When an event is received, the package for the event +is written under this key using the "tag" parameter for its path. The +modifiedIndex for this key is then cached as the event id although the event id +can actually be arbitrary since the "index" key contains the real modifiedIndex +of the package key. + +minion.job +++++++++++ +Underneath the minion.job key is a list of minions ids. Each minion id contains +the jid of the last job that was returned by the minion. This key is used to +support the external job cache feature of Salt. + +event.cache ++++++++++++ +Underneath this key is a list of all of the events that were received by the +returner. As mentioned before, each event is identified by the modifiedIndex +of the key containing the event package. Underneath each event, there are +three sub-keys. These are the "index" key, the "tag" key, and the "lock" key. + +The "index" key contains the modifiedIndex of the package that was stored under +the event key. This is used to determine the original creator for the event"s +package and is used to keep track of whether the package for the event has +been modified by another event (since event tags can be overwritten preserving +the semantics of the original etcd returner). + +The "lock" key is responsible for informing the maintenance service that the +event is still in use. If the returner is configured via the "etcd.ttl" or +the "keep_jobs" option, this key will have the ttl applied to it. When +the "lock" key has expired or is explicitly removed by the administrator, the +event and its tag will be scheduled for removal. The createdIndex for the +package path is written to this key in case an application wishes to identify +the package path by an index. + +The other key under an event, is the "tag" key. The "tag" key simply contains +the path to the package that was registered as the tag attribute for the event. +The value of the "index" key corresponds to the modifiedIndex of this particular +path. """ from __future__ import absolute_import, print_function, unicode_literals # Import python libs import logging +import time # Import salt libs import salt.utils.jid import salt.utils.json +from salt.ext.six.moves import range try: import salt.utils.etcd_util + from salt.utils.etcd_util import etcd HAS_LIBS = True + except ImportError: HAS_LIBS = False log = logging.getLogger(__name__) -# Define the module's virtual name +Schema = { + "minion-fun": "minion.job", + "package-path": "event", + "event-cache": "event.cache", + "job-cache": "job", +} + +# Define the module"s virtual name __virtualname__ = "etcd" @@ -98,144 +165,1410 @@ def __virtual__(): def _get_conn(opts, profile=None): """ - Establish a connection to etcd + Establish a connection to an etcd profile. """ if profile is None: profile = opts.get("etcd.returner") + + # Grab the returner_root from the options path = opts.get("etcd.returner_root", "/salt/return") - return salt.utils.etcd_util.get_conn(opts, profile), path + + # Calculate the time-to-live for a job while giving etcd.ttl priority. + # The etcd.ttl option specifies the number of seconds, whereas the keep_jobs + # option specifies the number of hours. If any of these values are zero, + # then jobs are forever persistent. + + ttl = opts.get("etcd.ttl", int(opts.get("keep_jobs", 0)) * 60 * 60) + + # Grab a connection using etcd_util, and then return the EtcdClient + # from one of its attributes + wrapper = salt.utils.etcd_util.get_conn(opts, profile) + return wrapper.client, path, ttl def returner(ret): """ - Return data to an etcd server or cluster + Return data to an etcd profile. """ write_profile = __opts__.get("etcd.returner_write_profile") - if write_profile: - ttl = __opts__.get(write_profile, {}).get("etcd.ttl") - else: - ttl = __opts__.get("etcd.ttl") + client, path, ttl = _get_conn(__opts__, write_profile) - client, path = _get_conn(__opts__, write_profile) - # Make a note of this minion for the external job cache - client.set( - "/".join((path, "minions", ret["id"])), ret["jid"], ttl=ttl, + # If a minion is returning a standalone job, update the jid for the load + # when it"s saved since this job came directly from a minion. + if ret["jid"] == "req": + new_jid = prep_jid(nocache=ret.get("nocache", False)) + log.debug( + "sdstack_etcd returner satisfying a new job id request ({jid:s}) with id {new:s} for {data}".format( + jid=ret["jid"], new=new_jid, data=ret + ) + ) + ret["jid"] = new_jid + save_load(new_jid, ret) + + # Update the given minion in the external job cache with the current (latest job) + # This is used by get_fun() to return the last function that was called + minionp = "/".join([path, Schema["minion-fun"], ret["id"]]) + + # We can use the ttl here because our minionp is actually linked to the job + # which will expire according to the ttl anyways.. + log.debug( + "sdstack_etcd returner updating (last) job id of {minion:s} at {path:s} with job {jid:s}".format( + jid=ret["jid"], minion=ret["id"], path=minionp + ) + ) + res = client.write(minionp, ret["jid"], ttl=ttl if ttl > 0 else None) + if not res.newKey: + log.debug( + "sdstack_etcd returner the previous job id {old:s} for {minion:s} at {path:s} was set to {new:s}".format( + old=res._prev_node.value, minion=ret["id"], path=minionp, new=res.value + ) + ) + + # Figure out the path for the specified job and minion + jobp = "/".join([path, Schema["job-cache"], ret["jid"], ret["id"]]) + log.debug( + "sdstack_etcd returner writing job data for {jid:s} to {path:s} with {data}".format( + jid=ret["jid"], path=jobp, data=ret + ) ) + # Iterate through all the fields in the return dict and dump them under the + # jobs/$jid/id/$field key. We aggregate all the exceptions so that if an + # error happens, the rest of the fields will still be written. + exceptions = [] for field in ret: - # Not using os.path.join because we're not dealing with file paths - dest = "/".join((path, "jobs", ret["jid"], ret["id"], field)) - client.set(dest, salt.utils.json.dumps(ret[field]), ttl=ttl) + fieldp = "/".join([jobp, field]) + data = salt.utils.json.dumps(ret[field]) + try: + res = client.write(fieldp, data) + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to set field {field:s} for job {jid:s} at {path:s} to {result} due to exception ({exception})".format( + field=field, + jid=ret["jid"], + path=fieldp, + result=ret[field], + exception=E, + ) + ) + exceptions.append((E, field, ret[field])) + continue + log.trace( + "sdstack_etcd returner set field {field:s} for job {jid:s} at {path:s} to {result}".format( + field=field, jid=ret["jid"], path=res.key, result=ret[field] + ) + ) + + # Go back through all the exceptions that occurred while trying to write the + # fields and log them. + for E, field, value in exceptions: + log.exception( + "sdstack_etcd returner exception ({exception}) was raised while trying to set the field {field:s} for job {jid:s} to {value}".format( + exception=E, field=field, jid=ret["jid"], value=value + ) + ) + return def save_load(jid, load, minions=None): """ - Save the load to the specified jid + Save the load to the specified jid. """ - log.debug("sdstack_etcd returner called jid: {0}".format(jid)) write_profile = __opts__.get("etcd.returner_write_profile") - client, path = _get_conn(__opts__, write_profile) - if write_profile: - ttl = __opts__.get(write_profile, {}).get("etcd.ttl") - else: - ttl = __opts__.get("etcd.ttl") - client.set( - "/".join((path, "jobs", jid, ".load.p")), salt.utils.json.dumps(load), ttl=ttl, + client, path, ttl = _get_conn(__opts__, write_profile) + + # Check if the specified jid is "req", as only incorrect code will do this + if jid == "req": + log.debug( + "sdstack_etcd returner was called using a request job id ({jid:s}) with {data}".format( + jid=jid, data=load + ) + ) + + # Build the paths that we"ll use for registration of our job + loadp = "/".join([path, Schema["job-cache"], jid, ".load.p"]) + lockp = "/".join([path, Schema["job-cache"], jid, ".lock.p"]) + + ## Now we can just store the current load + json = salt.utils.json.dumps(load) + + log.debug( + "sdstack_etcd returner storing load data for job {jid:s} to {path:s} with {data}".format( + jid=jid, path=loadp, data=load + ) + ) + try: + res = client.write(loadp, json, prevExist=False) + + # If the key already exists, then warn the user and update the key. There + # isn"t anything we can really do about this because it"s up to Salt really. + except etcd.EtcdAlreadyExist as E: + node = client.read(loadp) + node.value = json + + log.debug( + "sdstack_etcd returner updating load data for job {jid:s} at {path:s} with {data}".format( + jid=jid, path=loadp, data=load + ) + ) + res = client.update(node) + + # If we failed here, it"s okay because the lock won"t get written so this + # essentially means the job will get scheduled for deletion. + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to store load for job {jid:s} to the path {path:s} due to exception ({exception}) being raised".format( + jid=jid, path=loadp, exception=E + ) + ) + return + + # Check if the previous node value and the current node value are different + # so we can let the user know that something changed and that some data + # might"ve been lost. + try: + if not res.newKey: + d1, d2 = ( + salt.utils.json.loads(res.value), + salt.utils.json.loads(res._prev_node.value), + ) + j1, j2 = ( + salt.utils.json.dumps(res.value, sort_keys=True), + salt.utils.json.dumps(res._prev_node.value, sort_keys=True), + ) + if j1 != j2: + log.warning( + "sdstack_etcd returner overwrote the load data for job {jid:s} at {path:s} with {data}. Old data was {old}".format( + jid=jid, path=res.key, data=d1, old=d2 + ) + ) + except Exception as E: # pylint: disable=broad-except + log.debug( + "sdstack_etcd returner unable to compare load data for job {jid:s} at {path:s} due to exception ({exception}) being raised".format( + jid=jid, path=loadp, exception=E + ) + ) + if not res.newKey: + log.trace( + "sdstack_etcd returner -- old load data for job {jid:s}: {data}".format( + jid=jid, data=res._prev_node.value + ) + ) + log.trace( + "sdstack_etcd returner -- new load data for job {jid:s}: {data}".format( + jid=jid, data=res.value + ) + ) + + # Since this is when a job is being created, create a lock that we can + # check to see if the job has expired. This allows a user to signal to + # salt that it"s okay to remove the entire key by removing this lock. + log.trace( + "sdstack_etcd returner writing lock file for job {jid:s} at {path:s} using index {index:d}".format( + jid=jid, path=lockp, index=res.modifiedIndex + ) ) + try: + res = client.write(lockp, res.modifiedIndex, ttl=ttl if ttl > 0 else None) + if res.ttl is not None: + log.trace( + "sdstack_etcd returner job {jid:s} at {path:s} will expire in {ttl:d} seconds".format( + jid=jid, path=res.key, ttl=res.ttl + ) + ) + + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to write lock for job {jid:s} to the path {path:s} due to exception ({exception}) being raised".format( + jid=jid, path=lockp, exception=E + ) + ) + + return + def save_minions(jid, minions, syndic_id=None): # pylint: disable=unused-argument """ - Included for API consistency + Save/update the minion list for a given jid. The syndic_id argument is + included for API compatibility only. """ + write_profile = __opts__.get("etcd.returner_write_profile") + client, path, _ = _get_conn(__opts__, write_profile) + + # Check if the specified jid is "req", as only incorrect code will do that + if jid == "req": + log.debug( + "sdstack_etcd returner was called with a request job id ({jid:s}) for minions {minions:s}".format( + jid=jid, minions=repr(minions) + ) + ) + + # Figure out the path that our job should be at + jobp = "/".join([path, Schema["job-cache"], jid]) + loadp = "/".join([jobp, ".load.p"]) + + # Now we can try and read the load out of it. + try: + load = client.read(loadp) + + # If it doesn"t exist, then bitch and complain because somebody lied to us + except etcd.EtcdKeyNotFound as E: + log.error( + "sdstack_etcd returner was called with an invalid job id ({jid:s}) for minions {minions:s}".format( + jid=jid, minions=repr(minions) + ) + ) + return + + log.debug( + "sdstack_etcd returner adding minions {syndics:s} for job {jid:s} to {path:s}".format( + jid=jid, + path=jobp, + syndics="" if syndic_id is None else " from syndic {0}".format(syndic_id), + ) + ) + + # Iterate through all of the minions we received and update both the job + # and minion-fun cache with what we know. Most of the other returners + # don"t do this, but it is definitely being called and is necessary for + # syndics to actually work. + exceptions = [] + for minion in minions: + minionp = "/".join([path, Schema["minion-fun"], minion]) + + # Go ahead and write the job to the minion-fun cache and log if we"ve + # overwritten an already existing job id. + log.debug( + "sdstack_etcd returner writing (last) job id of {minion:s}{syndics:s} at {path:s} with job {jid:s}".format( + jid=jid, + path=minionp, + minion=minion, + syndics="" + if syndic_id is None + else " from syndic {0}".format(syndic_id), + ) + ) + try: + client.write(minionp, jid, ttl=load.ttl, prevExist=False) + + # If the minion already exists, then that"s fine as we"ll just update it + # and move on. + except etcd.EtcdAlreadyExist as E: + node = client.read(minionp) + + log.debug( + "sdstack_etcd returner updating previous job id ({old:s}) of {minion:s}{syndics:s} at {path:s} with job {jid:s}".format( + old=node.value, + minion=minion, + jid=jid, + path=node.key, + syndics="" + if syndic_id is None + else " from syndic {0}".format(syndic_id), + ) + ) + + node.value = jid + client.update(node) + + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to write job id {jid:s} for minion {minion:s} to {path:s} due to exception ({exception})".format( + jid=jid, minion=minion, path=minionp, exception=E + ) + ) + exceptions.append((E, "job", minion)) + + # Now we can try and update the job. We don"t have much, just the jid, + # the minion, and the master id (syndic_id) + resultp = "/".join([jobp, minion]) + + # One... (jid) + try: + res = client.write("/".join([resultp, "jid"]), jid) + + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to write job id {jid:s} to the result for the minion {minion:s} at {path:s} due to exception ({exception})".format( + jid=jid, minion=minion, path="/".join([resultp, "jid"]), exception=E + ) + ) + exceptions.append((E, "result.jid", minion)) + + # Two... (id) + try: + res = client.write("/".join([resultp, "id"]), minion) + + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to write minion id {minion:s} to the result for job {jid:s} at {path:s} due to exception ({exception})".format( + jid=jid, minion=minion, path="/".join([resultp, "id"]), exception=E + ) + ) + exceptions.append((E, "result.id", minion)) + + # Three... (master_id) + try: + if syndic_id is not None: + res = client.write("/".join([resultp, "master_id"]), syndic_id) + + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to write master_id {syndic:s} to the result for job {jid:s} at {path:s} due to exception ({exception})".format( + jid=jid, + path="/".join([resultp, "master_id"]), + syndic=syndic_id, + exception=E, + ) + ) + exceptions.append((E, "result.master_id", minion)) + + # Crruuunch. + + # Go back through all the exceptions that occurred while trying to write the + # fields and log them. + for E, field, minion in exceptions: + if field == "job": + log.exception( + "sdstack_etcd returner exception ({exception}) was raised while trying to update the function cache for minion {minion:s} to job {jid:s}".format( + exception=E, minion=minion, jid=jid + ) + ) + continue + log.exception( + "sdstack_etcd returner exception ({exception}) was raised while trying to update the {field:s} field in the result for job {jid:s} belonging to minion {minion:s}".format( + exception=E, field=field, minion=minion, jid=jid + ) + ) + return + + +def _purge_jobs(): + write_profile = __opts__.get("etcd.returner_write_profile") + client, path, _ = _get_conn(__opts__, write_profile) + + # Figure out the path that our jobs should exist at + jobp = "/".join([path, Schema["job-cache"]]) + + # Try and read the job directory. If we have a missing key exception then no + # minions have returned anything yet and so we can simply leave. + log.trace( + "sdstack_etcd returner <_purge_jobs> reading jobs at {path:s}".format(path=jobp) + ) + try: + jobs = client.read(jobp) + except etcd.EtcdKeyNotFound as E: + log.debug( + "sdstack_etcd returner <_purge_jobs> no jobs were found at {path:s}".format( + path=jobp + ) + ) + return 0 + + # Iterate through all of the children at our job path while looking for + # the .lock.p key. If one isn"t found, then we can remove this job because + # it has expired. + count = 0 + for job in jobs.leaves: + if not job.dir: + log.warning( + "sdstack_etcd returner <_purge_jobs> found a non-job at {path:s} {expire:s}".format( + path=job.key, + expire="that will need to be manually removed" + if job.ttl is None + else "that will expire in {ttl:d} seconds".format(ttl=job.ttl), + ) + ) + continue + jid = job.key.split("/")[-1] + + # Build our lock path that we"ll use to see if the job is alive + lockp = "/".join([job.key, ".lock.p"]) + + # Ping it to see if it"s alive + log.trace( + "sdstack_etcd returner <_purge_jobs> checking lock for job {jid:s} at {path:s}".format( + jid=jid, path=lockp + ) + ) + try: + res = client.read(lockp) + + log.debug( + "sdstack_etcd returner <_purge_jobs> job {jid:s} at {path:s} is still alive and {expire:s}".format( + jid=jid, + path=res.key, + expire="will need to be manually removed" + if res.ttl is None + else "will expire in {ttl:d} seconds".format(ttl=res.ttl), + ) + ) + + # It"s not, so the job is dead and we can remove it + except etcd.EtcdKeyNotFound as E: + log.debug( + "sdstack_etcd returner <_purge_jobs> job {jid:s} at {path:s} has expired".format( + jid=jid, path=lockp + ) + ) + + res = client.delete(job.key, recursive=True) + count += 1 + continue + log.trace( + "sdstack_etcd returner <_purge_jobs> purged {count:d} jobs".format(count=count) + ) + return count + + +def _purge_events(): + write_profile = __opts__.get("etcd.returner_write_profile") + client, path, _ = _get_conn(__opts__, write_profile) + + # Figure out the path that our event cache should exist at + cachep = "/".join([path, Schema["event-cache"]]) + + # Try and read the event cache directory. If we have a missing key exception then no + # events have been cached and so we can simply leave. + log.trace( + "sdstack_etcd returner <_purge_events> reading event cache at {path:s}".format( + path=cachep + ) + ) + try: + cache = client.read(cachep) + except etcd.EtcdKeyNotFound as E: + return 0 + + # Iterate through all of the children at our cache path while looking for + # the id key. If one isn"t found, then we can remove this event because + # it has expired. + count = 0 + for ev in cache.leaves: + if not ev.dir: + log.warning( + "sdstack_etcd returner <_purge_events> found a non-event at {path:s} {expire:s}".format( + path=ev.key, + expire="that will need to be manually removed" + if ev.ttl is None + else "that will expire in {ttl:d} seconds".format(ttl=ev.ttl), + ) + ) + continue + + # Figure out the event id from the key path + try: + event = int(ev.key.split("/")[-1]) + except ValueError: + log.warning( + "sdstack_etcd returner <_purge_events> found an incorrectly structured event at {path:s} {expire:s}".format( + path=ev.key, + expire="that will need to be manually removed" + if ev.ttl is None + else "that will expire in {ttl:d} seconds".format(ttl=ev.ttl), + ) + ) + continue + + # Build all of the event paths that we"re going to use + ev_lockp = "/".join([ev.key, "lock"]) + ev_indexp = "/".join([ev.key, "index"]) + ev_tagp = "/".join([ev.key, "tag"]) + + # Ping the event lock to see if it"s actually alive + try: + ev_lock = client.read(ev_lockp) + + log.debug( + "sdstack_etcd returner <_purge_events> event {event:d} at {path:s} is still alive and {expire:s}".format( + event=event, + path=ev.key, + expire="will need to be manually removed" + if ev_lock.ttl is None + else "will expire in {ttl:d} seconds".format(ttl=ev_lock.ttl), + ) + ) + continue + + except etcd.EtcdKeyNotFound as E: + log.debug( + "sdstack_etcd returner <_purge_events> event {event:d} at {path:s} has expired and will be removed".format( + event=event, path=ev.key + ) + ) + + # Now that we know the event is dead, we can read the index so that + # we can check it against the actual event later. + log.trace( + "sdstack_etcd returner <_purge_events> reading modifiedIndex for event {event:d} at {path:s}".format( + event=event, path=ev_indexp + ) + ) + try: + ev_index = client.read(ev_indexp) + + # If we can"t find the index here, then we just remove the event because + # we have no way to detect whether the event tag actually belongs to us. + # So in this case, we"re done. + except etcd.EtcdKeyNotFound as E: + log.warning( + "sdstack_etcd returner <_purge_events> event {event:d} at {path:s} is corrupt (missing id) and will be removed".format( + event=event, path=ev.key + ) + ) + + log.debug( + "sdstack_etcd returner <_purge_events> removing corrupt event {event:d} at {path:s}".format( + event=event, path=ev.key + ) + ) + res = client.delete(ev.key, recursive=True) + + count += 1 + continue + + # Now we grab the tag because this is what we"ll check the ev_index against + log.trace( + "sdstack_etcd returner <_purge_events> reading tag for event {event:d} at {path:s}".format( + event=event, path=ev_tagp + ) + ) + try: + ev_tag = client.read(ev_tagp) + + # If the tag key doesn"t exist, then the current entry in our cache doesn"t + # even matter because we can"t do anything without a tag. So similar to + # before, we just remove it and cycle to the next event. + except etcd.EtcdKeyNotFound as E: + log.warning( + "sdstack_etcd returner <_purge_events> event {event:d} at {path:s} is corrupt (missing tag) and will be removed".format( + event=event, path=ev.key + ) + ) + + log.debug( + "sdstack_etcd returner <_purge_events> removing corrupt event {event:d} at {path:s}".format( + event=event, path=ev.key + ) + ) + client.delete(ev.key, recursive=True) + + count += 1 + continue + + ## Everything is valid, so now we can properly remove the package (if the + ## current event is the owner), and then we can remove the cache entry. + + # Remove the package associated with the current event index + log.trace( + "sdstack_etcd returner <_purge_events> removing package for event {event:d} at {path:s}".format( + event=event, path=ev_tag.value + ) + ) + packagep = ev_tag.value.split("/") + + # Try and remove the package path that was cached while checking that + # its modifiedIndex is what we expect. If it"s not, then we know that + # we"re not the only person that"s using this event and so we don"t + # need to delete it yet, because another event will do it eventually. + basep = [path, Schema["package-path"]] + try: + res = client.delete("/".join(basep + packagep), prevIndex=ev_index.value) + + # Our package is in use by someone else, so we can simply remove the cache + # entry and then cycle to the next event. + except etcd.EtcdCompareFailed as E: + log.debug( + "sdstack_etcd returner <_purge_events> refusing to remove package for event {event:d} at {path:s} as it is still in use".format( + event=event, path="/".join(basep + packagep[:]) + ) + ) + count += 1 + + # Remove the whole event cache entry + log.debug( + "sdstack_etcd returner <_purge_events> removing (duplicate) event {event:d} at {path:s}".format( + event=event, path=ev.key + ) + ) + res = client.delete(ev.key, recursive=True) + continue + + # Walk through each component of the package path trying to remove them unless the directory is not empty + packagep.pop(-1) + log.debug( + "sdstack_etcd returner <_purge_events> (recursively) removing parent keys for event {event:d} package at {path:s}".format( + event=event, path="/".join(basep + packagep[:]) + ) + ) + for i in range(len(packagep), 0, -1): + log.trace( + "sdstack_etcd returner <_purge_events> removing directory for event {event:d} package at {path:s}".format( + event=event, path="/".join(basep + packagep[:i]) + ) + ) + try: + client.delete("/".join(basep + packagep[:i]), dir=True) + except etcd.EtcdDirNotEmpty as E: + log.debug( + "sdstack_etcd returner <_purge_events> Unable to remove directory for event {event:d} package at {path:s} due to other tags under it still being in use ({exception})".format( + path="/".join(basep + packagep[:i]), event=event, exception=E + ) + ) + break + continue + + # Remove the whole event cache entry now that we"ve properly removed the package + log.debug( + "sdstack_etcd returner <_purge_events> removing event {event:d} at {path:s}".format( + event=event, path=ev.key + ) + ) + res = client.delete(ev.key, recursive=True) + + count += 1 + + return count def clean_old_jobs(): """ - Included for API consistency + Called in the master"s event loop every loop_interval. Removes any jobs, + and returns that are older than the etcd.ttl option (seconds), or the + keep_jobs option (hours). + + :return: """ + # First we"ll purge the jobs... + jobc = _purge_jobs() + if jobc > 0: + log.trace( + "sdstack_etcd returner successfully removed {count:d} jobs".format( + count=jobc + ) + ) + + # ...and then we"ll purge the events + eventsc = _purge_events() + if eventsc > 0: + log.trace( + "sdstack_etcd returner successfully removed {count:d} events".format( + count=eventsc + ) + ) + + # Log that we hit a cleanup iteration + log.debug( + "sdstack_etcd returner completed purging jobs and events" + ) + def get_load(jid): """ - Return the load data that marks a specified jid + Return the load data that marks a specified jid. """ - log.debug("sdstack_etcd returner called jid: {0}".format(jid)) read_profile = __opts__.get("etcd.returner_read_profile") - client, path = _get_conn(__opts__, read_profile) - return salt.utils.json.loads( - client.get("/".join((path, "jobs", jid, ".load.p"))).value + client, path, _ = _get_conn(__opts__, read_profile) + + # Figure out the path that our job should be at + loadp = "/".join([path, Schema["job-cache"], jid, ".load.p"]) + + # Read it. If EtcdKeyNotFound was raised then the key doesn"t exist and so + # we need to return None, because that"s what our caller expects on a + # non-existent job. + log.debug( + "sdstack_etcd returner reading load data for job {jid:s} from {path:s}".format( + jid=jid, path=loadp + ) ) + try: + res = client.read(loadp) + except etcd.EtcdKeyNotFound as E: + log.error( + "sdstack_etcd returner could not find job {jid:s} at the path {path:s}".format( + jid=jid, path=loadp + ) + ) + return None + log.debug( + "sdstack_etcd returner found load data for job {jid:s} at {path:s} with value {data}".format( + jid=jid, path=res.key, data=res.value + ) + ) + return salt.utils.json.loads(res.value) def get_jid(jid): """ - Return the information returned when the specified job id was executed + Return the information returned when the specified job id was executed. """ - log.debug("sdstack_etcd returner called jid: {0}".format(jid)) + client, path, _ = _get_conn(__opts__) + + # Figure out the path that our job should be at + resultsp = "/".join([path, Schema["job-cache"], jid]) + + # Try and read the job directory. If we have a missing key exception then no + # minions have returned anything yet and so we return an empty dict for the + # caller. + log.debug( + "sdstack_etcd returner reading minions that have returned results for job {jid:s} at {path:s}".format( + jid=jid, path=resultsp + ) + ) + try: + results = client.read(resultsp) + except etcd.EtcdKeyNotFound as E: + log.trace( + "sdstack_etcd returner unable to read job {jid:s} from {path:s}".format( + jid=jid, path=resultsp + ) + ) + return {} + + # Iterate through all of the children at our job path that are directories. + # Anything that is a directory should be a minion that contains some results. + log.debug( + "sdstack_etcd returner iterating through minions with results for job {jid:s} from {path:s}".format( + jid=results.key.split("/")[-1], path=results.key + ) + ) ret = {} - client, path = _get_conn(__opts__) - items = client.get("/".join((path, "jobs", jid))) - for item in items.children: - if str(item.key).endswith(".load.p"): - continue - comps = str(item.key).split("/") - data = client.get("/".join((path, "jobs", jid, comps[-1], "return"))).value - ret[comps[-1]] = {"return": salt.utils.json.loads(data)} + for item in results.leaves: + if not item.dir: + continue + + # Extract the minion name from the key in the job, and use it to build + # the path to the return value + comps = item.key.split("/") + returnp = "/".join([resultsp, comps[-1], "return"]) + + # Now we know the minion and the path to the return for its job, we can + # just grab it. If the key exists, but the value is missing entirely, + # then something that shouldn"t happen has happened. + log.trace( + "sdstack_etcd returner grabbing result from minion {minion:s} for job {jid:s} at {path:s}".format( + minion=comps[-1], jid=jid, path=returnp + ) + ) + try: + result = client.read(returnp, recursive=True) + except etcd.EtcdKeyNotFound as E: + log.debug( + "sdstack_etcd returner returned nothing from minion {minion:s} for job {jid:s} at {path:s}".format( + minion=comps[-1], jid=jid, path=returnp + ) + ) + continue + + # Aggregate any keys that we found into a dictionary + res = {} + for item in result.leaves: + name = item.key.split("/")[-1] + try: + res[name] = salt.utils.json.loads(item.value) + + # We use a general exception here instead of ValueError jic someone + # changes the semantics of salt.utils.json.loads out from underneath us + except Exception as E: # pylint: disable=broad-except + log.warning( + "sdstack_etcd returner unable to decode field {name:s} from minion {minion:s} for job {jid:s} at {path:s}".format( + minion=comps[-1], jid=jid, path=item.key, name=name + ) + ) + res[name] = item.value + continue + + # We found something, so update our return dict for the minion id with + # the results that it returned. + ret[comps[-1]] = res + log.debug( + "sdstack_etcd returner job {jid:s} from minion {minion:s} at path {path:s} returned {result}".format( + minion=comps[-1], jid=jid, path=result.key, result=res + ) + ) return ret def get_fun(fun): """ - Return a dict of the last function called for all minions + Return a dict containing the last function called for all the minions that have called a function. """ - log.debug("sdstack_etcd returner called fun: {0}".format(fun)) + client, path, _ = _get_conn(__opts__) + + # Find any minions that had their last function registered by returner() + minionsp = "/".join([path, Schema["minion-fun"]]) + + # If the minions key isn"t found, then no minions registered a function + # and thus we need to return an empty dict so the caller knows that + # nothing is available. + log.debug( + "sdstack_etcd returner reading minions at {path:s} for function {fun:s}".format( + path=minionsp, fun=fun + ) + ) + try: + minions = client.read(minionsp) + except etcd.EtcdKeyNotFound as E: + return {} + + # Walk through the list of all the minions that have a jid registered, + # and cross reference this with the job returns. + log.debug( + "sdstack_etcd returner iterating through minions for function {fun:s} at {path:s}".format( + fun=fun, path=minions.key + ) + ) ret = {} - client, path = _get_conn(__opts__) - items = client.get("/".join((path, "minions"))) - for item in items.children: - comps = str(item.key).split("/") - efun = salt.utils.json.loads( - client.get( - "/".join((path, "jobs", str(item.value), comps[-1], "fun")) - ).value - ) - if efun == fun: - ret[comps[-1]] = str(efun) + for minion in minions.leaves: + if minion.dir: + log.warning( + "sdstack_etcd returner found a non-minion at {path:s} {expire:s}".format( + path=minion.key, + expire="that will need to be manually removed" + if minion.ttl is None + else "that will expire in {ttl:d} seconds".format(ttl=minion.ttl), + ) + ) + continue + + # Now that we have a minion and it"s last jid, we use it to fetch the + # function field (fun) that was registered by returner(). + jid, comps = minion.value, minion.key.split("/") + funp = "/".join([path, Schema["job-cache"], jid, comps[-1], "fun"]) + + # Try and read the field, and skip it if it doesn"t exist or wasn"t + # registered for some reason. + log.trace( + "sdstack_etcd returner reading function from minion {minion:s} for job {jid:s} at {path:s}".format( + minion=comps[-1], jid=jid, path=funp + ) + ) + try: + res = client.read(funp) + except etcd.EtcdKeyNotFound as E: + log.debug( + "sdstack_etcd returner returned nothing from minion {minion:s} for job {jid:s} at path {path:s}".format( + minion=comps[-1], jid=jid, path=funp + ) + ) + continue + + # Check if the function field (fun) matches what the user is looking for + # If it does, then we can just add the minion to our results + log.trace( + "sdstack_etcd returner decoding json data from minion {minion:s} for job {jid:s} at {path:s}".format( + minion=comps[-1], jid=jid, path=funp + ) + ) + data = salt.utils.json.loads(res.value) + if data == fun: + ret[comps[-1]] = str(data) + log.debug( + "sdstack_etcd returner found job {jid:s} for minion {minion:s} using {fun:s} at {path:s}".format( + minion=comps[-1], fun=data, jid=jid, path=minion.key + ) + ) + continue return ret def get_jids(): """ - Return a list of all job ids + Return a list of all job ids that have returned something. """ - log.debug("sdstack_etcd returner called") - ret = [] - client, path = _get_conn(__opts__) - items = client.get("/".join((path, "jobs"))) - for item in items.children: - if item.dir is True: - jid = str(item.key).split("/")[-1] - ret.append(jid) + client, path, _ = _get_conn(__opts__) + + # Enumerate all the jobs that are available. + jobsp = "/".join([path, Schema["job-cache"]]) + + # Fetch all the jobs. If the key doesn"t exist, then it"s likely that no + # jobs have been created yet so return an empty list to the caller. + log.debug( + "sdstack_etcd returner listing jobs at {path:s}".format(path=jobsp) + ) + try: + jobs = client.read(jobsp) + except etcd.EtcdKeyNotFound as E: + return [] + + # Anything that"s a directory is a job id. Since that"s all we"re returning, + # aggregate them into a list. + log.debug( + "sdstack_etcd returner iterating through jobs at {path:s}".format( + path=jobs.key + ) + ) + ret = {} + for job in jobs.leaves: + if not job.dir: + log.warning( + "sdstack_etcd returner found a non-job at {path:s} {expire:s}".format( + path=job.key, + expire="that will need to be manually removed" + if job.ttl is None + else "that will expire in {ttl:d} seconds".format(ttl=job.ttl), + ) + ) + continue + + jid = job.key.split("/")[-1] + loadp = "/".join([job.key, ".load.p"]) + + # Now we can load the data from the job + try: + res = client.read(loadp) + except etcd.EtcdKeyNotFound as E: + log.error( + "sdstack_etcd returner could not find job data {jid:s} at the path {path:s}".format( + jid=jid, path=loadp + ) + ) + continue + + # Decode the load data so we can stash the job data for our caller + try: + data = salt.utils.json.loads(res.value) + + # If we can"t decode the json, then we"re screwed so log it in case the user cares + except Exception as E: # pylint: disable=broad-except + log.error( + "sdstack_etcd returner could not decode data for job {jid:s} at the path {path:s} due to exception ({exception}) being raised. Data was {data}".format( + jid=jid, path=loadp, exception=E, data=res.value + ) + ) + continue + + # Cool. Everything seems to be good... + ret[jid] = salt.utils.jid.format_jid_instance(jid, data) + log.trace( + "sdstack_etcd returner found job {jid:s} at {path:s}".format( + jid=jid, path=job.key + ) + ) + + log.debug( + "sdstack_etcd returner found {count:d} jobs at {path:s}".format( + count=len(ret), path=jobs.key + ) + ) return ret def get_minions(): """ - Return a list of minions + Return a list of all minions that have returned something. """ - log.debug("sdstack_etcd returner called") + client, path, _ = _get_conn(__opts__) + + # Find any minions that have returned anything + minionsp = "/".join([path, Schema["minion-fun"]]) + + # If no minions were found, then nobody has returned anything recently. In + # this case, return an empty last for the caller. + log.debug( + "sdstack_etcd returner reading minions at {path:s}".format( + path=minionsp + ) + ) + try: + minions = client.read(minionsp) + except etcd.EtcdKeyNotFound as E: + return [] + + # We can just walk through everything that isn"t a directory. This path + # is simply a list of minions and the last job that each one returned. + log.debug( + "sdstack_etcd returner iterating through minions at {path:s}".format( + path=minions.key + ) + ) ret = [] - client, path = _get_conn(__opts__) - items = client.get("/".join((path, "minions"))) - for item in items.children: - comps = str(item.key).split("/") + for minion in minions.leaves: + if minion.dir: + log.warning( + "sdstack_etcd returner found a non-minion at {path:s} {expire:s}".format( + path=minion.key, + expire="that will need to be manually removed" + if minion.ttl is None + else "that will expire in {ttl:d} seconds".format(ttl=minion.ttl), + ) + ) + continue + comps = str(minion.key).split("/") + log.trace( + "sdstack_etcd returner found minion {minion:s} at {path:s}".format( + minion=comps[-1], path=minion.key + ) + ) ret.append(comps[-1]) return ret def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument """ - Do any work necessary to prepare a JID, including sending a custom id + Do any work necessary to prepare a JID, including sending a custom id. """ return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__) + + +def event_return(events): + """ + Return event to etcd server + + Requires that configuration enabled via "event_return" + option in master config. + """ + write_profile = __opts__.get("etcd.returner_write_profile") + client, path, ttl = _get_conn(__opts__, write_profile) + + # Iterate through all the events, and add them to the events path based + # on the tag that is labeled in each event. We aggregate all errors into + # a list so the writing of the events are as atomic as possible. + log.debug( + "sdstack_etcd returner iterating through {count:d} events to write into our profile".format( + count=len(events) + ) + ) + exceptions = [] + for event in events: + + # Each event that we receive should already come with these properties, + # but we add these just in case the schema changes as a result of a + # refactor or something. Some of the other returns also include a + # timestamp despite the time only having meaning to the minion that + # it"s coming from. We"ll include it in case the user wants it too + # for some reason. + + package = dict(event) + package.setdefault("master_id", __opts__["id"]) + package.setdefault("timestamp", time.time()) + + # Use the tag from the event package to build a watchable path + packagep = "/".join([path, Schema["package-path"], package["tag"]]) + + # Now we can write the event package into the event path + log.debug( + "sdstack_etcd returner writing package into event path at {path:s}".format( + path=packagep + ) + ) + json = salt.utils.json.dumps(package) + try: + # Try and write the event if it doesn"t exist + res = client.write(packagep, json, prevExist=False) + + # If the event doesn"t actually exist, then just modify it instead of re-creating it + # and tampering with the createdIndex + except etcd.EtcdAlreadyExist as E: + log.trace( + "sdstack_etcd returner fetching already existing event with the tag {name:s} at {path:s}".format( + name=package["tag"], path=packagep + ) + ) + node = client.read(packagep) + + log.debug( + "sdstack_etcd returner updating package for event ({event:d}) with the tag {name:s} at {path:s}".format( + event=node.modifiedIndex, name=package["tag"], path=packagep + ) + ) + node.value = json + res = client.update(node) + + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to write event with the tag {name:s} into {path:s} due to exception ({exception}) being raised".format( + name=package["tag"], path=packagep, exception=E + ) + ) + exceptions.append((E, package)) + continue + + # From now on, we can use the modifiedIndex of our tag path as our event + # identifier. Despite usage of the modifiedIndex as a unique identifier, + # this identifer can be arbitrary as the index sub-key is responsible + # for determining ownership of the event package that was registered. + event = res.modifiedIndex + log.trace( + "sdstack_etcd returner wrote event {event:d} with the tag {name:s} to {path:s} using {data}".format( + path=res.key, event=event, name=package["tag"], data=res.value + ) + ) + + # Now we"ll need to store the modifiedIndex for said event so that we can + # use it to determine ownership. This modifiedIndex is what links the + # actual event with the tag. If we were using the etcd3 api, then we + # could make these 3 writes (index, tag, and lock) atomic. But we"re + # not, and so this is a manual effort and potentially racy depending on + # the uniqueness of the modifiedIndex (which etcd guarantees unique) + + basep = "/".join([path, Schema["event-cache"], str(event)]) + + # Here we"ll write our modifiedIndex to our event cache. + indexp = "/".join([basep, "index"]) + try: + # If the event is a new key, then we can simply cache it without concern + if res.newKey: + log.trace( + "sdstack_etcd returner writing new event {event:d} with the modifiedIndex {index:d} for the tag {name:s} at {path:s}".format( + path=indexp, + event=event, + index=res.modifiedIndex, + name=package["tag"], + ) + ) + client.write(indexp, res.modifiedIndex, prevExist=False) + + # Otherwise, the event was updated and thus we need to update our cache too + else: + log.trace( + "sdstack_etcd returner updating event {event:d} with the tag {name:s} at {path:s} with the modifiedIndex {index:d}".format( + path=indexp, + event=event, + index=res.modifiedIndex, + name=package["tag"], + ) + ) + client.write(indexp, res.modifiedIndex) + + except etcd.EtcdAlreadyExist as E: + log.error( + "sdstack_etcd returner unable to write modifiedIndex {index:d} for tag {name:s} to event {event:d} due to the event already existing".format( + event=event, index=res.modifiedIndex, name=package["tag"] + ) + ) + exceptions.append((E, package)) + continue + + # If we got here, then we should be able to write the tag using the event index + tagp = "/".join([basep, "tag"]) + try: + log.trace( + "sdstack_etcd returner updating event {event:d} at {path:s} with tag {name:s}".format( + path=tagp, event=event, name=package["tag"] + ) + ) + client.write(tagp, package["tag"]) + + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to update event {event:d} at {path:s} with tag {name:s} due to exception ({exception}) being raised".format( + path=tagp, name=package["tag"], event=event, exception=E + ) + ) + exceptions.append((E, package)) + continue + + # Now that both have been written, let"s write our lock to actually enable the event + lockp = "/".join([basep, "lock"]) + try: + log.trace( + "sdstack_etcd returner writing lock for event {event:d} with the tag {name:s} to {path:s} {expire:s}".format( + path=lockp, + event=event, + name=package["tag"], + expire="that will need to be manually removed" + if ttl is None + else "that will expire in {ttl:d} seconds".format(ttl=ttl), + ) + ) + client.write(lockp, res.createdIndex, ttl=ttl if ttl > 0 else None) + + # If we can"t write the lock, it"s fine because the maintenance thread + # will purge this event from the cache anyways if it"s not written. + except Exception as E: # pylint: disable=broad-except + log.error( + "sdstack_etcd returner unable to write lock for event {event:d} with the tag {name:s} to {path:s} due to exception ({exception}) being raised".format( + path=lockp, name=package["tag"], event=event, exception=E + ) + ) + exceptions.append((E, package)) + + continue + + # Go back through all of the exceptions that occurred and log them. + for E, pack in exceptions: + log.exception( + "sdstack_etcd returner exception ({exception}) was raised while trying to write event {name:s} with the data {data}".format( + exception=E, name=pack["tag"], data=pack + ) + ) + return + + +def get_jids_filter(count, filter_find_job=True): + """ + Return a list of all job ids + :param int count: show not more than the count of most recent jobs + :param bool filter_find_jobs: filter out "saltutil.find_job" jobs + """ + read_profile = __opts__.get("etcd.returner_read_profile") + client, path, ttl = _get_conn(__opts__, read_profile) + + # Enumerate all the jobs that are available. + jobsp = "/".join([path, Schema["job-cache"]]) + + # Fetch all the jobs. If the key doesn"t exist, then it"s likely that no + # jobs have been created yet so return an empty list to the caller. + log.debug( + "sdstack_etcd returner listing jobs at {path:s}".format( + path=jobsp + ) + ) + try: + jobs = client.read(jobsp, sorted=True) + except etcd.EtcdKeyNotFound as E: + return [] + + # Anything that"s a directory is a job id. Since that"s all we"re returning, + # aggregate them into a list. We do this ahead of time in order to conserve + # memory by avoiding just decoding everything here + log.debug( + "sdstack_etcd returner collecting jobs at {path:s}".format( + path=jobs.key + ) + ) + jids = [] + for job in jobs.leaves: + if not job.dir: + continue + jids.append(job.key.split("/")[-1]) + + log.debug( + "sdstack_etcd returner collecting {count:d} job loads at {path:s}".format( + path=jobs.key, count=count + ) + ) + ret = [] + for jid in jids[-count:]: + + # Figure out the path to .load.p from the current jid + loadp = "/".join([jobsp, jid, ".load.p"]) + + # Now we can load the data from the job + try: + res = client.read(loadp) + except etcd.EtcdKeyNotFound as E: + log.error( + "sdstack_etcd returner could not find job data {jid:s} at the path {path:s}".format( + jid=jid, path=loadp + ) + ) + continue + + # Decode the load data so we can stash it for the caller + try: + data = salt.utils.json.loads(res.value) + + # If we can"t decode the json, then we"re screwed so log it in case the user cares + except Exception as E: # pylint: disable=broad-except + log.error( + "sdstack_etcd returner could not decode data for job {jid:s} at the path {path:s} due to exception ({exception}) being raised. Data was {data}".format( + jid=jid, path=loadp, exception=E, data=res.value + ) + ) + continue + + if filter_find_job and data["fun"] == "saltutil.find_job": + continue + + ret.append(salt.utils.jid.format_jid_instance_ext(jid, data)) + return ret + + +def update_endtime(jid, time): + """ + Update (or store) the end time for a given job + + Endtime is stored as a plain text string + """ + write_profile = __opts__.get("etcd.returner_write_profile") + client, path, ttl = _get_conn(__opts__, write_profile) + + # Check if the specified jid is "req", as only incorrect code will do this + if jid == "req": + log.debug( + "sdstack_etcd returner was called using a request job id ({jid:s}) with {data}".format( + jid=jid, data=time + ) + ) + + # Build the path that we"ll use to update the endtime + timep = "/".join([path, Schema["job-cache"], jid, ".endtime"]) + + ## Now we can simply update it + json = salt.utils.json.dumps(time) + + log.debug( + "sdstack_etcd returner storing endtime for job {jid:s} to {path:s} with {data}".format( + jid=jid, path=timep, data=time + ) + ) + try: + res = client.write(timep, json, prevExist=False) + + # If the key already exists, then warn the user but still update the key. + except etcd.EtcdAlreadyExist as E: + node = client.read(timep) + node.value = json + + log.debug( + "sdstack_etcd returner updating endtime for job {jid:s} at {path:s} with {data}".format( + jid=jid, path=timep, data=time + ) + ) + res = client.update(node) + + # If we failed here, it"s okay because the lock won"t get written so this + # essentially means the job will get scheduled for deletion. + except Exception as E: # pylint: disable=broad-except + log.trace( + "sdstack_etcd returner unable to store endtime for job {jid:s} to the path {path:s} due to exception ({exception}) being raised".format( + jid=jid, path=timep, exception=E + ) + ) + return + log.trace( + "sdstack_etcd returner successfully wrote endtime for job {jid:s} to the path {path:s} with {data}".format( + jid=jid, path=timep, data=time + ) + ) + + +def get_endtime(jid): + """ + Retrieve the stored endtime for a given job + + Returns False if no endtime is present + """ + read_profile = __opts__.get("etcd.returner_read_profile") + client, path, _ = _get_conn(__opts__, read_profile) + + # Figure out the path that our endtime should be at + timep = "/".join([path, Schema["job-cache"], jid, ".endtime"]) + + # Read it. If EtcdKeyNotFound was raised then the key doesn"t exist and so + # we need to return False, because that"s what salt.returners.local_cache + # returns when the endtime hasn"t been written.. + log.debug( + "sdstack_etcd returner reading endtime for job {jid:s} from {path:s}".format( + jid=jid, path=timep + ) + ) + try: + res = client.read(timep) + except etcd.EtcdKeyNotFound as E: + log.debug( + "sdstack_etcd returner could not find endtime for job {jid:s} at the path {path:s}".format( + jid=jid, path=timep + ) + ) + return False + log.debug( + "sdstack_etcd returner found endtime for job {jid:s} at {path:s} with value {data}".format( + jid=jid, path=res.key, data=res.value + ) + ) + return salt.utils.json.loads(res.value)