diff --git a/examples/example_decorator_magic.py b/examples/example_decorator_magic.py index ba06281..018fb37 100644 --- a/examples/example_decorator_magic.py +++ b/examples/example_decorator_magic.py @@ -1,5 +1,7 @@ import os import sys +import json +import time sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -9,40 +11,40 @@ box = Meesee() -@box.worker() -def foobar(item, worker_id): - print('func: foobar, worker_id: {}, item: {}'.format(worker_id, item)) - - -@box.worker() -def name_of_the_function(item, worker_id): - print('func: name_of_the_function, worker_id: {}, item: {}'.format(worker_id, item)) - - -@box.worker(queue="passed_name") -def passed_name_not_this_one(item, worker_id): - print('func: passed_name_not_this_one, worker_id: {}, item: {}'.format(worker_id, item)) - - -@box.produce(queue="passed_name") -def produce_some_items(amount): - yield from range(amount) - - @box.produce() -def produce_to_foo(items): +def produce_to_process_data(items): return items -@box.worker_producer(input_queue="foo", output_queue="foobar") -def foo(item, worker_id): - print(f"{worker_id} {item} foo pass it too foobar") +@box.worker_producer(output_queue="foobar") +def process_data(item, worker_id): + item = json.loads(item) + wait = item["wait"] + print(f"{worker_id} processing: {item} for {wait} seconds and send it too foobar") + item["name"] = f"{item['name']}_processed" + time.sleep(wait) return [item,] +@box.collector(wait=1, until=5) +def foobar(items): + return items + + if __name__ == '__main__': workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 - produce_some_items(10) - items = [{"name": f"name{i}"} for i in range(10)] - produce_to_foo(items) - box.push_button(workers, wait=1) + wait = int(sys.argv[sys.argv.index('--wait') + 1]) if '--wait' in sys.argv else 5 + items = [{"name": f"name{i}", "wait": wait} for i in range(10)] + print(f"sending {len(items)} tasks to {workers} workers") + print(f"simulate processing with with a wait of {wait}") + start = time.time() + + produce_to_process_data(items) + box.push_button(workers, wait=0.1) + + result = foobar() + print(result) + print("-----") + result = foobar() + print(result) + print(f"done with running took: {round(time.time()- start, 2)}") diff --git a/meesee.py b/meesee.py index 209326c..95534c2 100644 --- a/meesee.py +++ b/meesee.py @@ -219,6 +219,26 @@ def decorator(func): return func return decorator + def collector(self, wait=10, until=float('inf'), queue=None): + def decorator(func): + def wrapper(*args, **kwargs): + parsed_name = queue if queue is not None else self.parse_func_name(func) + config = { + "key": parsed_name, + "namespace": self.namespace, + "timeout": kwargs.pop("wait", None) or wait, + "redis_config": self.redis_config, + } + r = RedisQueue(**config) + results = [] + for _, item in r: + results.append(item.decode('utf-8')) + if len(results) >= until: + break + return func(results) + return wrapper + return decorator + def start_workers(self, workers=10, config=config): n_workers = len(self._worker_funcs) if n_workers == 0: