-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrest_grouping.py
115 lines (80 loc) · 4.01 KB
/
rest_grouping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# -*- coding: utf-8 -*-
"""rest_grouping.py
This module implements:
* ``RestTaskLoader`` class, which is used to load a group of RESTful services (call tasks) from a JSON payload into ``TaskContainer``.
* ``start`` function, which is the main entry for the client to call a group of RESTful services.
This module was originally shipped as an example code from https://github.com/DataBooster/PyWebApi, licensed under the MIT license.
Anyone who obtains a copy of this code is welcome to modify it for any purpose, and holds all rights to the modified part only.
The above license notice and permission notice shall be included in all copies or substantial portions of the Software.
"""
from collections.abc import Mapping
from typing import List, Tuple, Dict, Any
from concurrent.futures import ThreadPoolExecutor
from task_grouping import TaskContainer, ITaskLoader
from simple_rest_call import rest
_reserved_key_parallel_group : str = "[###]"
_reserved_key_serial_group : str = "[+++]"
_reserved_key_rest_url : str = "(://)"
_reserved_key_headers : str = "(:^:)"
_reserved_key_payload : str = "(...)"
_reserved_key_payload_with_pipe : str = "(.|.)"
_reserved_key_timeout : str = "(:!!)"
def _task_func(url:str, data:dict=None, timeout:float=None, headers:dict=None):
return rest(url, data, timeout=timeout, headers=headers)
def _pipeargs_merge_fn(kw_args:Dict[str, Any], pipe_args:Dict[str, Any]) -> Dict[str, Any]:
if pipe_args and isinstance(pipe_args, Mapping):
merged_args = kw_args.copy() if kw_args else {}
payload = kw_args.get('data', {})
if payload is None:
payload = {}
if isinstance(payload, Mapping):
merged_args['data'] = payload.copy().update(pipe_args)
return merged_args
else:
return kw_args
class RestTaskLoader(ITaskLoader):
"""This class is used to load a group of RESTful services (call tasks) from a JSON payload into ``TaskContainer``"""
def __init__(self, thread_pool:ThreadPoolExecutor):
self.thread_pool = thread_pool
def create_base_container(self) -> TaskContainer:
return TaskContainer(_task_func, _pipeargs_merge_fn, self.thread_pool)
@staticmethod
def _get_timeout(task_node:Dict[str, Any]) -> float:
timeout = task_node.get(_reserved_key_timeout)
if isinstance(timeout, (int, float)) and timeout > 0:
return timeout
else:
return None
def extract_single_task(self, task_node:Dict[str, Any]) -> Tuple[tuple, Dict[str, Any], bool]:
url = task_node.get(_reserved_key_rest_url)
if url:
headers = task_node.get(_reserved_key_headers)
data = task_node.get(_reserved_key_payload)
if data is None:
data = {}
data_ = task_node.get(_reserved_key_payload_with_pipe)
if data_ is not None or _reserved_key_payload_with_pipe in task_node:
if data_:
data.update(data_)
with_pipe = True
else:
with_pipe = False
timeout = self._get_timeout(task_node)
return ((), {'url': url, 'data': data, 'timeout': timeout, 'headers': headers}, with_pipe)
else:
return None
def extract_serial_group(self, task_node:Dict[str, Any]) -> List[Dict[str, Any]]:
return task_node.get(_reserved_key_serial_group)
def extract_parallel_group(self, task_node:Dict[str, Any]) -> List[Dict[str, Any]]:
return task_node.get(_reserved_key_parallel_group)
def load(self, task_tree:Dict[str, Any]) -> TaskContainer:
container = super().load(task_tree)
container.timeout = self._get_timeout(task_tree)
return container
def start(rest:Dict[str, Any]):
"""the main entry for the client to call a group of RESTful services."""
with ThreadPoolExecutor(max_workers=64) as thread_pool:
loader = RestTaskLoader(thread_pool)
container = loader.load(rest)
return container.run()
__version__ = "0.1"