diff --git a/src/biosynonyms/__init__.py b/src/biosynonyms/__init__.py index 39ded3e..d3a9edb 100644 --- a/src/biosynonyms/__init__.py +++ b/src/biosynonyms/__init__.py @@ -1,20 +1,21 @@ """Code for biosynonyms.""" +from .model import Synonym, grounder_from_synonyms, group_synonyms, parse_synonyms from .resources import ( - Synonym, get_gilda_terms, + get_grounder, get_negative_synonyms, get_positive_synonyms, - group_synonyms, load_unentities, - parse_synonyms, ) __all__ = [ "Synonym", "get_gilda_terms", + "get_grounder", "get_negative_synonyms", "get_positive_synonyms", + "grounder_from_synonyms", "group_synonyms", "load_unentities", "parse_synonyms", diff --git a/src/biosynonyms/generate_owl.py b/src/biosynonyms/generate_owl.py index feb2940..8e9e0d9 100644 --- a/src/biosynonyms/generate_owl.py +++ b/src/biosynonyms/generate_owl.py @@ -1,5 +1,9 @@ """Generate OWL from the positive synonyms.""" +# TODO re-implement this using pyobo. + +from __future__ import annotations + import gzip from collections import ChainMap from pathlib import Path @@ -10,8 +14,8 @@ from curies import Reference from typing_extensions import Doc -from biosynonyms import Synonym, get_positive_synonyms -from biosynonyms.resources import _clean_str, group_synonyms +from biosynonyms.model import Synonym, group_synonyms +from biosynonyms.resources import get_positive_synonyms HERE = Path(__file__).parent.resolve() EXPORT = HERE.parent.parent.joinpath("exports") @@ -116,6 +120,14 @@ """ +def _text_for_turtle(synonym: Synonym) -> str: + """Get the text ready for an object slot in Turtle, with optional language tag.""" + tt = f'"{_clean_str(synonym.text)}"' + if synonym.language: + tt += f"@{synonym.language}" + return tt + + def write_owl_rdf(**kwargs: Any) -> None: """Write OWL RDF in a Turtle file.""" with open(TTL_PATH, "w") as file: @@ -211,7 +223,7 @@ def get_axiom_str(reference: Reference, synonym: Synonym) -> str | None: a owl:Axiom ; owl:annotatedSource {reference.curie} ; 
owl:annotatedProperty {synonym.scope.curie} ; - owl:annotatedTarget {synonym.text_for_turtle} ; + owl:annotatedTarget {_text_for_turtle(synonym)} ; {axiom_parts_str} ] . """ @@ -243,7 +255,7 @@ def _write_owl_rdf( # noqa:C901 mains: list[str] = [] axiom_strs: list[str] = [] for synonym in synonyms: - mains.append(f"{synonym.scope.curie} {synonym.text_for_turtle}") + mains.append(f"{synonym.scope.curie} {_text_for_turtle(synonym)}") if axiom_str := get_axiom_str(reference, synonym): axiom_strs.append(axiom_str) @@ -265,5 +277,9 @@ def _write_owl_rdf( # noqa:C901 file.write(dedent(axiom_str)) +def _clean_str(s: str) -> str: + return s + + if __name__ == "__main__": write_owl_rdf() diff --git a/src/biosynonyms/lint.py b/src/biosynonyms/lint.py index d4ef4d4..9f51c49 100644 --- a/src/biosynonyms/lint.py +++ b/src/biosynonyms/lint.py @@ -1,29 +1,18 @@ """Sort the synonyms file.""" -from pathlib import Path +from biosynonyms.model import lint_synonyms from .resources import ( NEGATIVES_PATH, POSITIVES_PATH, _load_unentities, - sort_key, write_unentities, ) -def _sort(path: Path) -> None: - with path.open() as file: - header, *rows = (line.strip().split("\t") for line in file) - rows = sorted(rows, key=sort_key) - with path.open("w") as file: - print(*header, sep="\t", file=file) - for row in rows: - print(*row, sep="\t", file=file) - - def _main() -> None: - _sort(POSITIVES_PATH) - _sort(NEGATIVES_PATH) + lint_synonyms(POSITIVES_PATH) + lint_synonyms(NEGATIVES_PATH) write_unentities(list(_load_unentities())) diff --git a/src/biosynonyms/model.py b/src/biosynonyms/model.py new file mode 100644 index 0000000..021ea85 --- /dev/null +++ b/src/biosynonyms/model.py @@ -0,0 +1,292 @@ +"""A data model for synonyms.""" + +from __future__ import annotations + +import csv +import datetime +from collections import defaultdict +from collections.abc import Iterable, Mapping, Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import requests +from curies 
import NamedReference, Reference +from pydantic import BaseModel, Field +from pydantic_extra_types.language_code import LanguageAlpha2 +from tqdm import tqdm + +if TYPE_CHECKING: + import gilda + +__all__ = [ + "Synonym", + "append_synonym", + "group_synonyms", + "lint_synonyms", + "parse_synonyms", +] + + +class Synonym(BaseModel): + """A data model for synonyms.""" + + text: str + language: LanguageAlpha2 | None = Field( + None, + description="The language of the synonym. If not given, typically " + "assumed to be american english.", + ) + reference: NamedReference + scope: Reference = Field( + default=Reference.from_curie("oboInOwl:hasSynonym"), + description="The predicate that connects the term (as subject) " + "to the textual synonym (as object)", + ) + type: Reference | None = Field( + default=None, + title="Synonym type", + description="See the OBO Metadata Ontology for valid values", + ) + + provenance: list[Reference] = Field( + default_factory=list, + description="A list of articles (e.g., from PubMed, PMC, arXiv) where this synonym appears", + ) + contributor: Reference | None = Field( + None, description="The contributor, usually given as a reference to ORCID" + ) + comment: str | None = Field( + None, description="An optional comment on the synonym curation or status" + ) + source: str | None = Field( + None, description="The name of the resource where the synonym was curated" + ) + date: datetime.datetime | None = Field(None, description="The date of initial curation") + + def get_all_references(self) -> set[Reference]: + """Get all references made by this object.""" + rv: set[Reference] = {self.reference, self.scope, *self.provenance} + if self.type: + rv.add(self.type) + if self.contributor: + rv.add(self.contributor) + return rv + + @property + def name(self) -> str: + """Get the reference's name.""" + return self.reference.name + + @property + def curie(self) -> str: + """Get the reference's CURIE.""" + return self.reference.curie + + @property + 
def date_str(self) -> str: + """Get the date as a string.""" + if self.date is None: + raise ValueError("date is not set") + return self.date.strftime("%Y-%m-%d") + + @classmethod + def from_row( + cls, row: dict[str, Any], *, names: Mapping[Reference, str] | None = None + ) -> Synonym: + """Parse a dictionary representing a row in a TSV.""" + reference = Reference.from_curie(row["curie"]) + name = (names or {}).get(reference) or row.get("name") or row["text"] + data = { + "text": row["text"], + "reference": NamedReference( + prefix=reference.prefix, identifier=reference.identifier, name=name + ), + "scope": ( + Reference.from_curie(scope_curie.strip()) + if (scope_curie := row.get("scope")) + else Reference.from_curie("oboInOwl:hasSynonym") + ), + "type": _safe_parse_curie(row["type"]) if "type" in row else None, + "provenance": [ + Reference.from_curie(provenance_curie.strip()) + for provenance_curie in (row.get("provenance") or "").split(",") + if provenance_curie.strip() + ], + # get("X") or None protects against empty strings + "language": row.get("language") or None, + "comment": row.get("comment") or None, + "source": row.get("source") or None, + } + if contributor_curie := (row.get("contributor") or "").strip(): + data["contributor"] = Reference.from_curie(contributor_curie) + if date := (row.get("date") or "").strip(): + data["date"] = datetime.datetime.strptime(date, "%Y-%m-%d") + + return cls.model_validate(data) + + @classmethod + def from_gilda(cls, term: gilda.Term) -> Synonym: + """Construct a synonym from a gilda term. + + :param term: A Gilda term + :returns: A synonym object + + .. warning:: + + Gilda's data model is less complete, so resulting synonym objects + will not have detailed curation provenance + """ + data = { + "text": term.text, + # TODO standardize?
+ "reference": NamedReference(prefix=term.db, identifier=term.id, name=term.entry_name), + "source": term.source, + } + return cls.model_validate(data) + + def to_gilda(self) -> gilda.Term: + """Get this synonym as a gilda term.""" + if not self.name: + raise ValueError("can't make a Gilda term without a label") + return _gilda_term( + text=self.text, + reference=self.reference, + name=self.name, + # TODO is Gilda's status vocabulary worth building an OMO map to/from? + status="synonym", + source=self.source or "biosynonyms", + ) + + +def _gilda_term( + *, + text: str, + reference: Reference, + name: str | None = None, + status: str, + source: str | None, +) -> gilda.Term: + import gilda + from gilda.process import normalize + + return gilda.Term( + normalize(text), + text=text, + db=reference.prefix, + id=reference.identifier, + entry_name=name or text, + status=status, + source=source, + ) + + +def _safe_parse_curie(x) -> Reference | None: # type:ignore + if not isinstance(x, str) or not x.strip(): + return None + return Reference.from_curie(x.strip()) + + +def append_synonym(path: str | Path, synonym: Synonym) -> None: + """Append a synonym to an existing file.""" + raise NotImplementedError + + +def parse_synonyms( + path: str | Path, + *, + delimiter: str | None = None, + names: Mapping[Reference, str] | None = None, +) -> list[Synonym]: + """Load synonyms from a file. + + :param path: A local file path or URL for a biosynonyms-flavored CSV/TSV file + :param delimiter: The delimiter for the CSV/TSV file. 
Defaults to tab + :param names: A pre-parsed dictionary from references + (i.e., prefix-luid pairs) to default labels + :returns: A list of synonym objects parsed from the table + """ + if isinstance(path, str) and any(path.startswith(schema) for schema in ("https://", "http://")): + res = requests.get(path, timeout=15) + res.raise_for_status() + return _from_lines(res.iter_lines(decode_unicode=True), delimiter=delimiter, names=names) + + path = Path(path).resolve() + + if path.suffix == ".numbers": + return _parse_numbers(path, names=names) + + with path.open() as file: + return _from_lines(file, delimiter=delimiter, names=names) + + +def _parse_numbers( + path: str | Path, + *, + names: Mapping[Reference, str] | None = None, +) -> list[Synonym]: + # code example from https://pypi.org/project/numbers-parser + import numbers_parser + + doc = numbers_parser.Document(path) + sheets = doc.sheets + tables = sheets[0].tables + header, *rows = tables[0].rows(values_only=True) + return _from_dicts((dict(zip(header, row, strict=False)) for row in rows), names=names) + + +def _from_lines( + lines: Iterable[str], + *, + delimiter: str | None = None, + names: Mapping[Reference, str] | None = None, +) -> list[Synonym]: + return _from_dicts(csv.DictReader(lines, delimiter=delimiter or "\t"), names=names) + + +def _from_dicts( + dicts: Iterable[dict[str, Any]], + *, + names: Mapping[Reference, str] | None = None, +) -> list[Synonym]: + rv = [] + for i, record in enumerate(dicts, start=2): + record = {k: v for k, v in record.items() if k and v and k.strip() and v.strip()} + if record: + try: + synonym = Synonym.from_row(record, names=names) + except ValueError as e: + raise ValueError(f"failed on row {i}: {record}") from e + rv.append(synonym) + return rv + + +def group_synonyms(synonyms: Iterable[Synonym]) -> dict[Reference, list[Synonym]]: + """Aggregate synonyms by reference.""" + dd: defaultdict[Reference, list[Synonym]] = defaultdict(list) + for synonym in tqdm(synonyms, 
unit="synonym", unit_scale=True, leave=False): + dd[synonym.reference].append(synonym) + return dict(dd) + + +def grounder_from_synonyms(synonyms: Iterable[Synonym]) -> gilda.Grounder: + """Get a Gilda grounder from synonyms.""" + import gilda + + rv = gilda.Grounder([synonym.to_gilda() for synonym in synonyms]) + return rv + + +def lint_synonyms(path: Path) -> None: + """Lint a synonyms file.""" + with path.open() as file: + header, *rows = (line.strip().split("\t") for line in file) + rows = sorted(rows, key=_sort_key) + with path.open("w") as file: + print(*header, sep="\t", file=file) + for row in rows: + print(*row, sep="\t", file=file) + + +def _sort_key(row: Sequence[str]) -> tuple[str, str, str, str]: + """Return a key for sorting a row.""" + return row[0].casefold(), row[0], row[1].casefold(), row[1] diff --git a/src/biosynonyms/predict.py b/src/biosynonyms/predict.py index aca0a6b..911e753 100644 --- a/src/biosynonyms/predict.py +++ b/src/biosynonyms/predict.py @@ -123,7 +123,7 @@ def main(size: int, force: bool) -> None: from embiggen import GraphVisualizer visualizer = GraphVisualizer(graph) - fig, axes = visualizer.fit_and_plot_all(embedding) + fig, _axes = visualizer.fit_and_plot_all(embedding) click.echo(f"Outputting plots to {PLOT_PATH}") plt.savefig(PLOT_PATH, dpi=300) plt.close(fig) diff --git a/src/biosynonyms/resources/__init__.py b/src/biosynonyms/resources/__init__.py index 1023cb0..a365663 100644 --- a/src/biosynonyms/resources/__init__.py +++ b/src/biosynonyms/resources/__init__.py @@ -2,34 +2,21 @@ from __future__ import annotations -import csv -import datetime -from collections import defaultdict -from collections.abc import Iterable, Mapping, Sequence +from collections.abc import Iterable, Sequence from pathlib import Path -from typing import ( - TYPE_CHECKING, - Any, - cast, -) +from typing import TYPE_CHECKING, cast -import requests -from curies import NamedReference, Reference -from pydantic import BaseModel, Field -from 
pydantic_extra_types.language_code import LanguageAlpha2 -from tqdm import tqdm +from biosynonyms.model import Synonym, grounder_from_synonyms, parse_synonyms if TYPE_CHECKING: import gilda __all__ = [ - "Synonym", "get_gilda_terms", + "get_grounder", "get_negative_synonyms", "get_positive_synonyms", - "group_synonyms", "load_unentities", - "parse_synonyms", "write_unentities", ] @@ -47,11 +34,6 @@ } -def sort_key(row: Sequence[str]) -> tuple[str, str, str, str]: - """Return a key for sorting a row.""" - return row[0].casefold(), row[0], row[1].casefold(), row[1] - - def load_unentities() -> set[str]: """Load all strings that are known not to be named entities.""" return {line[0] for line in _load_unentities()} @@ -76,177 +58,6 @@ def write_unentities(rows: Iterable[tuple[str, str]]) -> None: print(*row, sep="\t", file=file) -def _clean_str(s: str) -> str: - return s - - -class Synonym(BaseModel): - """A data model for synonyms.""" - - text: str - language: LanguageAlpha2 | None = Field( - None, - description="The language of the synonym. 
If not given, typically " - "assumed to be american english.", - ) - reference: NamedReference - scope: Reference = Field( - default=Reference.from_curie("oboInOwl:hasSynonym"), - description="The predicate that connects the term (as subject) " - "to the textual synonym (as object)", - ) - type: Reference | None = Field( - default=None, - title="Synonym type", - description="See the OBO Metadata Ontology for valid values", - ) - - provenance: list[Reference] = Field( - default_factory=list, - description="A list of articles (e.g., from PubMed, PMC, arXiv) where this synonym appears", - ) - contributor: Reference | None = Field( - None, description="The contributor, usually given as a reference to ORCID" - ) - comment: str | None = Field( - None, description="An optional comment on the synonym curation or status" - ) - source: str | None = Field( - None, description="The name of the resource where the synonym was curated" - ) - date: datetime.datetime | None = Field(None, description="The date of initial curation") - - def get_all_references(self) -> set[Reference]: - """Get all references made by this object.""" - rv: set[Reference] = {self.reference, self.scope, *self.provenance} - if self.type: - rv.add(self.type) - if self.contributor: - rv.add(self.contributor) - return rv - - @property - def name(self) -> str: - """Get the reference's name.""" - return self.reference.name - - @property - def curie(self) -> str: - """Get the reference's CURIE.""" - return self.reference.curie - - @property - def date_str(self) -> str: - """Get the date as a string.""" - if self.date is None: - raise ValueError("date is not set") - return self.date.strftime("%Y-%m-%d") - - @property - def text_for_turtle(self) -> str: - """Get the text ready for an object slot in Turtle, with optional language tag.""" - tt = f'"{_clean_str(self.text)}"' - if self.language: - tt += f"@{self.language}" - return tt - - @classmethod - def from_row( - cls, row: dict[str, Any], *, names: 
Mapping[Reference, str] | None = None - ) -> Synonym: - """Parse a dictionary representing a row in a TSV.""" - reference = Reference.from_curie(row["curie"]) - name = (names or {}).get(reference) or row.get("name") or row["text"] - data = { - "text": row["text"], - "reference": NamedReference( - prefix=reference.prefix, identifier=reference.identifier, name=name - ), - "scope": ( - Reference.from_curie(scope_curie.strip()) - if (scope_curie := row.get("scope")) - else Reference.from_curie("oboInOwl:hasSynonym") - ), - "type": _safe_parse_curie(row["type"]) if "type" in row else None, - "provenance": [ - Reference.from_curie(provenance_curie.strip()) - for provenance_curie in (row.get("provenance") or "").split(",") - if provenance_curie.strip() - ], - "language": row.get("language") - or None, # get("X") or None protects against empty strings - "comment": row.get("comment") or None, - "source": row.get("source") or None, - } - if contributor_curie := (row.get("contributor") or "").strip(): - data["contributor"] = Reference.from_curie(contributor_curie) - if date := (row.get("date") or "").strip(): - data["date"] = datetime.datetime.strptime(date, "%Y-%m-%d") - - return cls.model_validate(data) - - @classmethod - def from_gilda_term(cls, term: gilda.Term) -> Synonym: - """Get this synonym as a gilda term. - - :param term: A Gilda term - :returns: A synonym object - - .. warning:: - - Gilda's data model is less complete, so resulting synonym objects - will not have detailed curation provenance - """ - data = { - "text": term.text, - # TODO standardize? 
- "reference": NamedReference(prefix=term.db, identifier=term.id, name=term.entry_name), - "source": term.source, - } - return cls.model_validate(data) - - def as_gilda_term(self) -> gilda.Term: - """Get this synonym as a gilda term.""" - if not self.name: - raise ValueError("can't make a Gilda term without a label") - return _gilda_term( - text=self.text, - reference=self.reference, - name=self.name, - # TODO is Gilda's status vocabulary worth building an OMO map to/from? - status="synonym", - source=self.source or "biosynonyms", - ) - - -def _gilda_term( - *, - text: str, - reference: Reference, - name: str | None = None, - status: str, - source: str | None, -) -> gilda.Term: - import gilda - from gilda.process import normalize - - return gilda.Term( - normalize(text), - text=text, - db=reference.prefix, - id=reference.identifier, - entry_name=name or text, - status=status, - source=source, - ) - - -def _safe_parse_curie(x) -> Reference | None: # type:ignore - if not isinstance(x, str) or not x.strip(): - return None - return Reference.from_curie(x.strip()) - - def get_positive_synonyms() -> list[Synonym]: """Get positive synonyms curated in Biosynonyms.""" return parse_synonyms(POSITIVES_PATH) @@ -257,84 +68,11 @@ def get_negative_synonyms() -> list[Synonym]: return parse_synonyms(NEGATIVES_PATH) -def parse_synonyms( - path: str | Path, - *, - delimiter: str | None = None, - names: Mapping[Reference, str] | None = None, -) -> list[Synonym]: - """Load synonyms from a file. - - :param path: A local file path or URL for a biosynonyms-flavored CSV/TSV file - :param delimiter: The delimiter for the CSV/TSV file. 
Defaults to tab - :param names: A pre-parsed dictionary from references - (i.e., prefix-luid pairs) to default labels - :returns: A list of synonym objects parsed from the table - """ - if isinstance(path, str) and any(path.startswith(schema) for schema in ("https://", "http://")): - res = requests.get(path, timeout=15) - res.raise_for_status() - return _from_lines(res.iter_lines(decode_unicode=True), delimiter=delimiter, names=names) - - path = Path(path).resolve() - - if path.suffix == ".numbers": - return _parse_numbers(path, names=names) - - with path.open() as file: - return _from_lines(file, delimiter=delimiter, names=names) - - -def _parse_numbers( - path: str | Path, - *, - names: Mapping[Reference, str] | None = None, -) -> list[Synonym]: - # code example from https://pypi.org/project/numbers-parser - import numbers_parser - - doc = numbers_parser.Document(path) - sheets = doc.sheets - tables = sheets[0].tables - header, *rows = tables[0].rows(values_only=True) - return _from_dicts((dict(zip(header, row, strict=False)) for row in rows), names=names) - - -def _from_lines( - lines: Iterable[str], - *, - delimiter: str | None = None, - names: Mapping[Reference, str] | None = None, -) -> list[Synonym]: - return _from_dicts(csv.DictReader(lines, delimiter=delimiter or "\t"), names=names) - - -def _from_dicts( - dicts: Iterable[dict[str, Any]], - *, - names: Mapping[Reference, str] | None = None, -) -> list[Synonym]: - rv = [] - for i, record in enumerate(dicts, start=2): - record = {k: v for k, v in record.items() if k and v and k.strip() and v.strip()} - if record: - try: - synonym = Synonym.from_row(record, names=names) - except ValueError as e: - raise ValueError(f"failed on row {i}: {record}") from e - rv.append(synonym) - return rv - - -def get_gilda_terms() -> Iterable[gilda.Term]: +def get_gilda_terms() -> list[gilda.Term]: """Get Gilda terms for all positive synonyms.""" - for synonym in parse_synonyms(POSITIVES_PATH): - yield synonym.as_gilda_term() + 
return [synonym.to_gilda() for synonym in get_positive_synonyms()] -def group_synonyms(synonyms: Iterable[Synonym]) -> dict[Reference, list[Synonym]]: - """Aggregate synonyms by reference.""" - dd: defaultdict[Reference, list[Synonym]] = defaultdict(list) - for synonym in tqdm(synonyms, unit="synonym", unit_scale=True, leave=False): - dd[synonym.reference].append(synonym) - return dict(dd) +def get_grounder() -> gilda.Grounder: + """Get a grounder from all positive synonyms.""" + return grounder_from_synonyms(get_positive_synonyms()) diff --git a/tests/test_integrity.py b/tests/test_integrity.py index 7c98a4b..bd7b1a7 100644 --- a/tests/test_integrity.py +++ b/tests/test_integrity.py @@ -4,17 +4,16 @@ from collections import Counter import bioregistry -import gilda from curies import ReferenceTuple import biosynonyms +from biosynonyms.model import _sort_key from biosynonyms.resources import ( NEGATIVES_PATH, POSITIVES_PATH, SYNONYM_SCOPES, UNENTITIES_PATH, _unentities_key, - sort_key, ) @@ -54,9 +53,9 @@ def test_positives(self): references, contributor_curie, date, - lang, - comment, - src, + _lang, + _comment, + _src, ) = row self.assertLess(1, len(text), msg="can not have 1 letter synonyms") self.assert_curie(curie) @@ -71,7 +70,7 @@ def test_positives(self): self.assertRegex(date, "\\d{4}-\\d{2}-\\d{2}") # test sorted - self.assertEqual(sorted(rows, key=sort_key), rows, msg="synonyms are not properly sorted") + self.assertEqual(sorted(rows, key=_sort_key), rows, msg="synonyms are not properly sorted") # test no duplicates c = Counter(row[:2] for row in rows) @@ -96,7 +95,7 @@ def test_negatives(self): # test sorted self.assertEqual( - sorted(rows, key=sort_key), + sorted(rows, key=_sort_key), rows, msg="negative synonyms are not properly sorted", ) @@ -120,7 +119,7 @@ def test_non_entities(self): def test_gilda(self): """Test getting tilda terms.""" - grounder = gilda.Grounder(biosynonyms.get_gilda_terms()) + grounder = biosynonyms.get_grounder() scored_matches
= grounder.ground("YAL021C") self.assertEqual(1, len(scored_matches)) self.assertEqual("sgd", scored_matches[0].term.db)