Commit 184cf40

[CIVIS-9747] MAINT show code examples in docs; add static type checking (#12)

jacksonlee-civis authored Nov 7, 2024
1 parent dcfb6c7 commit 184cf40

Showing 8 changed files with 68 additions and 48 deletions.
7 changes: 7 additions & 0 deletions .circleci/config.yml
@@ -75,6 +75,11 @@ workflows:
command-run: |
pip install ".[dev]" && \
black --check src tests examples
- pre-build:
name: mypy
command-run: |
pip install ".[dev]" && \
mypy src
- pre-build:
name: twine
command-run: |
@@ -92,6 +97,7 @@ workflows:
requires:
- flake8
- black
- mypy
- twine
- bandit
matrix:
@@ -101,5 +107,6 @@ workflows:
requires:
- flake8
- black
- mypy
- twine
- bandit
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Security

## [1.5.1] - 2024-11-07

### Added
- Added static type checking using mypy. (#12)

### Fixed
- Fixed documentation to actually show example code. (#12)

## [1.5.0] - 2024-10-30

### Added
1 change: 1 addition & 0 deletions docs/conf.py
@@ -39,6 +39,7 @@
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output

html_theme = 'furo'
numpydoc_class_members_toctree = False

intersphinx_mapping = {
'python': ('https://docs.python.org/3', None)
20 changes: 10 additions & 10 deletions docs/more_examples.rst
@@ -22,11 +22,11 @@ To override this default behavior,
use the keyword argument ``start_nodes`` at :func:`~async_graph_data_flow.AsyncExecutor.execute`
to select which nodes to execute instead and/or supply the arguments to whichever nodes you've selected.

.. literalinclude:: ../../examples/custom_start_nodes.py
.. literalinclude:: ../examples/custom_start_nodes.py
:language: python
:emphasize-lines: 34

.. literalinclude:: ../../examples/custom_start_node_args.py
.. literalinclude:: ../examples/custom_start_node_args.py
:language: python
:emphasize-lines: 34

@@ -41,7 +41,7 @@ and therefore all nodes will execute concurrently upon graph execution.
The start nodes and their arguments at the beginning of the graph execution are available at
:attr:`~async_graph_data_flow.AsyncExecutor.start_nodes` of :func:`~async_graph_data_flow.AsyncExecutor.execute`.

.. literalinclude:: ../../examples/graph_with_no_edges.py
.. literalinclude:: ../examples/graph_with_no_edges.py
:language: python
:emphasize-lines: 30,39,48

@@ -63,7 +63,7 @@ An :class:`~async_graph_data_flow.AsyncExecutor` instance has
the method :func:`~async_graph_data_flow.AsyncExecutor.turn_on_data_flow_logging`,
which you can call to turn on and configure logging.

.. literalinclude:: ../../examples/data_flow_logging.py
.. literalinclude:: ../examples/data_flow_logging.py
:language: python
:emphasize-lines: 36-38

@@ -74,7 +74,7 @@ By default, each node creates one task at a time.
To spawn multiple, concurrent tasks from a node,
set ``max_tasks`` at :func:`~async_graph_data_flow.AsyncGraph.add_node`.

.. literalinclude:: ../../examples/concurrent_tasks_per_node.py
.. literalinclude:: ../examples/concurrent_tasks_per_node.py
:language: python
:emphasize-lines: 25

@@ -89,11 +89,11 @@ This behavior can be altered in two different ways:
* To halt execution at any node, set ``halt_on_exception`` to ``True`` when initializing an :class:`~async_graph_data_flow.AsyncGraph` instance.
* To halt execution at a specific node, set ``halt_on_exception`` to ``True`` when using :func:`~async_graph_data_flow.AsyncGraph.add_node` to add the node in question to the graph.

.. literalinclude:: ../../examples/halt_on_exception_at_a_specific_node.py
.. literalinclude:: ../examples/halt_on_exception_at_a_specific_node.py
:language: python
:emphasize-lines: 39

.. literalinclude:: ../../examples/halt_on_exception_at_any_node.py
.. literalinclude:: ../examples/halt_on_exception_at_any_node.py
:language: python
:emphasize-lines: 36

@@ -108,7 +108,7 @@ allows access to the exceptions from the nodes,
and you can determine what to do with this information
(e.g., raise an exception on your own).

.. literalinclude:: ../../examples/raising_an_exception.py
.. literalinclude:: ../examples/raising_an_exception.py
:language: python
:emphasize-lines: 24-27

@@ -121,7 +121,7 @@ Inside a node's async function,
you may grab the asyncio's running loop,
then call the synchronous function with this loop.

.. literalinclude:: ../../examples/external_sync_call.py
.. literalinclude:: ../examples/external_sync_call.py
:language: python
:emphasize-lines: 27,29

@@ -132,6 +132,6 @@ Sharing state across the async functions is possible
if you pass the same object around them.
Such an object can be a custom class instance with methods and attributes as needed.

.. literalinclude:: ../../examples/shared_state.py
.. literalinclude:: ../examples/shared_state.py
:language: python
:emphasize-lines: 21,23,27,30,34,44,45
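The `external_sync_call` example referenced above relies on asyncio's standard recipe for calling a blocking synchronous function from a coroutine: grab the running loop and hand the call to an executor. A self-contained sketch under that assumption (`slow_sum` is a hypothetical stand-in for the blocking call, not code from the package):

```python
import asyncio

def slow_sum(a: int, b: int) -> int:
    # Stand-in for a blocking synchronous call (e.g. a non-async HTTP client).
    return a + b

async def node() -> int:
    # Grab the running loop and offload the sync call to the default
    # thread-pool executor, so other nodes keep running concurrently.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, slow_sum, 2, 3)

result = asyncio.run(node())  # result == 5
```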
14 changes: 7 additions & 7 deletions docs/technical.rst
@@ -41,7 +41,7 @@ see the ``unpack_input`` parameter of :func:`~async_graph_data_flow.AsyncGraph.a

START[ ] -.-> A
B -.-> STOP[ ]
A["async def func2(...):\n&nbsp;&nbsp;&nbsp;&nbsp;...\n&nbsp;&nbsp;&nbsp;&nbsp;yield <strong>foo, bar</strong>"] --> B["async def func3(<strong>foo, bar</strong>):\n&nbsp;&nbsp;&nbsp;&nbsp;...\n&nbsp;&nbsp;&nbsp;&nbsp;yield ..."]
A["async def func2(...):<br/>&nbsp;&nbsp;&nbsp;&nbsp;...<br/>&nbsp;&nbsp;&nbsp;&nbsp;yield <strong>foo, bar</strong>"] --> B["async def func3(<strong>foo, bar</strong>):<br/>&nbsp;&nbsp;&nbsp;&nbsp;...<br/>&nbsp;&nbsp;&nbsp;&nbsp;yield ..."]
style A text-align:left
style B text-align:left
style START fill-opacity:0, stroke-opacity:0;
@@ -83,12 +83,12 @@ available to process it.
style start2 fill-opacity:0, stroke-opacity:0;

subgraph node and its associated queue
queue3((queue)) --> node3[task 1, task 2,\ntask 3, ...]
queue3((queue)) --> node3[task 1, task 2,<br/>task 3, ...]
end

node1 --> |yields\nitems| queue3
node2 --> |yields\nitems| queue3
node3 -.-> |yields\nitems| STOP[ ]
node1 --> |yields<br/>items| queue3
node2 --> |yields<br/>items| queue3
node3 -.-> |yields<br/>items| STOP[ ]
style STOP fill-opacity:0, stroke-opacity:0;

Example
@@ -219,10 +219,10 @@ inputs from the items yielded by ``get_open_brewery_data()``.

flowchart LR

Q(("Queue items:\n[{'col1': 'val1', ...}, ...]\n[{'col1': 'val1', ...}, ...]\n...\n"))
Q(("Queue items:<br/>[{'col1': 'val1', ...}, ...]<br/>[{'col1': 'val1', ...}, ...]<br/>...<br/>"))
A[get_open_brewery_data]
B[write_to_csv]
A --> |yields\nitems| Q
A --> |yields<br/>items| Q
Q --> B

``get_open_brewery_data()`` yields a page of the Open Brewery DB data,
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "async-graph-data-flow"
version = "1.5.0"
version = "1.5.1"
description = "Asynchronous functions that pass data along a directed acyclic graph"
readme = "README.md"
requires-python = ">= 3.10"
@@ -43,6 +43,7 @@ dev = [
"black == 24.10.0",
"flake8 == 7.1.1",
"pytest == 8.3.3",
"mypy == 1.13.0",

# Managing source distributions
"build == 1.2.2",
60 changes: 33 additions & 27 deletions src/async_graph_data_flow/executor.py
@@ -20,7 +20,7 @@ def __init__(
self,
graph: AsyncGraph,
*,
logger: logging.Logger = None,
logger: logging.Logger | None = None,
max_exceptions: int = 1_000,
):
"""Initialize an executor.
@@ -41,23 +41,23 @@ def __init__(
if not isinstance(self._graph, AsyncGraph):
raise TypeError(f"{self._graph} must be an AsyncGraph instance")

self._node_queues = {}
self._consumer_tasks = {}
self._node_queues: dict[str, asyncio.Queue] = {}
self._consumer_tasks: dict[str, asyncio.Task] = {}
self._halt_pipeline_execution = False
self._logger = logger if logger else _LOG
self._max_exceptions = max_exceptions

self._data_flow_stats = None
self._data_flow_stats: dict[str, dict[str, int]] | None = None
self._data_flow_logging_lock = asyncio.Lock()
self._data_flow_logging = False
self._data_flow_logging_node_format = _DEFAULT_DATA_FLOW_LOGGING_NODE_FORMAT
self._data_flow_logging_time_interval = _DEFAULT_DATA_FLOW_LOGGING_TIME_INTERVAL
self._data_flow_logging_node_filter = self._graph._nodes.keys()
self._data_flow_logging_last_timestamp = 0
self._data_flow_logging_node_filter: Iterable[str] = self._graph._nodes.keys()
self._data_flow_logging_last_timestamp = 0.0

self._start_node_args = None
self._start_node_args: dict[str, tuple] | None = None

self._exceptions = None
self._exceptions: dict[str, deque[Exception]] | None = None

@property
def graph(self) -> AsyncGraph:
Expand All @@ -72,7 +72,7 @@ def exceptions(self) -> dict[str, list[Exception]] | None:
raised from the node.
"""
if self._exceptions is None:
return
return None
from_deque_to_list = {}
for node_name, excs in self._exceptions.items():
# `excs` is a deque. Turning it into a list for user-friendliness.
Expand Down Expand Up @@ -152,7 +152,7 @@ def turn_off_data_flow_logging(self) -> None:
"""Turn off data flow logging."""
self._data_flow_logging = False

async def _log_data_flow_nodes(self):
def _log_data_flow_nodes(self):
for node, flow in self._data_flow_stats.items():
if (
self._data_flow_logging_node_filter
@@ -186,7 +186,7 @@ async def _consumer(self, node_name: str):
current_timestamp - self._data_flow_logging_last_timestamp
> self._data_flow_logging_time_interval
):
await self._log_data_flow_nodes()
self._log_data_flow_nodes()
self._data_flow_logging_last_timestamp = current_timestamp

queue = self._node_queues[node_name]
@@ -202,17 +202,17 @@ async def _consumer(self, node_name: str):
try:
if len(params) == 0:
coro = node.func()
elif node.unpack_input and (
(is_tuple := isinstance(data, tuple)) or isinstance(data, dict)
):
coro = node.func(*data) if is_tuple else node.func(**data)
elif node.unpack_input and isinstance(data, tuple):
coro = node.func(*data)
elif node.unpack_input and isinstance(data, dict):
coro = node.func(**data)
else:
coro = node.func(data)
except asyncio.CancelledError:
continue
except Exception as exc:
await self._update_data_flow_error_stats(node_name)
await self._update_exceptions(node_name, exc)
self._update_data_flow_error_stats(node_name)
self._update_exceptions(node_name, exc)
self._logger.error(traceback.format_exc())
if self._graph.halt_on_exception or node.halt_on_exception:
self._logger.error(
@@ -238,8 +238,8 @@ async def _consumer(self, node_name: str):
except asyncio.CancelledError:
break
except Exception as exc:
await self._update_data_flow_error_stats(node_name)
await self._update_exceptions(node_name, exc)
self._update_data_flow_error_stats(node_name)
self._update_exceptions(node_name, exc)
self._logger.error(traceback.format_exc())
if self._graph.halt_on_exception or node.halt_on_exception:
# close current agen
@@ -255,27 +255,33 @@ async def _consumer(self, node_name: str):
continue
else:
node_edges = self._graph._nodes_to_edges[node_name]
await self._update_data_flow_in_out_stats(node_name, node_edges)
self._update_data_flow_in_out_stats(node_name, node_edges)
await self._add_to_node_queue(node_edges, next_data_item)

queue.task_done()
except asyncio.CancelledError:
break

async def _update_data_flow_in_out_stats(self, in_node: str, out_nodes: set[str]):
def _update_data_flow_in_out_stats(self, in_node: str, out_nodes: set[str]):
if self._data_flow_stats is None:
return None
self._data_flow_stats[in_node]["out"] += 1
for node in out_nodes:
self._data_flow_stats[node]["in"] += 1

async def _update_data_flow_error_stats(self, node: str):
def _update_data_flow_error_stats(self, node: str):
if self._data_flow_stats is None:
return None
self._data_flow_stats[node]["err"] += 1

async def _update_exceptions(self, node: str, exc: Exception):
def _update_exceptions(self, node: str, exc: Exception):
if self._exceptions is None:
return None
self._exceptions[node].append(exc)

async def _pipeline_execution(self):
self._data_flow_stats: dict[str, dict[str, int]] = {}
self._exceptions: dict[str, deque[Exception]] = {}
self._data_flow_stats = {}
self._exceptions = {}

for node_name, node in self._graph._nodes.items():
queue = asyncio.Queue(maxsize=node.queue_size)
@@ -299,7 +305,7 @@ async def _pipeline_execution(self):
await asyncio.gather(*self._consumer_tasks.values())

if self._data_flow_logging:
await self._log_data_flow_nodes()
self._log_data_flow_nodes()

def _get_start_node_args(self, start_node_args) -> dict[str, tuple]:
if start_node_args is None:
@@ -313,7 +319,7 @@ def _get_start_node_args(self, start_node_args) -> dict[str, tuple]:
raise TypeError(f"args for the node '{node}' isn't a tuple: {args}")
return start_node_args

def execute(self, start_nodes: dict[str, tuple] = None) -> None:
def execute(self, start_nodes: dict[str, tuple] | None = None) -> None:
"""Start executing the functions along the graph.
Parameters
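The `_consumer` rewrite above replaces a single walrus-based condition with explicit `elif` arms for tuples and dicts. That dispatch can be exercised in isolation as follows (`func` and `build_coro` are illustrative names for this sketch, not the package's API):

```python
import asyncio

async def func(foo, bar):
    # A toy two-parameter node function.
    return foo + bar

def build_coro(data, unpack_input: bool = True):
    # Mirror the executor's dispatch: tuples are unpacked as positional
    # arguments, dicts as keyword arguments; anything else would be
    # passed through as a single argument.
    if unpack_input and isinstance(data, tuple):
        return func(*data)
    elif unpack_input and isinstance(data, dict):
        return func(**data)
    else:
        return func(data)

assert asyncio.run(build_coro((1, 2))) == 3
assert asyncio.run(build_coro({"foo": 3, "bar": 4})) == 7
```

Splitting the arms also lets mypy narrow the type of `data` in each branch, which the combined walrus form obscured.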
3 changes: 0 additions & 3 deletions src/async_graph_data_flow/graph.py
@@ -3,9 +3,6 @@
from typing import Any, Callable, NamedTuple


_DEFAULT_NUM_OF_WORKERS = 1


class InvalidAsyncGraphError(Exception):
pass

