diff --git a/salt/beacons/__init__.py b/salt/beacons/__init__.py index d464e247402d..5cc15d348205 100644 --- a/salt/beacons/__init__.py +++ b/salt/beacons/__init__.py @@ -233,9 +233,9 @@ def list_beacons(self, beacons = self._get_beacons(include_pillar, include_opts) # Fire the complete event back along with the list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': True, 'beacons': beacons}, - tag='/salt/minion/minion_beacons_list_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': True, 'beacons': beacons}, + tag='/salt/minion/minion_beacons_list_complete') return True @@ -247,9 +247,9 @@ def list_available_beacons(self): for _beacon in self.beacons if '.beacon' in _beacon] # Fire the complete event back along with the list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': True, 'beacons': _beacons}, - tag='/salt/minion/minion_beacons_list_available_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': True, 'beacons': _beacons}, + tag='/salt/minion/minion_beacons_list_available_complete') return True @@ -270,11 +270,11 @@ def validate_beacon(self, name, beacon_data): valid = True # Fire the complete event back along with the list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': True, - 'vcomment': vcomment, - 'valid': valid}, - tag='/salt/minion/minion_beacon_validation_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': True, + 'vcomment': vcomment, + 'valid': valid}, + tag='/salt/minion/minion_beacon_validation_complete') return True @@ -300,10 +300,10 @@ def add_beacon(self, name, beacon_data): self.opts['beacons'].update(data) # Fire the complete event back along with updated list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': complete, 'comment': comment, - 'beacons': self.opts['beacons']}, - tag='/salt/minion/minion_beacon_add_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': complete, 'comment': comment, + 'beacons': self.opts['beacons']}, + tag='/salt/minion/minion_beacon_add_complete') return True @@ -326,10 +326,10 @@ def modify_beacon(self, name, beacon_data): self.opts['beacons'].update(data) # Fire the complete event back along with updated list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': complete, 'comment': comment, - 'beacons': self.opts['beacons']}, - tag='/salt/minion/minion_beacon_modify_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': complete, 'comment': comment, + 'beacons': self.opts['beacons']}, + tag='/salt/minion/minion_beacon_modify_complete') return True def delete_beacon(self, name): @@ -350,10 +350,10 @@ def delete_beacon(self, name): complete = True # Fire the complete event back along with updated list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': complete, 'comment': comment, - 'beacons': self.opts['beacons']}, - tag='/salt/minion/minion_beacon_delete_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': complete, 'comment': comment, + 'beacons': self.opts['beacons']}, + tag='/salt/minion/minion_beacon_delete_complete') return True @@ -365,9 +365,9 @@ def enable_beacons(self): self.opts['beacons']['enabled'] = True # Fire the complete event back along with updated list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': True, 'beacons': self.opts['beacons']}, - tag='/salt/minion/minion_beacons_enabled_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': True, 'beacons': self.opts['beacons']}, + tag='/salt/minion/minion_beacons_enabled_complete') return True @@ -379,9 +379,9 @@ def disable_beacons(self): self.opts['beacons']['enabled'] = False # Fire the complete event back along with updated list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': True, 'beacons': self.opts['beacons']}, - tag='/salt/minion/minion_beacons_disabled_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': True, 'beacons': self.opts['beacons']}, + tag='/salt/minion/minion_beacons_disabled_complete') return True @@ -400,10 +400,10 @@ def enable_beacon(self, name): complete = True # Fire the complete event back along with updated list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': complete, 'comment': comment, - 'beacons': self.opts['beacons']}, - tag='/salt/minion/minion_beacon_enabled_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': complete, 'comment': comment, + 'beacons': self.opts['beacons']}, + tag='/salt/minion/minion_beacon_enabled_complete') return True @@ -422,10 +422,10 @@ def disable_beacon(self, name): complete = True # Fire the complete event back along with updated list of beacons - evt = salt.utils.event.get_event('minion', opts=self.opts) - evt.fire_event({'complete': complete, 'comment': comment, - 'beacons': self.opts['beacons']}, - tag='/salt/minion/minion_beacon_disabled_complete') + with salt.utils.event.get_event('minion', opts=self.opts) as evt: + evt.fire_event({'complete': complete, 'comment': comment, + 'beacons': self.opts['beacons']}, + tag='/salt/minion/minion_beacon_disabled_complete') return True diff --git a/salt/client/mixins.py b/salt/client/mixins.py index a004d5e1125c..2711b7f31e61 100644 --- a/salt/client/mixins.py +++ b/salt/client/mixins.py @@ -159,19 +159,19 @@ def cmd_sync(self, low, timeout=None, full_return=False): 'eauth': 'pam', }) ''' - event = salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=True) - job = self.master_call(**low) - ret_tag = salt.utils.event.tagify('ret', base=job['tag']) + with salt.utils.event.get_master_event(self.opts, self.opts['sock_dir'], listen=True) as event: + job = self.master_call(**low) + ret_tag = salt.utils.event.tagify('ret', base=job['tag']) - if timeout is None: - timeout = self.opts.get('rest_timeout', 300) - ret = event.get_event(tag=ret_tag, full=True, wait=timeout, auto_reconnect=True) - if ret is None: - raise salt.exceptions.SaltClientTimeout( - "RunnerClient job '{0}' timed out".format(job['jid']), - jid=job['jid']) + if timeout is None: + timeout = self.opts.get('rest_timeout', 300) + ret = event.get_event(tag=ret_tag, full=True, wait=timeout, auto_reconnect=True) + if ret is None: + raise salt.exceptions.SaltClientTimeout( + "RunnerClient job '{0}' timed out".format(job['jid']), + jid=job['jid']) - return ret if full_return else ret['data']['return'] + return ret if full_return else ret['data']['return'] def cmd(self, fun, arg=None, pub_data=None, kwarg=None, print_event=True, full_return=False): ''' @@ -307,13 +307,6 @@ def _low(self, fun, low, print_event=True, full_return=False): 'user': low.get('__user__', 'UNKNOWN'), } - event = salt.utils.event.get_event( - 'master', - self.opts['sock_dir'], - self.opts['transport'], - opts=self.opts, - listen=False) - if print_event: print_func = self.print_async_event \ if hasattr(self, 'print_async_event') \ @@ -323,113 +316,117 @@ def _low(self, fun, low, print_event=True, full_return=False): # runner/wheel output during orchestration). print_func = None - namespaced_event = salt.utils.event.NamespacedEvent( - event, + with salt.utils.event.NamespacedEvent( + salt.utils.event.get_event( + 'master', + self.opts['sock_dir'], + self.opts['transport'], + opts=self.opts, + listen=False, + ), tag, print_func=print_func - ) - - # TODO: test that they exist - # TODO: Other things to inject?? - func_globals = {'__jid__': jid, - '__user__': data['user'], - '__tag__': tag, - # weak ref to avoid the Exception in interpreter - # teardown of event - '__jid_event__': weakref.proxy(namespaced_event), - } + ) as namespaced_event: + + # TODO: test that they exist + # TODO: Other things to inject?? + func_globals = {'__jid__': jid, + '__user__': data['user'], + '__tag__': tag, + # weak ref to avoid the Exception in interpreter + # teardown of event + '__jid_event__': weakref.proxy(namespaced_event), + } - try: - self_functions = pycopy.copy(self.functions) - salt.utils.lazy.verify_fun(self_functions, fun) - - # Inject some useful globals to *all* the function's global - # namespace only once per module-- not per func - completed_funcs = [] - - for mod_name in six.iterkeys(self_functions): - if '.' not in mod_name: - continue - mod, _ = mod_name.split('.', 1) - if mod in completed_funcs: - continue - completed_funcs.append(mod) - for global_key, value in six.iteritems(func_globals): - self.functions[mod_name].__globals__[global_key] = value - - # There are some discrepancies of what a "low" structure is in the - # publisher world it is a dict including stuff such as jid, fun, - # arg (a list of args, with kwargs packed in). Historically this - # particular one has had no "arg" and just has had all the kwargs - # packed into the top level object. The plan is to move away from - # that since the caller knows what is an arg vs a kwarg, but while - # we make the transition we will load "kwargs" using format_call if - # there are no kwargs in the low object passed in. - - if 'arg' in low and 'kwarg' in low: - args = low['arg'] - kwargs = low['kwarg'] - else: - f_call = salt.utils.args.format_call( - self.functions[fun], - low, - expected_extra_kws=CLIENT_INTERNAL_KEYWORDS - ) - args = f_call.get('args', ()) - kwargs = f_call.get('kwargs', {}) - - # Update the event data with loaded args and kwargs - data['fun_args'] = list(args) + ([kwargs] if kwargs else []) - func_globals['__jid_event__'].fire_event(data, 'new') - - # Initialize a context for executing the method. - with tornado.stack_context.StackContext(self.functions.context_dict.clone): - data['return'] = self.functions[fun](*args, **kwargs) - try: - data['success'] = self.context.get('retcode', 0) == 0 - except AttributeError: - # Assume a True result if no context attribute - data['success'] = True - if isinstance(data['return'], dict) and 'data' in data['return']: - # some functions can return boolean values - data['success'] = salt.utils.state.check_result(data['return']['data']) - except (Exception, SystemExit) as ex: - if isinstance(ex, salt.exceptions.NotImplemented): - data['return'] = six.text_type(ex) - else: - data['return'] = 'Exception occurred in {0} {1}: {2}'.format( - self.client, - fun, - traceback.format_exc(), - ) - data['success'] = False - - if self.store_job: try: - salt.utils.job.store_job( - self.opts, - { - 'id': self.opts['id'], - 'tgt': self.opts['id'], - 'jid': data['jid'], - 'return': data, - }, - event=None, - mminion=self.mminion, + self_functions = pycopy.copy(self.functions) + salt.utils.lazy.verify_fun(self_functions, fun) + + # Inject some useful globals to *all* the function's global + # namespace only once per module-- not per func + completed_funcs = [] + + for mod_name in six.iterkeys(self_functions): + if '.' not in mod_name: + continue + mod, _ = mod_name.split('.', 1) + if mod in completed_funcs: + continue + completed_funcs.append(mod) + for global_key, value in six.iteritems(func_globals): + self.functions[mod_name].__globals__[global_key] = value + + # There are some discrepancies of what a "low" structure is in the + # publisher world it is a dict including stuff such as jid, fun, + # arg (a list of args, with kwargs packed in). Historically this + # particular one has had no "arg" and just has had all the kwargs + # packed into the top level object. The plan is to move away from + # that since the caller knows what is an arg vs a kwarg, but while + # we make the transition we will load "kwargs" using format_call if + # there are no kwargs in the low object passed in. + + if 'arg' in low and 'kwarg' in low: + args = low['arg'] + kwargs = low['kwarg'] + else: + f_call = salt.utils.args.format_call( + self.functions[fun], + low, + expected_extra_kws=CLIENT_INTERNAL_KEYWORDS ) - except salt.exceptions.SaltCacheError: - log.error('Could not store job cache info. ' - 'Job details for this run may be unavailable.') - - # Outputters _can_ mutate data so write to the job cache first! - namespaced_event.fire_event(data, 'ret') - - # if we fired an event, make sure to delete the event object. - # This will ensure that we call destroy, which will do the 0MQ linger - log.info('Runner completed: %s', data['jid']) - del event - del namespaced_event - return data if full_return else data['return'] + args = f_call.get('args', ()) + kwargs = f_call.get('kwargs', {}) + + # Update the event data with loaded args and kwargs + data['fun_args'] = list(args) + ([kwargs] if kwargs else []) + func_globals['__jid_event__'].fire_event(data, 'new') + + # Initialize a context for executing the method. + with tornado.stack_context.StackContext(self.functions.context_dict.clone): + data['return'] = self.functions[fun](*args, **kwargs) + try: + data['success'] = self.context.get('retcode', 0) == 0 + except AttributeError: + # Assume a True result if no context attribute + data['success'] = True + if isinstance(data['return'], dict) and 'data' in data['return']: + # some functions can return boolean values + data['success'] = salt.utils.state.check_result(data['return']['data']) + except (Exception, SystemExit) as ex: + if isinstance(ex, salt.exceptions.NotImplemented): + data['return'] = six.text_type(ex) + else: + data['return'] = 'Exception occurred in {0} {1}: {2}'.format( + self.client, + fun, + traceback.format_exc(), + ) + data['success'] = False + + if self.store_job: + try: + salt.utils.job.store_job( + self.opts, + { + 'id': self.opts['id'], + 'tgt': self.opts['id'], + 'jid': data['jid'], + 'return': data, + }, + event=None, + mminion=self.mminion, + ) + except salt.exceptions.SaltCacheError: + log.error('Could not store job cache info. ' + 'Job details for this run may be unavailable.') + + # Outputters _can_ mutate data so write to the job cache first! + namespaced_event.fire_event(data, 'ret') + + # if we fired an event, make sure to delete the event object. + # This will ensure that we call destroy, which will do the 0MQ linger + log.info('Runner completed: %s', data['jid']) + return data if full_return else data['return'] def get_docs(self, arg=None): ''' diff --git a/salt/crypt.py b/salt/crypt.py index c2ecf7e032bd..d7ffcaf876ca 100644 --- a/salt/crypt.py +++ b/salt/crypt.py @@ -650,8 +650,8 @@ def _authenticate(self): self._authenticate_future.set_result(True) # mark the sign-in as complete # Notify the bus about creds change if self.opts.get('auth_events') is True: - event = salt.utils.event.get_event(self.opts.get('__role'), opts=self.opts, listen=False) - event.fire_event({'key': key, 'creds': creds}, salt.utils.event.tagify(prefix='auth', suffix='creds')) + with salt.utils.event.get_event(self.opts.get('__role'), opts=self.opts, listen=False) as event: + event.fire_event({'key': key, 'creds': creds}, salt.utils.event.tagify(prefix='auth', suffix='creds')) @tornado.gen.coroutine def sign_in(self, timeout=60, safe=True, tries=1, channel=None): diff --git a/salt/engines/http_logstash.py b/salt/engines/http_logstash.py index 4a92718fdfbd..9d3df3b4c80f 100644 --- a/salt/engines/http_logstash.py +++ b/salt/engines/http_logstash.py @@ -98,22 +98,24 @@ def start(url, funs=None, tags=None): instance = 'master' else: instance = 'minion' - event_bus = salt.utils.event.get_event(instance, - sock_dir=__opts__['sock_dir'], - transport=__opts__['transport'], - opts=__opts__) - while True: - event = event_bus.get_event(full=True) - if event: - publish = True - if isinstance(tags, list) and len(tags) > 0: - found_match = False - for tag in tags: - if fnmatch.fnmatch(event['tag'], tag): - found_match = True - publish = found_match - if funs and 'fun' in event['data']: - if not event['data']['fun'] in funs: - publish = False - if publish: - _logstash(url, event['data']) + with salt.utils.event.get_event( + instance, + sock_dir=__opts__['sock_dir'], + transport=__opts__['transport'], + opts=__opts__, + ) as event_bus: + while True: + event = event_bus.get_event(full=True) + if event: + publish = True + if isinstance(tags, list) and len(tags) > 0: + found_match = False + for tag in tags: + if fnmatch.fnmatch(event['tag'], tag): + found_match = True + publish = found_match + if funs and 'fun' in event['data']: + if not event['data']['fun'] in funs: + publish = False + if publish: + _logstash(url, event['data']) diff --git a/salt/engines/logentries.py b/salt/engines/logentries.py index 7b59ba483482..e5065f64e1fc 100644 --- a/salt/engines/logentries.py +++ b/salt/engines/logentries.py @@ -171,6 +171,22 @@ def open_connection(self): SocketAppender = TLSSocketAppender +def event_bus_context(opts): + if opts.get('id').endswith('_master'): + event_bus = salt.utils.event.get_master_event( + opts, + opts['sock_dir'], + listen=True) + else: + event_bus = salt.utils.event.get_event( + 'minion', + transport=opts['transport'], + opts=opts, + sock_dir=opts['sock_dir'], + listen=True) + return event_bus + + def start(endpoint='data.logentries.com', port=10000, token=None, @@ -178,38 +194,26 @@ def start(endpoint='data.logentries.com', ''' Listen to salt events and forward them to Logentries ''' - if __opts__.get('id').endswith('_master'): - event_bus = salt.utils.event.get_master_event( - __opts__, - __opts__['sock_dir'], - listen=True) - else: - event_bus = salt.utils.event.get_event( - 'minion', - transport=__opts__['transport'], - opts=__opts__, - sock_dir=__opts__['sock_dir'], - listen=True) - log.debug('Logentries engine started') - - try: - val = uuid.UUID(token) - except ValueError: - log.warning('Not a valid logentries token') - - appender = SocketAppender(verbose=False, LE_API=endpoint, LE_PORT=port) - appender.reopen_connection() - - while True: - event = event_bus.get_event() - if event: - # future lint: disable=blacklisted-function - msg = str(' ').join(( - salt.utils.stringutils.to_str(token), - salt.utils.stringutils.to_str(tag), - salt.utils.json.dumps(event) - )) - # future lint: enable=blacklisted-function - appender.put(msg) - - appender.close_connection() + with event_bus_context(__opts__) as event_bus: + log.debug('Logentries engine started') + try: + val = uuid.UUID(token) + except ValueError: + log.warning('Not a valid logentries token') + + appender = SocketAppender(verbose=False, LE_API=endpoint, LE_PORT=port) + appender.reopen_connection() + + while True: + event = event_bus.get_event() + if event: + # future lint: disable=blacklisted-function + msg = str(' ').join(( + salt.utils.stringutils.to_str(token), + salt.utils.stringutils.to_str(tag), + salt.utils.json.dumps(event) + )) + # future lint: enable=blacklisted-function + appender.put(msg) + + appender.close_connection() diff --git a/salt/engines/logstash_engine.py b/salt/engines/logstash_engine.py index 28f537f5e43f..7883bc276cce 100644 --- a/salt/engines/logstash_engine.py +++ b/salt/engines/logstash_engine.py @@ -45,6 +45,22 @@ def __virtual__(): log = logging.getLogger(__name__) +def event_bus_context(opts): + if opts.get('id').endswith('_master'): + event_bus = salt.utils.event.get_master_event( + opts, + opts['sock_dir'], + listen=True) + else: + event_bus = salt.utils.event.get_event( + 'minion', + transport=opts['transport'], + opts=opts, + sock_dir=opts['sock_dir'], + listen=True) + return event_bus + + def start(host, port=5959, tag='salt/engine/logstash', proto='udp'): ''' Listen to salt events and forward them to logstash @@ -59,21 +75,9 @@ def start(host, port=5959, tag='salt/engine/logstash', proto='udp'): logstash_logger.setLevel(logging.INFO) logstash_logger.addHandler(logstashHandler(host, port, version=1)) - if __opts__.get('id').endswith('_master'): - event_bus = salt.utils.event.get_master_event( - __opts__, - __opts__['sock_dir'], - listen=True) - else: - event_bus = salt.utils.event.get_event( - 'minion', - transport=__opts__['transport'], - opts=__opts__, - sock_dir=__opts__['sock_dir'], - listen=True) + with event_bus_context(__opts__) as event_bus: log.debug('Logstash engine started') - - while True: - event = event_bus.get_event() - if event: - logstash_logger.info(tag, extra=event) + while True: + event = event_bus.get_event() + if event: + logstash_logger.info(tag, extra=event) diff --git a/salt/engines/test.py b/salt/engines/test.py index a078e403ab1d..e91dbe596125 100644 --- a/salt/engines/test.py +++ b/salt/engines/test.py @@ -14,26 +14,30 @@ log = logging.getLogger(__name__) -def start(): - ''' - Listen to events and write them to a log file - ''' - if __opts__['__role'] == 'master': +def event_bus_context(opts): + if opts['__role'] == 'master': event_bus = salt.utils.event.get_master_event( - __opts__, - __opts__['sock_dir'], + opts, + opts['sock_dir'], listen=True) else: event_bus = salt.utils.event.get_event( 'minion', - transport=__opts__['transport'], - opts=__opts__, - sock_dir=__opts__['sock_dir'], + transport=opts['transport'], + opts=opts, + sock_dir=opts['sock_dir'], listen=True) log.debug('test engine started') + return event_bus + - while True: - event = event_bus.get_event() - jevent = salt.utils.json.dumps(event) - if event: - log.debug(jevent) +def start(): + ''' + Listen to events and write them to a log file + ''' + with event_bus_context(__opts__) as event_bus: + while True: + event = event_bus.get_event() + jevent = salt.utils.json.dumps(event) + if event: + log.debug(jevent) diff --git a/salt/fileserver/svnfs.py b/salt/fileserver/svnfs.py index 24226a702f1c..0177d1de8b6e 100644 --- a/salt/fileserver/svnfs.py +++ b/salt/fileserver/svnfs.py @@ -471,13 +471,13 @@ def update(): # if there is a change, fire an event if __opts__.get('fileserver_events', False): - event = salt.utils.event.get_event( + with salt.utils.event.get_event( 'master', __opts__['sock_dir'], __opts__['transport'], opts=__opts__, - listen=False) - event.fire_event(data, tagify(['svnfs', 'update'], prefix='fileserver')) + listen=False) as event: + event.fire_event(data, tagify(['svnfs', 'update'], prefix='fileserver')) try: salt.fileserver.reap_fileserver_cache_dir( os.path.join(__opts__['cachedir'], 'svnfs/hash'), diff --git a/salt/utils/cloud.py b/salt/utils/cloud.py index 691ccf1f56b3..be1577a046fd 100644 --- a/salt/utils/cloud.py +++ b/salt/utils/cloud.py @@ -1772,25 +1772,22 @@ def fire_event(key, msg, tag, sock_dir, args=None, transport='zeromq'): ''' Fire deploy action ''' - event = salt.utils.event.get_event( - 'master', - sock_dir, - transport, - listen=False) - - try: - event.fire_event(msg, tag) - except ValueError: - # We're using at least a 0.17.x version of salt - if isinstance(args, dict): - args[key] = msg - else: - args = {key: msg} - event.fire_event(args, tag) + with salt.utils.event.get_event('master', sock_dir, transport, listen=False) as event: + try: + event.fire_event(msg, tag) + except ValueError: + # We're using at least a 0.17.x version of salt + if isinstance(args, dict): + args[key] = msg + else: + args = {key: msg} + event.fire_event(args, tag) + finally: + event.destroy() - # https://github.com/zeromq/pyzmq/issues/173#issuecomment-4037083 - # Assertion failed: get_load () == 0 (poller_base.cpp:32) - time.sleep(0.025) + # https://github.com/zeromq/pyzmq/issues/173#issuecomment-4037083 + # Assertion failed: get_load () == 0 (poller_base.cpp:32) + time.sleep(0.025) def _exec_ssh_cmd(cmd, error_msg=None, allow_failure=False, **kwargs): @@ -2257,19 +2254,19 @@ def check_auth(name, sock_dir=None, queue=None, timeout=300): This function is called from a multiprocess instance, to wait for a minion to become available to receive salt commands ''' - event = salt.utils.event.SaltEvent('master', sock_dir, listen=True) - starttime = time.mktime(time.localtime()) - newtimeout = timeout - log.debug('In check_auth, waiting for %s to become available', name) - while newtimeout > 0: - newtimeout = timeout - (time.mktime(time.localtime()) - starttime) - ret = event.get_event(full=True) - if ret is None: - continue - if ret['tag'] == 'minion_start' and ret['data']['id'] == name: - queue.put(name) - newtimeout = 0 - log.debug('Minion %s is ready to receive commands', name) + with salt.utils.event.SaltEvent('master', sock_dir, listen=True) as event: + starttime = time.mktime(time.localtime()) + newtimeout = timeout + log.debug('In check_auth, waiting for %s to become available', name) + while newtimeout > 0: + newtimeout = timeout - (time.mktime(time.localtime()) - starttime) + ret = event.get_event(full=True) + if ret is None: + continue + if ret['tag'] == 'minion_start' and ret['data']['id'] == name: + queue.put(name) + newtimeout = 0 + log.debug('Minion %s is ready to receive commands', name) def ip_to_int(ip): diff --git a/salt/utils/event.py b/salt/utils/event.py index 4dc336a5d9a1..6dfb7e790a90 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -884,6 +884,12 @@ def __del__(self): except Exception: pass + def __enter__(self): + return self + + def __exit__(self, *args): + self.destroy() + class MasterEvent(SaltEvent): ''' @@ -932,6 +938,15 @@ def fire_event(self, data, tag): if self.print_func is not None: self.print_func(tag, data) + def destroy(self): + self.event.destroy() + + def __enter__(self): + return self + + def __exit__(self, *args): + self.destroy() + class MinionEvent(SaltEvent): ''' diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py index 903283b1389c..69c7dffbec92 100644 --- a/salt/utils/reactor.py +++ b/salt/utils/reactor.py @@ -229,43 +229,43 @@ def run(self): salt.utils.process.appendproctitle(self.__class__.__name__) # instantiate some classes inside our new process - self.event = salt.utils.event.get_event( + with salt.utils.event.get_event( self.opts['__role'], self.opts['sock_dir'], self.opts['transport'], opts=self.opts, - listen=True) - self.wrap = ReactWrap(self.opts) + listen=True) as event: + self.wrap = ReactWrap(self.opts) - for data in self.event.iter_events(full=True): - # skip all events fired by ourselves - if data['data'].get('user') == self.wrap.event_user: - continue - if data['tag'].endswith('salt/reactors/manage/add'): - _data = data['data'] - res = self.add_reactor(_data['event'], _data['reactors']) - self.event.fire_event({'reactors': self.list_all(), - 'result': res}, - 'salt/reactors/manage/add-complete') - elif data['tag'].endswith('salt/reactors/manage/delete'): - _data = data['data'] - res = self.delete_reactor(_data['event']) - self.event.fire_event({'reactors': self.list_all(), - 'result': res}, - 'salt/reactors/manage/delete-complete') - elif data['tag'].endswith('salt/reactors/manage/list'): - self.event.fire_event({'reactors': self.list_all()}, - 'salt/reactors/manage/list-results') - else: - reactors = self.list_reactors(data['tag']) - if not reactors: + for data in event.iter_events(full=True): + # skip all events fired by ourselves + if data['data'].get('user') == self.wrap.event_user: continue - chunks = self.reactions(data['tag'], data['data'], reactors) - if chunks: - try: - self.call_reactions(chunks) - except SystemExit: - log.warning('Exit ignored by reactor') + if data['tag'].endswith('salt/reactors/manage/add'): + _data = data['data'] + res = self.add_reactor(_data['event'], _data['reactors']) + event.fire_event({'reactors': self.list_all(), + 'result': res}, + 'salt/reactors/manage/add-complete') + elif data['tag'].endswith('salt/reactors/manage/delete'): + _data = data['data'] + res = self.delete_reactor(_data['event']) + event.fire_event({'reactors': self.list_all(), + 'result': res}, + 'salt/reactors/manage/delete-complete') + elif data['tag'].endswith('salt/reactors/manage/list'): + event.fire_event({'reactors': self.list_all()}, + 'salt/reactors/manage/list-results') + else: + reactors = self.list_reactors(data['tag']) + if not reactors: + continue + chunks = self.reactions(data['tag'], data['data'], reactors) + if chunks: + try: + self.call_reactions(chunks) + except SystemExit: + log.warning('Exit ignored by reactor') class ReactWrap(object): diff --git a/salt/utils/schedule.py b/salt/utils/schedule.py index 8d2947649f75..ddafbf65edd7 100644 --- a/salt/utils/schedule.py +++ b/salt/utils/schedule.py @@ -254,10 +254,10 @@ def delete_job(self, name, persist=True): log.warning("Cannot delete job %s, it's in the pillar!", name) # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, - 'schedule': self._get_schedule()}, - tag='/salt/minion/minion_schedule_delete_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, + 'schedule': self._get_schedule()}, + tag='/salt/minion/minion_schedule_delete_complete') # remove from self.intervals if name in self.intervals: @@ -287,10 +287,10 @@ def delete_job_prefix(self, name, persist=True): log.warning("Cannot delete job %s, it's in the pillar!", job) # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, - 'schedule': self._get_schedule()}, - tag='/salt/minion/minion_schedule_delete_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, + 'schedule': self._get_schedule()}, + tag='/salt/minion/minion_schedule_delete_complete') # remove from self.intervals for job in list(self.intervals.keys()): @@ -334,10 +334,10 @@ def add_job(self, data, persist=True): self.opts['schedule'].update(data) # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, - 'schedule': self._get_schedule()}, - tag='/salt/minion/minion_schedule_add_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, + 'schedule': self._get_schedule()}, + tag='/salt/minion/minion_schedule_add_complete') if persist: self.persist() @@ -354,10 +354,10 @@ def enable_job(self, name, persist=True): log.warning("Cannot modify job %s, it's in the pillar!", name) # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, - 'schedule': self._get_schedule()}, - tag='/salt/minion/minion_schedule_enabled_job_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, + 'schedule': self._get_schedule()}, + tag='/salt/minion/minion_schedule_enabled_job_complete') if persist: self.persist() @@ -373,11 +373,11 @@ def disable_job(self, name, persist=True): elif name in self._get_schedule(include_opts=False): log.warning("Cannot modify job %s, it's in the pillar!", name) - # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, - 'schedule': self._get_schedule()}, - tag='/salt/minion/minion_schedule_disabled_job_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + # Fire the complete event back along with updated list of schedule + evt.fire_event({'complete': True, + 'schedule': self._get_schedule()}, + tag='/salt/minion/minion_schedule_disabled_job_complete') if persist: self.persist() @@ -451,10 +451,10 @@ def enable_schedule(self): self.opts['schedule']['enabled'] = True # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, - 'schedule': self._get_schedule()}, - tag='/salt/minion/minion_schedule_enabled_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, + 'schedule': self._get_schedule()}, + tag='/salt/minion/minion_schedule_enabled_complete') def disable_schedule(self): ''' @@ -463,10 +463,10 @@ def disable_schedule(self): self.opts['schedule']['enabled'] = False # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, - 'schedule': self._get_schedule()}, - tag='/salt/minion/minion_schedule_disabled_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, + 'schedule': self._get_schedule()}, + tag='/salt/minion/minion_schedule_disabled_complete') def reload(self, schedule): ''' @@ -491,9 +491,9 @@ def list(self, where): schedule = self._get_schedule() # Fire the complete event back along with the list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, 'schedule': schedule}, - tag='/salt/minion/minion_schedule_list_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, 'schedule': schedule}, + tag='/salt/minion/minion_schedule_list_complete') def save_schedule(self): ''' @@ -502,9 +502,9 @@ def save_schedule(self): self.persist() # Fire the complete event back along with the list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True}, - tag='/salt/minion/minion_schedule_saved') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True}, + tag='/salt/minion/minion_schedule_saved') def postpone_job(self, name, data): ''' @@ -531,10 +531,10 @@ def postpone_job(self, name, data): log.warning("Cannot modify job %s, it's in the pillar!", name) # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, - 'schedule': self._get_schedule()}, - tag='/salt/minion/minion_schedule_postpone_job_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, + 'schedule': self._get_schedule()}, + tag='/salt/minion/minion_schedule_postpone_job_complete') def skip_job(self, name, data): ''' @@ -555,10 +555,10 @@ def skip_job(self, name, data): log.warning("Cannot modify job %s, it's in the pillar!", name) # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, - 'schedule': self._get_schedule()}, - tag='/salt/minion/minion_schedule_skip_job_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, + 'schedule': self._get_schedule()}, + tag='/salt/minion/minion_schedule_skip_job_complete') def get_next_fire_time(self, name, fmt='%Y-%m-%dT%H:%M:%S'): ''' @@ -573,9 +573,9 @@ def get_next_fire_time(self, name, fmt='%Y-%m-%dT%H:%M:%S'): _next_fire_time = _next_fire_time.strftime(fmt) # Fire the complete event back along with updated list of schedule - evt = salt.utils.event.get_event('minion', opts=self.opts, listen=False) - evt.fire_event({'complete': True, 'next_fire_time': _next_fire_time}, - tag='/salt/minion/minion_schedule_next_fire_time_complete') + with salt.utils.event.get_event('minion', opts=self.opts, listen=False) as evt: + evt.fire_event({'complete': True, 'next_fire_time': _next_fire_time}, + tag='/salt/minion/minion_schedule_next_fire_time_complete') def job_status(self, name): ''' @@ -694,15 +694,14 @@ def handle_func(self, multiprocessing_enabled, func, data): jid = salt.utils.jid.gen_jid(self.opts) tag = salt.utils.event.tagify(jid, prefix='salt/scheduler/') - event = salt.utils.event.get_event( + namespaced_event = salt.utils.event.NamespacedEvent( + salt.utils.event.get_event( self.opts['__role'], self.opts['sock_dir'], self.opts['transport'], opts=self.opts, - listen=False) - - namespaced_event = salt.utils.event.NamespacedEvent( - event, + listen=False, + ), tag, print_func=None ) @@ -804,6 +803,11 @@ def handle_func(self, multiprocessing_enabled, func, data): event.fire_event(load, '__schedule_return') except Exception as exc: log.exception('Unhandled exception firing __schedule_return event') + finally: + event.destroy() + + if self.opts['__role'] == 'master': + namespaced_event.destroy() if not self.standalone: log.debug('schedule.handle_func: Removing %s', proc_fn) diff --git a/tests/integration/client/test_kwarg.py b/tests/integration/client/test_kwarg.py index bb30ccb63d9e..a82d98da0e72 100644 --- a/tests/integration/client/test_kwarg.py +++ b/tests/integration/client/test_kwarg.py @@ -10,6 +10,9 @@ from salt.ext import six +TIMEOUT = 600 + + class StdTest(ModuleCase): ''' Test standard client calls @@ -81,12 +84,15 @@ def test_kwarg_type(self): ''' terrible_yaml_string = 'foo: ""\n# \'' ret = self.client.cmd_full_return( - 'minion', - 'test.arg_type', - ['a', 1], - kwarg={'outer': {'a': terrible_yaml_string}, - 'inner': 'value'} - ) + 'minion', + 'test.arg_type', + ['a', 1], + kwarg={ + 'outer': {'a': terrible_yaml_string}, + 'inner': 'value' + }, + timeout=TIMEOUT, + ) data = ret['minion']['ret'] self.assertIn(six.text_type.__name__, data['args'][0]) self.assertIn('int', data['args'][1]) @@ -94,7 +100,9 @@ def test_kwarg_type(self): self.assertIn(six.text_type.__name__, data['kwargs']['inner']) def test_full_return_kwarg(self): - ret = self.client.cmd('minion', 'test.ping', full_return=True) + ret = self.client.cmd( + 'minion', 'test.ping', full_return=True, timeout=TIMEOUT, + ) for mid, data in ret.items(): self.assertIn('retcode', data) @@ -107,7 +115,9 @@ def test_cmd_arg_kwarg_parsing(self): ], kwarg={ 'quux': 'Quux', - }) + }, + timeout=TIMEOUT, + ) self.assertEqual(ret['minion'], { 'args': ['foo'],