Rename Worker pools -> work pools (#8107)
Co-authored-by: Alexander Streed <[email protected]>
2 people authored and madkinsz committed Jan 12, 2023
1 parent aad511e commit 1a5c461
Showing 39 changed files with 2,616 additions and 1,596 deletions.
194 changes: 97 additions & 97 deletions src/prefect/client/orion.py

Large diffs are not rendered by default.
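
The client diff is collapsed, but the renamed client surface can be inferred from its call sites later in this commit. A hedged, protocol-style sketch of the methods workers rely on; the signatures are reconstructed from those call sites, not from the collapsed diff itself.

```python
from typing import Optional

import pendulum

class WorkPoolClientMethods:
    """Renamed OrionClient methods as used by BaseWorker below (sketch only)."""

    async def read_work_pool(self, work_pool_name: str): ...

    async def create_work_pool(self, work_pool): ...  # schemas.actions.WorkPoolCreate

    async def send_worker_heartbeat(
        self, work_pool_name: str, worker_name: Optional[str]
    ): ...

    async def get_scheduled_flow_runs_for_work_pool_queues(
        self, work_pool_name: str, scheduled_before: pendulum.DateTime
    ): ...
```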

20 changes: 10 additions & 10 deletions src/prefect/deployments.py
@@ -210,9 +210,9 @@ def load_deployments_from_yaml(
     return registry


-@experimental_field("worker_pool_name", group="workers", when=lambda x: x is not None)
+@experimental_field("work_pool_name", group="workers", when=lambda x: x is not None)
 @experimental_field(
-    "worker_pool_queue_name",
+    "work_pool_queue_name",
     group="workers",
     when=lambda x: x is not None,
     stacklevel=4,
@@ -288,8 +288,8 @@ def _editable_fields(self) -> List[str]:
             "description",
             "version",
             "work_queue_name",
-            "worker_pool_name",
-            "worker_pool_queue_name",
+            "work_pool_name",
+            "work_pool_queue_name",
             "tags",
             "parameters",
             "schedule",
@@ -390,11 +390,11 @@ def _yaml_dict(self) -> dict:
         description="The work queue for the deployment.",
         yaml_comment="The work queue that will handle this deployment's runs",
     )
-    worker_pool_name: Optional[str] = Field(
-        default=None, description="The worker pool for the deployment"
+    work_pool_name: Optional[str] = Field(
+        default=None, description="The work pool for the deployment"
     )
-    worker_pool_queue_name: Optional[str] = Field(
-        default=None, description="The worker pool queue for the deployment."
+    work_pool_queue_name: Optional[str] = Field(
+        default=None, description="The work pool queue for the deployment."
     )
     # flow data
     parameters: Dict[str, Any] = Field(default_factory=dict)
@@ -653,8 +653,8 @@ async def apply(
                 flow_id=flow_id,
                 name=self.name,
                 work_queue_name=self.work_queue_name,
-                worker_pool_name=self.worker_pool_name,
-                worker_pool_queue_name=self.worker_pool_queue_name,
+                work_pool_name=self.work_pool_name,
+                work_pool_queue_name=self.work_pool_queue_name,
                 version=self.version,
                 schedule=self.schedule,
                 parameters=self.parameters,
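
For a sense of the user-facing change, here is a minimal sketch of registering a deployment against a work pool under the new field names. `Deployment.build_from_flow` and the example flow are illustrative assumptions; only `work_pool_name`, `work_pool_queue_name`, and `apply()` are taken from the diff above.

```python
from prefect import flow
from prefect.deployments import Deployment

@flow
def my_flow():
    print("hello from my_flow")

# The renamed fields replace worker_pool_name / worker_pool_queue_name.
deployment = Deployment.build_from_flow(  # assumed constructor
    flow=my_flow,
    name="example",
    work_pool_name="my-pool",
    work_pool_queue_name="default",
)
deployment.apply()  # registers the deployment with the API
```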
6 changes: 3 additions & 3 deletions src/prefect/experimental/cli/worker.py
@@ -27,8 +27,8 @@ async def start(
     worker_name: str = typer.Option(
         None, "-n", "--name", help="The name to give to the started worker."
     ),
-    worker_pool_name: str = typer.Option(
-        ..., "-p", "--pool", help="The worker pool the started worker should join."
+    work_pool_name: str = typer.Option(
+        ..., "-p", "--pool", help="The work pool the started worker should join."
     ),
     worker_type: str = typer.Option(
         "process", "-t", "--type", help="The type of worker to start."
@@ -52,7 +52,7 @@ async def start(
     )
     async with Worker(
         name=worker_name,
-        worker_pool_name=worker_pool_name,
+        work_pool_name=work_pool_name,
         limit=limit,
         prefetch_seconds=prefetch_seconds,
     ) as worker:
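
A hedged sketch of the equivalent Python entrypoint, mirroring the CLI body above. The `ProcessWorker` import path is an assumption (the CLI resolves the worker class from `--type`); the constructor keywords and the `sync_with_backend` / `get_and_submit_flow_runs` calls come from this commit.

```python
import anyio

# Assumed import path for the default "process" worker type.
from prefect.experimental.workers.process import ProcessWorker

async def main():
    async with ProcessWorker(
        name="my-worker",
        work_pool_name="my-pool",  # formerly worker_pool_name
    ) as worker:
        await worker.sync_with_backend()         # load/create the pool, heartbeat
        await worker.get_and_submit_flow_runs()  # one polling pass

anyio.run(main)
```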
60 changes: 29 additions & 31 deletions src/prefect/experimental/workers/base.py
@@ -40,7 +40,7 @@ class BaseWorker(abc.ABC):
     @experimental(feature="The workers feature", group="workers")
     def __init__(
         self,
-        worker_pool_name: str,
+        work_pool_name: str,
         name: Optional[str] = None,
         prefetch_seconds: Optional[float] = None,
         workflow_storage_path: Optional[Path] = None,
@@ -56,14 +56,14 @@ def __init__(
                 The name is used to identify the worker in the UI; if two
                 processes have the same name, they will be treated as the same
                 worker.
-            worker_pool_name: The name of the worker pool to use. If not
+            work_pool_name: The name of the work pool to use. If not
                 provided, the default will be used.
             prefetch_seconds: The number of seconds to prefetch flow runs for.
             workflow_storage_path: The filesystem path to workflow storage for
                 this worker.
-            create_pool_if_not_found: Whether to create the worker pool
+            create_pool_if_not_found: Whether to create the work pool
                 if it is not found. Defaults to `True`, but can be set to `False` to
-                ensure that worker pools are not created accidentally.
+                ensure that work pools are not created accidentally.
             limit: The maximum number of flow runs this worker should be running at
                 a given time.
         """
@@ -74,7 +74,7 @@ def __init__(

         self.is_setup = False
         self._create_pool_if_not_found = create_pool_if_not_found
-        self._worker_pool_name = worker_pool_name
+        self._work_pool_name = work_pool_name

         self._prefetch_seconds: float = (
             prefetch_seconds or PREFECT_WORKER_PREFETCH_SECONDS.value()
@@ -83,7 +83,7 @@ def __init__(
             workflow_storage_path or PREFECT_WORKER_WORKFLOW_STORAGE_PATH.value()
         )

-        self._worker_pool: Optional[schemas.core.WorkerPool] = None
+        self._work_pool: Optional[schemas.core.WorkPool] = None
         self._runs_task_group: Optional[anyio.abc.TaskGroup] = None
         self._client: Optional[OrionClient] = None
         self._limit = limit
@@ -152,48 +152,46 @@ async def get_and_submit_flow_runs(self):
         runs_response = await self._get_scheduled_flow_runs()
         return await self._submit_scheduled_flow_runs(flow_run_response=runs_response)

-    async def _update_local_worker_pool_info(self):
+    async def _update_local_work_pool_info(self):
         try:
-            worker_pool = await self._client.read_worker_pool(
-                worker_pool_name=self._worker_pool_name
+            work_pool = await self._client.read_work_pool(
+                work_pool_name=self._work_pool_name
             )
         except ObjectNotFound:
             if self._create_pool_if_not_found:
-                worker_pool = await self._client.create_worker_pool(
-                    worker_pool=schemas.actions.WorkerPoolCreate(
-                        name=self._worker_pool_name, type=self.type
+                work_pool = await self._client.create_work_pool(
+                    work_pool=schemas.actions.WorkPoolCreate(
+                        name=self._work_pool_name, type=self.type
                     )
                 )
-                self._logger.info(f"Worker pool {self._worker_pool_name!r} created.")
+                self._logger.info(f"Worker pool {self._work_pool_name!r} created.")
             else:
-                self._logger.warning(
-                    f"Worker pool {self._worker_pool_name!r} not found!"
-                )
+                self._logger.warning(f"Worker pool {self._work_pool_name!r} not found!")
                 return

         # if the remote config type changes (or if it's being loaded for the
         # first time), check if it matches the local type and warn if not
-        if getattr(self._worker_pool, "type", 0) != worker_pool.type:
-            if worker_pool.type != self.__class__.type:
+        if getattr(self._work_pool, "type", 0) != work_pool.type:
+            if work_pool.type != self.__class__.type:
                 self._logger.warning(
                     f"Worker type mismatch! This worker process expects type "
-                    f"{self.type!r} but received {worker_pool.type!r}"
+                    f"{self.type!r} but received {work_pool.type!r}"
                     " from the server. Unexpected behavior may occur."
                 )
-        self._worker_pool = worker_pool
+        self._work_pool = work_pool

     async def _send_worker_heartbeat(self):
-        if self._worker_pool:
+        if self._work_pool:
             await self._client.send_worker_heartbeat(
-                worker_pool_name=self._worker_pool_name, worker_name=self.name
+                work_pool_name=self._work_pool_name, worker_name=self.name
             )

     async def sync_with_backend(self):
         """
-        Updates the worker's local information about it's current worker pool and
+        Updates the worker's local information about it's current work pool and
         queues. Sends a worker heartbeat to the API.
         """
-        await self._update_local_worker_pool_info()
+        await self._update_local_work_pool_info()

         await self._send_worker_heartbeat()

@@ -259,16 +257,16 @@ async def _get_scheduled_flow_runs(
         self,
     ) -> List[schemas.responses.WorkerFlowRunResponse]:
         """
-        Retrieve scheduled flow runs from the worker pool's queues.
+        Retrieve scheduled flow runs from the work pool's queues.
         """
         scheduled_before = pendulum.now("utc").add(seconds=int(self._prefetch_seconds))
         self._logger.debug(
             f"Querying for flow runs scheduled before {scheduled_before}"
         )
         try:
             scheduled_flow_runs = (
-                await self._client.get_scheduled_flow_runs_for_worker_pool_queues(
-                    worker_pool_name=self._worker_pool_name,
+                await self._client.get_scheduled_flow_runs_for_work_pool_queues(
+                    work_pool_name=self._work_pool_name,
                     scheduled_before=scheduled_before,
                 )
             )
@@ -398,12 +396,12 @@ async def _submit_run_and_capture_errors(
     def get_status(self):
         """
         Retrieves the status of the current worker including its name, current worker
-        pool, the worker pool queues it is polling, and its local settings.
+        pool, the work pool queues it is polling, and its local settings.
         """
         return {
             "name": self.name,
-            "worker_pool": self._worker_pool.dict(json_compatible=True)
-            if self._worker_pool is not None
+            "work_pool": self._work_pool.dict(json_compatible=True)
+            if self._work_pool is not None
             else None,
             "settings": {
                 "prefetch_seconds": self._prefetch_seconds,
@@ -485,4 +483,4 @@ async def __aexit__(self, *exc_info):
         await self.teardown(*exc_info)

     def __repr__(self):
-        return f"Worker(pool={self._worker_pool_name!r}, name={self.name!r})"
+        return f"Worker(pool={self._work_pool_name!r}, name={self.name!r})"
4 changes: 2 additions & 2 deletions src/prefect/orion/api/admin.py
@@ -43,8 +43,8 @@ async def clear_database(
         response.status_code = status.HTTP_400_BAD_REQUEST
         return
     async with db.session_context(begin_transaction=True) as session:
-        # worker pool has a circular dependency on pool queue; delete it first
-        await session.execute(db.WorkerPool.__table__.delete())
+        # work pool has a circular dependency on pool queue; delete it first
+        await session.execute(db.WorkPool.__table__.delete())
         for table in reversed(db.Base.metadata.sorted_tables):
             await session.execute(table.delete())

42 changes: 21 additions & 21 deletions src/prefect/orion/api/deployments.py
@@ -40,26 +40,26 @@ async def create_deployment(
     async with db.session_context(begin_transaction=True) as session:
         # hydrate the input model into a full model
         deployment_dict = deployment.dict(
-            exclude={"worker_pool_name", "worker_pool_queue_name"}
+            exclude={"work_pool_name", "work_pool_queue_name"}
         )
-        if deployment.worker_pool_name and deployment.worker_pool_queue_name:
+        if deployment.work_pool_name and deployment.work_pool_queue_name:
             # If a specific pool name/queue name combination was provided, get the
-            # ID for that worker pool queue.
+            # ID for that work pool queue.
             deployment_dict[
-                "worker_pool_queue_id"
-            ] = await worker_lookups._get_worker_pool_queue_id_from_name(
+                "work_pool_queue_id"
+            ] = await worker_lookups._get_work_pool_queue_id_from_name(
                 session=session,
-                worker_pool_name=deployment.worker_pool_name,
-                worker_pool_queue_name=deployment.worker_pool_queue_name,
+                work_pool_name=deployment.work_pool_name,
+                work_pool_queue_name=deployment.work_pool_queue_name,
             )
-        elif deployment.worker_pool_name:
+        elif deployment.work_pool_name:
             # If just a pool name was provided, get the ID for its default
-            # worker pool queue.
+            # work pool queue.
             deployment_dict[
-                "worker_pool_queue_id"
-            ] = await worker_lookups._get_default_worker_pool_queue_id_from_worker_pool_name(
+                "work_pool_queue_id"
+            ] = await worker_lookups._get_default_work_pool_queue_id_from_work_pool_name(
                 session=session,
-                worker_pool_name=deployment.worker_pool_name,
+                work_pool_name=deployment.work_pool_name,
             )

         deployment = schemas.core.Deployment(**deployment_dict)
@@ -163,8 +163,8 @@ async def read_deployments(
     flow_runs: schemas.filters.FlowRunFilter = None,
     task_runs: schemas.filters.TaskRunFilter = None,
     deployments: schemas.filters.DeploymentFilter = None,
-    worker_pools: schemas.filters.WorkerPoolFilter = None,
-    worker_pool_queues: schemas.filters.WorkerPoolQueueFilter = None,
+    work_pools: schemas.filters.WorkPoolFilter = None,
+    work_pool_queues: schemas.filters.WorkPoolQueueFilter = None,
     sort: schemas.sorting.DeploymentSort = Body(
         schemas.sorting.DeploymentSort.NAME_ASC
     ),
@@ -183,8 +183,8 @@ async def read_deployments(
             flow_run_filter=flow_runs,
             task_run_filter=task_runs,
             deployment_filter=deployments,
-            worker_pool_filter=worker_pools,
-            worker_pool_queue_filter=worker_pool_queues,
+            work_pool_filter=work_pools,
+            work_pool_queue_filter=work_pool_queues,
         )
     return [
         schemas.responses.DeploymentResponse.from_orm(orm_deployment=deployment)
@@ -198,8 +198,8 @@ async def count_deployments(
     flow_runs: schemas.filters.FlowRunFilter = None,
     task_runs: schemas.filters.TaskRunFilter = None,
     deployments: schemas.filters.DeploymentFilter = None,
-    worker_pools: schemas.filters.WorkerPoolFilter = None,
-    worker_pool_queues: schemas.filters.WorkerPoolQueueFilter = None,
+    work_pools: schemas.filters.WorkPoolFilter = None,
+    work_pool_queues: schemas.filters.WorkPoolQueueFilter = None,
     db: OrionDBInterface = Depends(provide_database_interface),
 ) -> int:
     """
@@ -212,8 +212,8 @@ async def count_deployments(
             flow_run_filter=flow_runs,
             task_run_filter=task_runs,
             deployment_filter=deployments,
-            worker_pool_filter=worker_pools,
-            worker_pool_queue_filter=worker_pool_queues,
+            work_pool_filter=work_pools,
+            work_pool_queue_filter=work_pool_queues,
         )


@@ -371,7 +371,7 @@ async def create_flow_run_from_deployment(
                 or deployment.infrastructure_document_id
             ),
             work_queue_name=deployment.work_queue_name,
-            worker_pool_queue_id=deployment.worker_pool_queue_id,
+            work_pool_queue_id=deployment.work_pool_queue_id,
         )

         if not flow_run.state:
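
The queue-resolution branching in `create_deployment` above reduces to a simple precedence rule, restated below for clarity. The lookup helper names are taken from this diff; bundling them into one standalone function is illustrative, not the actual module layout.

```python
# Condensed restatement of create_deployment's queue resolution:
# an explicit pool + queue pair wins, a bare pool name falls back to
# that pool's default queue, and neither leaves the id unset.
async def resolve_work_pool_queue_id(worker_lookups, session, deployment):
    if deployment.work_pool_name and deployment.work_pool_queue_name:
        return await worker_lookups._get_work_pool_queue_id_from_name(
            session=session,
            work_pool_name=deployment.work_pool_name,
            work_pool_queue_name=deployment.work_pool_queue_name,
        )
    if deployment.work_pool_name:
        return await worker_lookups._get_default_work_pool_queue_id_from_work_pool_name(
            session=session,
            work_pool_name=deployment.work_pool_name,
        )
    return None  # deployment is not bound to any work pool queue
```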
16 changes: 8 additions & 8 deletions src/prefect/orion/api/flow_runs.py
@@ -91,8 +91,8 @@ async def count_flow_runs(
     flow_runs: schemas.filters.FlowRunFilter = None,
     task_runs: schemas.filters.TaskRunFilter = None,
     deployments: schemas.filters.DeploymentFilter = None,
-    worker_pools: schemas.filters.WorkerPoolFilter = None,
-    worker_pool_queues: schemas.filters.WorkerPoolQueueFilter = None,
+    work_pools: schemas.filters.WorkPoolFilter = None,
+    work_pool_queues: schemas.filters.WorkPoolQueueFilter = None,
     db: OrionDBInterface = Depends(provide_database_interface),
 ) -> int:
     """
@@ -105,8 +105,8 @@ async def count_flow_runs(
             flow_run_filter=flow_runs,
             task_run_filter=task_runs,
             deployment_filter=deployments,
-            worker_pool_filter=worker_pools,
-            worker_pool_queue_filter=worker_pool_queues,
+            work_pool_filter=work_pools,
+            work_pool_queue_filter=work_pool_queues,
         )


@@ -250,8 +250,8 @@ async def read_flow_runs(
     flow_runs: schemas.filters.FlowRunFilter = None,
     task_runs: schemas.filters.TaskRunFilter = None,
     deployments: schemas.filters.DeploymentFilter = None,
-    worker_pools: schemas.filters.WorkerPoolFilter = None,
-    worker_pool_queues: schemas.filters.WorkerPoolQueueFilter = None,
+    work_pools: schemas.filters.WorkPoolFilter = None,
+    work_pool_queues: schemas.filters.WorkPoolQueueFilter = None,
     db: OrionDBInterface = Depends(provide_database_interface),
 ) -> List[schemas.responses.FlowRunResponse]:
     """
@@ -264,8 +264,8 @@ async def read_flow_runs(
             flow_run_filter=flow_runs,
             task_run_filter=task_runs,
             deployment_filter=deployments,
-            worker_pool_filter=worker_pools,
-            worker_pool_queue_filter=worker_pool_queues,
+            work_pool_filter=work_pools,
+            work_pool_queue_filter=work_pool_queues,
             offset=offset,
             limit=limit,
             sort=sort,
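
For API consumers, the renamed filter schemas occupy the same positions in the request body. A hedged sketch of constructing the new filter; the `WorkPoolFilterName` type and its `any_` field are assumptions patterned on the other filters in `schemas.filters`, not taken from this diff.

```python
from prefect.orion import schemas

# Hypothetical: filter flow runs to those in a given work pool.
work_pools = schemas.filters.WorkPoolFilter(
    name=schemas.filters.WorkPoolFilterName(any_=["my-pool"])  # assumed field names
)

# A request to POST /flow_runs/filter would now send this under the
# "work_pools" key where it previously used "worker_pools".
body = {"work_pools": work_pools.dict(json_compatible=True)}
```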
