Skip to content

Commit

Permalink
refactor: new operation entities
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulFarault committed Nov 29, 2024
1 parent 1963255 commit 9c866eb
Show file tree
Hide file tree
Showing 16 changed files with 529 additions and 333 deletions.
2 changes: 1 addition & 1 deletion scripts/playbooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def playbooks(services, output_dir, for_collection, collections):
for operation in dag.get_all_operations():
dag_services.add_node(operation.name.service)
for dependency in operation.depends_on:
dependency_operation = OperationName.from_name(dependency)
dependency_operation = OperationName.from_str(dependency)
if dependency_operation.service != operation.name.service:
dag_services.add_edge(
dependency_operation.service, operation.name.service
Expand Down
23 changes: 19 additions & 4 deletions tdp/cli/commands/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

from tdp.cli.params import collections_option, hosts_option
from tdp.core.dag import Dag
from tdp.core.entities.operation import Operation, PlaybookOperation

if TYPE_CHECKING:
from tdp.core.collections import Collections
from tdp.core.entities.operation import Operation


@click.command()
Expand Down Expand Up @@ -50,7 +50,11 @@ def ops(
operations = [
operation
for operation in dag.get_all_operations()
if len(hosts) == 0 or bool(set(operation.host_names) & set(hosts))
if len(hosts) == 0
or (
not isinstance(operation, PlaybookOperation)
or bool(set(operation.playbook.hosts) & set(hosts))
)
]
if topo_sort:
sorted_operations = dag.topological_sort_key(
Expand All @@ -65,7 +69,11 @@ def ops(
operations = [
operation
for operation in collections.operations.values()
if len(hosts) == 0 or bool(set(operation.host_names) & set(hosts))
if len(hosts) == 0
or (
not isinstance(operation, PlaybookOperation)
or bool(set(operation.playbook.hosts) & set(hosts))
)
]
sorted_operations = sorted(
operations, key=lambda operation: operation.name.name
Expand All @@ -78,7 +86,14 @@ def _print_operations(operations: Iterable[Operation], /):
click.echo(
tabulate(
[
[operation.name.name, operation.host_names or ""]
[
operation.name.name,
(
", ".join(operation.playbook.hosts)
if isinstance(operation, PlaybookOperation)
else ""
),
]
for operation in operations
],
headers=["Operation name", "Hosts"],
Expand Down
7 changes: 6 additions & 1 deletion tdp/core/cluster_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
create_hosted_entity,
)
from tdp.core.entities.hosted_entity_status import HostedEntityStatus
from tdp.core.entities.operation import PlaybookOperation
from tdp.core.models.sch_status_log_model import (
SCHStatusLogModel,
SCHStatusLogSourceEnum,
Expand Down Expand Up @@ -133,7 +134,11 @@ def generate_stale_sch_logs(
continue

# Create a log for each host where the entity is deployed
for host in operation.host_names:
for host in (
operation.playbook.hosts
if isinstance(operation, PlaybookOperation)
else []
):
log = logs.setdefault(
create_hosted_entity(
create_entity_name(
Expand Down
51 changes: 30 additions & 21 deletions tdp/core/collections/collection_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
SCHEMA_VARS_DIRECTORY_NAME,
YML_EXTENSION,
)
from tdp.core.entities.operation import DagOperation, Playbook
from tdp.core.entities.operation import Playbook
from tdp.core.inventory_reader import InventoryReader
from tdp.core.types import PathLike
from tdp.core.variables.schema import ServiceCollectionSchema
Expand Down Expand Up @@ -51,6 +51,32 @@ class MissingMandatoryDirectoryError(Exception):
pass


class TDPLibDagNodeModel(BaseModel):
"""Model for a DAG node read from a DAG file.
Meant to be used in a DagNodeBuilder.
Args:
name: Name of the operation.
depends_on: List of operations that must be executed before this one.
noop: Whether the operation is a noop.
"""

model_config = ConfigDict(extra="ignore")

name: str
depends_on: frozenset[str] = frozenset()
noop: Optional[bool] = False


class TDPLibDagModel(BaseModel):
"""Model for a TDP DAG defined in a tdp_lib_dag file."""

model_config = ConfigDict(extra="ignore")

operations: list[TDPLibDagNodeModel]


class CollectionReader:
"""An enriched version of an Ansible collection.
Expand Down Expand Up @@ -122,34 +148,17 @@ def schema_directory(self) -> Path:
"""Path to the variables schema directory."""
return self._path / SCHEMA_VARS_DIRECTORY_NAME

def read_dag_nodes(self) -> Generator[DagOperation, None, None]:
def read_dag_nodes(self) -> Generator[TDPLibDagNodeModel, None, None]:
"""Read the DAG nodes stored in the dag_directory."""

class TDPLibDagNodeModel(BaseModel):
"""Model for a TDP operation defined in a tdp_lib_dag file."""

model_config = ConfigDict(extra="ignore")

name: str
depends_on: frozenset[str] = frozenset()

class TDPLibDagModel(BaseModel):
"""Model for a TDP DAG defined in a tdp_lib_dag file."""

model_config = ConfigDict(extra="ignore")

operations: list[TDPLibDagNodeModel]

for dag_file in (self.dag_directory).glob("*" + YML_EXTENSION):
with dag_file.open("r") as operations_file:
file_content = yaml.load(operations_file, Loader=Loader)

try:
tdp_lib_dag = TDPLibDagModel(operations=file_content)
for operation in tdp_lib_dag.operations:
yield DagOperation.from_name(
name=operation.name, depends_on=operation.depends_on
)
yield operation
except ValidationError as e:
logger.error(f"Error while parsing tdp_lib_dag file {dag_file}: {e}")
raise
Expand Down Expand Up @@ -201,7 +210,7 @@ def _check_collection_structure(self, path: Path) -> None:

def read_hosts_from_playbook(
playbook_path: Path, inventory_reader: Optional[InventoryReader]
) -> set[str]:
) -> frozenset[str]:
"""Read the hosts from a playbook.
Args:
Expand Down
Loading

0 comments on commit 9c866eb

Please sign in to comment.