Skip to content

Commit

Permalink
Merge pull request #16 from fcfangcc/v0.2
Browse files Browse the repository at this point in the history
mod: 配置修改为配置类
  • Loading branch information
fcfangcc authored Aug 11, 2022
2 parents 003c805 + ef03b33 commit 8446a4a
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 149 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ pip install pyxxl
```python
import asyncio

from pyxxl import PyxxlRunner
from pyxxl import ExecutorConfig, PyxxlRunner

app = PyxxlRunner(
"http://localhost:8080/xxl-job-admin/api/",
executor_name="xxl-job-executor-sample",
port=9999,
host="172.17.0.1",
config = ExecutorConfig(
xxl_admin_baseurl="http://localhost:8080/xxl-job-admin/api/",
executor_app_name="xxl-job-executor-sample",
executor_host="172.17.0.1",
)

app = PyxxlRunner(config)

@app.handler.register(name="demoJobHandler")
async def test_task():
await asyncio.sleep(5)
Expand Down
3 changes: 3 additions & 0 deletions docs/docs/apis/config.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# ExecutorConfig

::: pyxxl.ExecutorConfig
17 changes: 11 additions & 6 deletions docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ nav:
- Example: 'example.md'
- ClassDocs:
- apis/runner.md
- apis/config.md
- apis/executor.md
- apis/run_data.md
- Changelog: 'changelog.md'
Expand All @@ -22,12 +23,16 @@ plugins:
options:
show_source: false
merge_init_into_class: true
show_if_no_docstring: true
show_if_no_docstring: false
show_signature_annotations: true

markdown_extensions:
- mdx_include:
base_path: docs
- pymdownx.highlight:
anchor_linenums: true
- pymdownx.superfences
- mdx_include:
base_path: docs
- pymdownx.highlight:
anchor_linenums: true
- pymdownx.superfences
- pymdownx.inlinehilite
- pymdownx.snippets
- pymdownx.details
- admonition
13 changes: 7 additions & 6 deletions example/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@
import logging
import time

from pyxxl import PyxxlRunner
from pyxxl import ExecutorConfig, PyxxlRunner
from pyxxl.ctx import g
from pyxxl.utils import setup_logging


setup_logging(logging.DEBUG)

app = PyxxlRunner(
"http://localhost:8080/xxl-job-admin/api/",
executor_name="xxl-job-executor-sample",
port=9999,
host="172.17.0.1",
config = ExecutorConfig(
xxl_admin_baseurl="http://localhost:8080/xxl-job-admin/api/",
executor_app_name="xxl-job-executor-sample",
executor_host="172.17.0.1",
)

app = PyxxlRunner(config)


@app.handler.register(name="demoJobHandler")
async def test_task():
Expand Down
7 changes: 3 additions & 4 deletions example/gunicorn_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
from fastapi import FastAPI

from pyxxl import JobHandler
from pyxxl.utils import setup_logging


logger = logging.getLogger("pyxxl")
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
setup_logging(logging.DEBUG)


app = FastAPI()
xxl_handler = JobHandler()
Expand Down
14 changes: 8 additions & 6 deletions example/gunicorn_app/gunicorn.conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from multiprocessing.util import _exit_function

from pyxxl import PyxxlRunner
from pyxxl import ExecutorConfig, PyxxlRunner


bind = ["0.0.0.0:8000"]
Expand All @@ -18,11 +18,13 @@ def when_ready(server):
from app import xxl_handler

atexit.unregister(_exit_function)
runner = PyxxlRunner(
"http://localhost:8080/xxl-job-admin/api/",
executor_name="xxl-job-executor-sample",
host="172.17.0.1",
handler=xxl_handler,

config = ExecutorConfig(
xxl_admin_baseurl="http://localhost:8080/xxl-job-admin/api/",
executor_app_name="xxl-job-executor-sample",
executor_host="172.17.0.1",
)

runner = PyxxlRunner(config, handler=xxl_handler)
server.pyxxl_runner = runner
runner.run_with_daemon()
61 changes: 16 additions & 45 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyxxl"
version = "0.1.7"
version = "0.2.1"
readme = "README.md"
license = "GPL-3.0-only"
description = "A Python executor for XXL-jobs"
Expand All @@ -20,6 +20,7 @@ mkdocs = { version = "1.3.0", optional = true }
mkdocstrings = {version = "^0.19.0", optional = true, extras=["python"]}
mkdocs-material = {version = "^8.3.2", optional = true}
mdx-include = {version = "^1.4.1", optional = true}
python-dotenv = {version = "*", optional = true}

[tool.poetry.dev-dependencies]
pytest = "7.1.2"
Expand All @@ -34,6 +35,7 @@ pytest-cov = "3.0.0"

[tool.poetry.extras]
doc = ["mkdocs","mkdocstrings","mkdocs-material","mdx-include"]
dotenv = ["python-dotenv"]

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
1 change: 1 addition & 0 deletions pyxxl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .executor import JobHandler
from .main import PyxxlRunner
from .setting import ExecutorConfig


__version__ = importlib.metadata.version("pyxxl")
32 changes: 17 additions & 15 deletions pyxxl/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pyxxl.ctx import g
from pyxxl.enum import executorBlockStrategy
from pyxxl.schema import HandlerInfo, RunData
from pyxxl.setting import ExecutorConfig
from pyxxl.types import DecoratedCallable
from pyxxl.xxl_client import XXL

Expand Down Expand Up @@ -55,33 +56,32 @@ class Executor:
def __init__(
self,
xxl_client: XXL,
config: ExecutorConfig,
*,
handler: Optional[JobHandler] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
max_workers: int = 20,
task_timeout: int = 60 * 60,
max_queue_length: int = 30,
) -> None:
"""执行器,真正的调度任务和策略都在这里
Args:
xxl_client (XXL): xxl客户端
config (ExecutorConfig): 配置参数
handler (Optional[JobHandler], optional): Defaults to None.
loop (Optional[asyncio.AbstractEventLoop], optional): Defaults to None.
max_workers (int, optional): 执行同步任务的线程池. Defaults to 20.
task_timeout (int, optional): 全局的任务超时配置,如果executorTimeout参数存在,以executorTimeout为准. Defaults to 60*60.
max_queue_length (int, optional): 单机串行的队列长度,当阻塞的任务大于此值时会抛弃. Defaults to 30.
"""

self.xxl_client = xxl_client
self.config = config

self.handler: JobHandler = handler or JobHandler()
self.loop = loop or asyncio.get_event_loop()
self.tasks: Dict[int, asyncio.Task] = {}
self.queue: Dict[int, List[RunData]] = defaultdict(list)
self.lock = asyncio.Lock()
self.handler: JobHandler = handler or JobHandler()
self.thread_pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="pyxxl_pool")
self.task_timeout = task_timeout
# 串行队列的最长长度
self.max_queue_length = max_queue_length
self.thread_pool = ThreadPoolExecutor(
max_workers=self.config.max_workers,
thread_name_prefix="pyxxl_pool",
)

async def shutdown(self) -> None:
for _, task in self.tasks.items():
Expand All @@ -108,13 +108,13 @@ async def run_job(self, run_data: RunData) -> None:
await self._cancel(run_data.jobId)
elif run_data.executorBlockStrategy == executorBlockStrategy.SERIAL_EXECUTION.value:

if len(self.queue[run_data.jobId]) >= self.max_queue_length:
if len(self.queue[run_data.jobId]) >= self.config.task_queue_length:
msg = (
"job {job_id} is SERIAL, queue length more than {max_length}."
"logId {log_id} discard!".format(
job_id=run_data.jobId,
log_id=run_data.logId,
max_length=self.max_queue_length,
max_length=self.config.task_queue_length,
)
)
logger.error(msg)
Expand All @@ -126,7 +126,7 @@ async def run_job(self, run_data: RunData) -> None:
job_id=run_data.jobId,
log_id=run_data.logId,
ranked=len(queue) + 1,
max_length=self.max_queue_length,
max_length=self.config.task_queue_length,
)
)
queue.append(run_data)
Expand Down Expand Up @@ -160,7 +160,7 @@ async def _run(self, handler: HandlerInfo, start_time: int, data: RunData) -> No
handler.handler,
)
)
result = await asyncio.wait_for(func, data.executorTimeout or self.task_timeout)
result = await asyncio.wait_for(func, data.executorTimeout or self.config.task_timeout)
logger.info("Job finished jobId=%s logId=%s" % (data.jobId, data.logId))
await self.xxl_client.callback(data.logId, start_time, code=200, msg=result)
except asyncio.CancelledError as e:
Expand Down Expand Up @@ -191,6 +191,8 @@ async def _cancel(self, job_id: int) -> None:
logger.warning("Job %s cancelled." % job_id)

async def graceful_close(self, timeout: int = 60) -> None:
"""优雅关闭"""

async def _graceful_close() -> None:
while len(self.tasks) > 0:
await asyncio.wait(self.tasks.values())
Expand Down
Loading

0 comments on commit 8446a4a

Please sign in to comment.