-
Notifications
You must be signed in to change notification settings - Fork 91
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
245 additions
and
72 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,46 +1,92 @@ | ||
from abc import ABC, abstractmethod | ||
# decouple represetation from evaluation | ||
# typevar always of kind *, type hint won't work much here | ||
|
||
from dataclasses import dataclass | ||
from typing import Union | ||
from typing import Any, Callable, Generic, Set, TypeVar | ||
|
||
from typing_extensions import Protocol, Self | ||
|
||
# TODO: a la carte | ||
SExpr = Union["And2", "Or2", "Not", "Lit", "_SExpr"] | ||
T = TypeVar("T") | ||
U = TypeVar("U") | ||
|
||
|
||
class _SExpr(ABC): | ||
@abstractmethod | ||
def eval(self, x) -> bool: | ||
pass | ||
class Functor(Protocol, Generic[T]): | ||
def map(self, f: Callable[[T], U]) -> Self: | ||
# map :: f t -> (t -> u) -> f u | ||
... | ||
|
||
|
||
@dataclass | ||
class And2(_SExpr): | ||
lexpr: SExpr | ||
rexpr: SExpr | ||
class And2(Generic[T]): | ||
lterm: T | ||
rterm: T | ||
|
||
def eval(self, x) -> bool: | ||
return self.lexpr.eval(x) and self.rexpr.eval(x) | ||
def map(self, f): | ||
return And2(f(self.lterm), f(self.rterm)) | ||
|
||
|
||
@dataclass | ||
class Or2(_SExpr): | ||
lexpr: SExpr | ||
rexpr: SExpr | ||
class Or2(Generic[T]): | ||
lterm: T | ||
rterm: T | ||
|
||
def eval(self, x) -> bool: | ||
return self.lexpr.eval(x) or self.rexpr.eval(x) | ||
def map(self, f): | ||
return Or2(f(self.lterm), f(self.rterm)) | ||
|
||
|
||
@dataclass | ||
class Not(_SExpr): | ||
expr: SExpr | ||
class Not(Generic[T]): | ||
term: T | ||
|
||
def eval(self, x) -> bool: | ||
return not self.expr.eval(x) | ||
def map(self, f): | ||
return Not(f(self.term)) | ||
|
||
|
||
@dataclass | ||
class Lit(_SExpr): | ||
val: str | ||
class Eq(Generic[U]): | ||
val: U | ||
|
||
def map(self, f): | ||
return self | ||
|
||
|
||
Expr = TypeVar("Expr", bound=Functor) | ||
|
||
|
||
def cata(f: Callable, rep: Expr): | ||
# cata :: (f t -> t) -> f (f (f ...)) -> t | ||
return f(rep.map(lambda x: cata(f, x))) | ||
|
||
|
||
def eval_on_set(s: Set) -> Callable: | ||
def _(x): | ||
if isinstance(x, Eq): | ||
return {i for i in s if i == x.val} | ||
if isinstance(x, And2): | ||
return x.lterm & x.rterm | ||
if isinstance(x, Or2): | ||
return x.lterm | x.rterm | ||
if isinstance(x, Not): | ||
return s - x.term | ||
raise TypeError(f"unexpected {x}") | ||
|
||
return _ | ||
|
||
|
||
def apply_on_set(rep: Expr, s: Set) -> Set: | ||
return cata(eval_on_set(s), rep) | ||
|
||
|
||
def apply_single(rep: Expr, i: Any) -> bool: | ||
def _(x): | ||
if isinstance(x, Eq): | ||
return x.val == i | ||
if isinstance(x, And2): | ||
return x.lterm and x.rterm | ||
if isinstance(x, Or2): | ||
return x.lterm or x.rterm | ||
if isinstance(x, Not): | ||
return not x.term | ||
raise TypeError(f"unexpected {x}") | ||
|
||
def eval(self, x) -> bool: | ||
return self.val == x | ||
return cata(_(i), rep) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from dataclasses import dataclass | ||
from functools import reduce | ||
from typing import Generic, List, TypeVar | ||
|
||
import testplan.common.utils.selector as S | ||
|
||
|
||
def test_basic_op(): | ||
assert S.apply_on_set(S.Not(S.Eq("a")), {"a", "b"}) == {"b"} | ||
assert S.apply_on_set(S.Not(S.Eq("a")), {"c", "b"}) == {"b", "c"} | ||
assert S.apply_on_set( | ||
S.Or2(S.Eq("a"), S.Eq("b")), {"a", "b", "c", "d"} | ||
) == {"a", "b"} | ||
|
||
|
||
def test_ext(): | ||
|
||
X = TypeVar("X") | ||
|
||
@dataclass | ||
class AndN(Generic[X]): | ||
terms: List[X] | ||
|
||
def map(self, f): | ||
return AndN(list(map(f, self.terms))) | ||
|
||
to_reuse = S.eval_on_set({"a", "b", "c"}) | ||
|
||
def _(x): | ||
if isinstance(x, AndN): | ||
return reduce(lambda x, y: x.intersection(y), x.terms) | ||
return to_reuse(x) | ||
|
||
assert S.cata( | ||
_, | ||
AndN( | ||
[ | ||
S.Or2(S.Eq("a"), S.Eq("b")), | ||
S.Or2(S.Eq("b"), S.Eq("c")), | ||
S.Not(S.Eq("a")), | ||
] | ||
), | ||
) == {"b"} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
from time import sleep | ||
|
||
import pytest | ||
|
||
import testplan.testing.multitest as mt | ||
from testplan.common.utils.selector import Eq | ||
from testplan.runnable import TestRunner as MyTestRunner | ||
from testplan.runnable.messaging import InterExecutorMessage | ||
from testplan.runners.local import LocalRunner | ||
|
||
|
||
@mt.testsuite | ||
class Suite: | ||
def __init__(self, pre_sleep, post_sleep): | ||
self.pre = pre_sleep | ||
self.post = post_sleep | ||
|
||
@mt.testcase | ||
def case_a(self, env, result): | ||
sleep(self.pre) | ||
result.true(False) | ||
sleep(self.post) | ||
|
||
@mt.testcase | ||
def case_b(self, env, result): | ||
result.true(False) | ||
|
||
|
||
MT_NAME = "dummy_mt" | ||
|
||
|
||
def gen_mt(*suites): | ||
return mt.MultiTest(MT_NAME, suites=suites) | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"pre_sleep,post_sleep,out_sleep,has_result", | ||
( | ||
(1, 0, 0.5, False), | ||
(0, 1, 0.5, False), | ||
(0, 0, 0.5, True), | ||
), | ||
) | ||
def test_local_simple_abort(pre_sleep, post_sleep, out_sleep, has_result): | ||
par = MyTestRunner(name="in-the-middle-of-unit-tests") | ||
par.add_resource(LocalRunner(), "non-express") | ||
mt = gen_mt(Suite(pre_sleep, post_sleep)) | ||
par.add(mt, "non-express") | ||
r: LocalRunner = par.resources["non-express"] | ||
chs = par._exec_channels | ||
r.start() | ||
sleep(out_sleep) | ||
chs.cast(Eq("non-express"), InterExecutorMessage.make_expected_abort()) | ||
while r.pending_work(): | ||
sleep(0.1) | ||
r.stop() | ||
if has_result: | ||
# we don't have other runners here, casted messages might not get | ||
# processed in time before runner dies | ||
assert MT_NAME in r.results | ||
repo = r.results[MT_NAME].report | ||
assert len(repo) == 1 | ||
assert len(repo.entries[0]) == 2 | ||
else: | ||
assert r._to_skip_remaining is True | ||
assert len(r.results) == 0 |
Oops, something went wrong.