Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cell_centers method to TimeDirectory #211

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion foamlib/_cases/_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import sys
from contextlib import asynccontextmanager
from typing import Any, Callable, Optional, TypeVar, Union
from typing import Any, Callable, Optional, TypeVar, Union, overload

if sys.version_info >= (3, 9):
from collections.abc import (
Expand All @@ -23,6 +23,8 @@

import aioshutil

from .._files import FoamFieldFile
from ._base import FoamCaseBase
from ._run import FoamCaseRunBase
from ._subprocess import run_async
from ._util import ValuedGenerator, awaitableasynccontextmanager
Expand All @@ -42,6 +44,20 @@ class AsyncFoamCase(FoamCaseRunBase):
:param path: The path to the case directory.
"""

class TimeDirectory(FoamCaseRunBase.TimeDirectory):
@property
def _case(self) -> "AsyncFoamCase":
return AsyncFoamCase(self.path.parent)

async def cell_centers(self) -> FoamFieldFile:
"""Write and return the cell centers."""
calls = ValuedGenerator(self._cell_centers_calls())

for coro in calls:
await coro

return calls.value

max_cpus = multiprocessing.cpu_count()
"""
Maximum number of CPUs to use for running `AsyncFoamCase`s concurrently. Defaults to the number of CPUs on the system.
Expand Down Expand Up @@ -106,6 +122,23 @@ async def clean(self, *, check: bool = False) -> None:
for coro in self._clean_calls(check=check):
await coro

@overload
def __getitem__(
self, index: Union[int, float, str]
) -> "AsyncFoamCase.TimeDirectory": ...

@overload
def __getitem__(self, index: slice) -> Sequence["AsyncFoamCase.TimeDirectory"]: ...

def __getitem__(
self, index: Union[int, slice, float, str]
) -> Union["AsyncFoamCase.TimeDirectory", Sequence["AsyncFoamCase.TimeDirectory"]]:
ret = super().__getitem__(index)
if isinstance(ret, FoamCaseBase.TimeDirectory):
return AsyncFoamCase.TimeDirectory(ret)
else:
return [AsyncFoamCase.TimeDirectory(r) for r in ret]

async def run(
self,
cmd: Optional[Union[Sequence[Union[str, "os.PathLike[str]"]], str]] = None,
Expand Down
6 changes: 5 additions & 1 deletion foamlib/_cases/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class TimeDirectory(Set[FoamFieldFile]):
def __init__(self, path: Union["os.PathLike[str]", str]):
self.path = Path(path).absolute()

@property
def _case(self) -> "FoamCaseBase":
return FoamCaseBase(self.path.parent)

@property
def time(self) -> float:
"""The time that corresponds to this directory."""
Expand All @@ -58,7 +62,7 @@ def __getitem__(self, key: str) -> FoamFieldFile:

def __contains__(self, obj: object) -> bool:
if isinstance(obj, FoamFieldFile):
return obj.path.parent == self.path
return obj.path.parent == self.path and obj.path.is_file()
elif isinstance(obj, str):
return (self.path / obj).is_file() or (
self.path / f"{obj}.gz"
Expand Down
23 changes: 23 additions & 0 deletions foamlib/_cases/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,34 @@
else:
from typing_extensions import Self

from .._files import FoamFieldFile
from ._base import FoamCaseBase
from ._subprocess import DEVNULL, STDOUT


class FoamCaseRunBase(FoamCaseBase):
class TimeDirectory(FoamCaseBase.TimeDirectory):
@abstractmethod
def cell_centers(
self,
) -> Union[FoamFieldFile, Coroutine[None, None, FoamFieldFile]]:
raise NotImplementedError

@property
@abstractmethod
def _case(self) -> "FoamCaseRunBase":
raise NotImplementedError

def _cell_centers_calls(self) -> Generator[Any, None, FoamFieldFile]:
ret = self["C"]

if ret not in self:
yield self._case.run(
["postProcess", "-func", "writeCellCentres", "-time", self.name]
)

return ret

def __delitem__(self, key: Union[int, float, str]) -> None:
shutil.rmtree(self[key].path)

Expand Down
43 changes: 36 additions & 7 deletions foamlib/_cases/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@
import sys
from pathlib import Path
from types import TracebackType
from typing import (
Any,
Callable,
Optional,
Type,
Union,
)
from typing import Any, Callable, Optional, Type, Union, overload

if sys.version_info >= (3, 9):
from collections.abc import Collection, Sequence
Expand All @@ -21,6 +15,8 @@
else:
from typing_extensions import Self

from .._files import FoamFieldFile
from ._base import FoamCaseBase
from ._run import FoamCaseRunBase
from ._subprocess import run_sync
from ._util import ValuedGenerator
Expand All @@ -37,6 +33,22 @@ class FoamCase(FoamCaseRunBase):
:param path: The path to the case directory.
"""

class TimeDirectory(FoamCaseRunBase.TimeDirectory):
@property
def _case(self) -> "FoamCase":
return FoamCase(self.path.parent)

def cell_centers(self) -> FoamFieldFile:
"""Write and return the cell centers."""
calls = ValuedGenerator(self._cell_centers_calls())

for _ in calls:
pass

print(calls.value)

return calls.value

def __init__(self, path: Union["os.PathLike[str]", str] = Path()):
super().__init__(path)

Expand Down Expand Up @@ -67,6 +79,23 @@ def _copytree(
) -> None:
shutil.copytree(src, dest, symlinks=symlinks, ignore=ignore)

@overload
def __getitem__(
self, index: Union[int, float, str]
) -> "FoamCase.TimeDirectory": ...

@overload
def __getitem__(self, index: slice) -> Sequence["FoamCase.TimeDirectory"]: ...

def __getitem__(
self, index: Union[int, slice, float, str]
) -> Union["FoamCase.TimeDirectory", Sequence["FoamCase.TimeDirectory"]]:
ret = super().__getitem__(index)
if isinstance(ret, FoamCaseBase.TimeDirectory):
return FoamCase.TimeDirectory(ret)
else:
return [FoamCase.TimeDirectory(r) for r in ret]

def __enter__(self) -> Self:
return self

Expand Down
7 changes: 7 additions & 0 deletions tests/test_cases/test_cavity.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,10 @@ def test_double_clean(cavity: FoamCase) -> None:
cavity.clean()
cavity.clean(check=True)
cavity.run(parallel=False)


def test_cell_centers(cavity: FoamCase) -> None:
cavity.block_mesh()
C = cavity[0].cell_centers()
assert isinstance(C.internal_field, list)
assert len(C.internal_field) == 400
8 changes: 8 additions & 0 deletions tests/test_cases/test_cavity_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ async def test_double_clean(cavity: AsyncFoamCase) -> None:
await cavity.run(parallel=False)


@pytest.mark.asyncio
async def test_cell_centers(cavity: AsyncFoamCase) -> None:
await cavity.block_mesh()
C = await cavity[0].cell_centers()
assert isinstance(C.internal_field, list)
assert len(C.internal_field) == 400


def test_map(cavity: AsyncFoamCase) -> None:
async def f(x: Sequence[float]) -> float:
async with cavity.clone() as clone:
Expand Down