[TOC]
$ pip install ugly-code
- 自动注入命令行参数到函数（`ugly_code.cmd.Command`）
创建测试文件 cmd_debug.py
from ugly_code.cmd import Command
@Command
def main(x: int, y, z=1023):
    """
    测试一下
    """
    # NOTE(review): the docstring is deliberately left as-is; it appears to be
    # used as the command description in the generated usage text.
    # Echo the three parsed arguments, tab-separated.
    print(f"{x} \t {y} \t {z}")
if __name__ == '__main__':
    # Look the decorated object up by name and call it with no arguments;
    # the values for x / y / z are supplied on the command line.
    entry_point = globals()['main']
    entry_point()
执行该文件
$ python cmd_debug.py -x 1023 -y 333
1023 333 1023
有默认值的参数会被设置为可选参数，无默认值的参数则为必选参数。
带有类型注解（Type Hints）的参数会自动进行类型检查。
- 创建命令行工具组
编辑文件cmd_debug.py
from ugly_code.cmd import CMDHolder
@CMDHolder.command("test", "测试")
def main(x: int, b: str, c: int = 1):
    """
    测试一下
    """
    # NOTE(review): the docstring is left untranslated because it is printed
    # verbatim in the usage/error output of this command.
    # Format only the names this signature declares (x, b, c). A template that
    # refers to {y} or {z} raises KeyError here, since no such locals exist.
    print(f"{x} \t {b} \t {c}")
@CMDHolder.command("echo", "echo")
def echo(words):
    """echo"""
    # Write the single required argument straight back to stdout.
    print(words)
if __name__ == '__main__':
    # Build the holder for this module, then let it run whichever registered
    # command the command line selects.
    holder = CMDHolder(__name__)
    holder.execute()
执行该文件:
$ python3 cmd_debug.py echo -words "测试"
测试
$ python3 cmd_debug.py test
usage: Command line create by ugly-code.
测试一下
[-h] -x X -b B [-c C]
Command line create by ugly-code.
测试一下
: error: the following arguments are required: -x, -b
由示例可发现,CMDHolder可以持有多个命令行工具,根据不同的参数调用不同的命令行对象。而且还可以自定义命令行工具的名称与介绍。
ugly_code.runit
多任务管理工具。使用 `Runner` 和 `Worker` 管理多进程程序，支持通过 XML-RPC 控制任务启停。
一个Worker对应一个进程,各进程使用tag区分。
使用方式：先将任务注册到 Runner，启动 Runner 后即可自动或手动启动任务，也可以通过 XML-RPC 接口对任务进行调度管理。
import threading
import time
from ugly_code.runit import Switch, Worker, Runner
class AWorker(Worker):
    # Demo worker: for as long as its switch reports "on", print the switch
    # name with the current timestamp, then pause for three seconds.
    def serve(self):
        while True:
            if not self.switch.on:
                break
            print(f"{self.switch.name} {time.time()}")
            time.sleep(3.0)
def close_it(switch: Switch):
    """Wait ten seconds, then close ``switch``."""
    delay_seconds = 10
    time.sleep(delay_seconds)
    switch.close()
if __name__ == '__main__':
    st = Switch('Runner')
    # Background thread that closes the shared switch after close_it's delay.
    closer = threading.Thread(target=close_it, args=(st,))
    closer.start()
    # Two registrations sharing one worker class, told apart by their tags.
    tasks = (("a", AWorker), ('b', AWorker))
    # NOTE(review): m is presumably the XML-RPC listen address (port 0) — confirm.
    Runner(st, tasks, m=('127.0.0.1', 0)).run()
import threading
import time
from ugly_code.runit import Worker, Runner, RuntItMonitor
class AWorker(Worker):
    # Demo worker: for as long as its switch reports "on", print the switch
    # name with the current timestamp, then pause for three seconds.
    def serve(self):
        while True:
            if not self.switch.on:
                break
            print(f"{self.switch.name} {time.time()}")
            time.sleep(3.0)
def start_and_close_task(m, tag):
    """Drive the task ``tag`` through a ``RuntItMonitor`` created from ``m``.

    Scripted sequence: wait 10 s, call ``start`` twice, wait 3 s, print the
    stats, close the task, wait 3 s, prune, print the stats again, shut down.
    """
    time.sleep(10)
    client = RuntItMonitor(m)
    # start() is issued twice in a row, exactly as in the scripted sequence.
    for _ in range(2):
        client.start(tag)
    time.sleep(3)
    print(client.stats())
    # NOTE(review): meaning of the second argument (True) is not visible here.
    client.close(tag, True)
    time.sleep(3)
    client.prune()
    print(client.stats())
    client.shutdown()
if __name__ == '__main__':
    m = ('127.0.0.1', 65432)
    # Task "a" is registered with three extra fields (0, 0, False); task "b"
    # uses the short (tag, worker class) form.
    tasks = (("a", AWorker, 0, 0, False), ('b', AWorker))
    runner = Runner(None, tasks, m=m)
    # The helper thread talks to the runner over HTTP at the same host/port.
    monitor_url = "http://{}:{}".format(*m)
    helper = threading.Thread(target=start_and_close_task, args=(monitor_url, "a"))
    helper.start()
    runner.run()
import queue
import threading
import time
from ugly_code.runit import Switch
from ugly_code.runit.extra import QueueInterface, MultiprocessQueueWorker
class TestQueue(QueueInterface[int]):
    """``QueueInterface`` of ints backed by an in-process ``queue.Queue``."""

    def __init__(self):
        super().__init__()
        self.q = queue.Queue()

    def empty(self) -> bool:
        """Return whether the underlying queue currently reports empty."""
        return self.q.empty()

    def pop(self, block: bool = True, timeout: float | None = None) -> int:
        """Take the next item, forwarding ``block`` and ``timeout`` to ``Queue.get``."""
        return self.q.get(block=block, timeout=timeout)

    def put(self, t: int):
        """Append ``t`` to the underlying queue."""
        return self.q.put(t)
def handle_sleep_time(sleep_time: float):
    """Block the caller for ``sleep_time`` seconds."""
    time.sleep(sleep_time)
class TestMultiprocessQueueWorker:
    """Example driver for ``MultiprocessQueueWorker`` with a single worker."""

    def run_it(self):
        """Queue three ints and serve them until the switch is closed."""
        on = Switch('test')
        q = TestQueue()
        # Arm the stopper first so serve() below cannot run forever.
        stopper = threading.Thread(target=self.stop_it, args=(on,))
        stopper.start()
        # NOTE(review): each item is presumably handed to handle_sleep_time
        # as its sleep duration — confirm against MultiprocessQueueWorker.
        for item in (1, 4, 5):
            q.put(item)
        worker = MultiprocessQueueWorker(on, q, handle_sleep_time, max_workers=1)
        worker.serve()

    def stop_it(self, on: Switch, timeout: float = 15.0):
        """Wait ``timeout`` seconds, then close the switch ``on``."""
        time.sleep(timeout)
        on.close()
from ugly_code.ex import ObjectProxy
# Wrap a nested dict so that its keys can be read with attribute access.
payload = {"a": 1, "b": 2, "c": 3, "d": {"a": 1, "b": 2}}
obj = ObjectProxy(payload)
print(obj.d.a)
$ pip install ugly-code[rabbit]
import threading
import time
from pika import BasicProperties
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic
from ugly_code.rabbit import ListenOpt, default_listen_ctx, RabbitListenCtx
# Listener options for the example: a durable queue "test" bound to the direct
# exchange "test" with routing key "test".
opt = ListenOpt(
    queue='test',
    # NOTE(review): the original URI was mangled by e-mail obfuscation
    # ("guest:[email protected]") and was not a valid AMQP URI. The default
    # RabbitMQ guest account on localhost is assumed here — replace the
    # credentials and host with those of your broker.
    uri="amqp://guest:guest@localhost:5672/test",
    exchange='test',
    ext='direct',
    routing_key='test',
    durable=True,
    prefetch=1,
    retry_delay=1.0,
    retry=False,
)
def handle_message(
    channel: BlockingChannel,
    deliver: Basic.Deliver,
    props: BasicProperties,
    body: bytes,
):
    """Print the raw message body, then let the connection process events.

    ``deliver`` and ``props`` are accepted but not used.
    """
    print(body)
    # NOTE(review): the 3 is presumably a time limit in seconds — confirm
    # against pika's process_data_events.
    connection = channel.connection
    connection.process_data_events(3)
def close_it(val: RabbitListenCtx):
    """Wait ten seconds, then shut the listen context ``val`` down."""
    delay_seconds = 10
    time.sleep(delay_seconds)
    val.shutdown()
# Start the listen context with handle_message, and schedule close_it on a
# background thread so the context is shut down after its delay.
with default_listen_ctx(opt).start(handle_message) as ctx:
    stopper = threading.Thread(target=close_it, args=(ctx,))
    stopper.start()