[CIVIS-7472] DOC add technical notes (#9)
1 parent
b127a6d
commit b1bf2e5
Showing 25 changed files with 1,187 additions and 183 deletions.
@@ -1,4 +1,4 @@
 # Sphinx build info version 1
 # This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
-config: b3dd4ed71bb2b56b4ce0349a56974cef
+config: 82a13e1d440b1fccb9c82f453fa605a1
 tags: 645f666f9bcd5a90fca523b33c5a78b7
@@ -0,0 +1,237 @@
.. _technical:

Technical Notes
===============

Functions Arranged as a Graph
-----------------------------

The ``async-graph-data-flow`` package provides a framework for arranging
functions in a directed acyclic graph (DAG).

.. mermaid::

   flowchart LR

   A[func1] --> B[func2]
   B --> C[func3]
   B --> D[func4]
   C --> E[func5]
   D --> E

Each function must be an asynchronous generator function,
which means it is defined with the ``async`` keyword
and yields something.

.. code-block:: python

   async def func1(arg1, arg2):
       ...
       yield output

For two nodes to be connected correctly in terms of function definition,
the source node's function must yield what the destination node's function
expects as input
(for more information,
see the ``unpack_input`` parameter of :func:`~async_graph_data_flow.AsyncGraph.add_node`).

.. mermaid::

   flowchart LR

   START[ ] -.-> A
   B -.-> STOP[ ]
   A["async def func2(...):\n ...\n yield <strong>foo, bar</strong>"] --> B["async def func3(<strong>foo, bar</strong>):\n ...\n yield ..."]
   style A text-align:left
   style B text-align:left
   style START fill-opacity:0, stroke-opacity:0;
   style STOP fill-opacity:0, stroke-opacity:0;

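The yield-to-input contract described above can be sketched with plain ``asyncio``, without the package itself. This is only an illustration: the ``run_pipeline`` helper is hypothetical and stands in for whatever the framework does internally, and the unpacking with ``*item`` is an assumption about how a yielded tuple reaches a two-parameter function.

```python
import asyncio


async def func2(n):
    # Source: yields a 2-tuple (foo, bar) for each value.
    for i in range(n):
        yield i, i * i


async def func3(foo, bar):
    # Destination: its parameters match what func2 yields.
    yield foo + bar


async def run_pipeline(n):
    # Hypothetical stand-in for the framework: pass each yielded
    # tuple to the destination function as positional arguments.
    results = []
    async for item in func2(n):
        async for out in func3(*item):
            results.append(out)
    return results


results = asyncio.run(run_pipeline(3))
```

If ``func2`` yielded a single value instead of a tuple, ``func3`` would need a single parameter for the two definitions to stay compatible.
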
Tasks and Queues
----------------

During runtime, each node's function is run concurrently
as one or more :class:`tasks<asyncio.Task>` in the event loop.
The number of tasks for a given node is controlled by
the ``max_tasks`` parameter of :func:`~async_graph_data_flow.AsyncGraph.add_node`.

A node is associated with a :class:`Queue<asyncio.Queue>` instance
responsible for providing the items to the tasks of the node.
The queue receives its items as the source nodes yield them.
The maximum number of items a queue can hold is specified by the ``queue_size``
parameter of :func:`~async_graph_data_flow.AsyncGraph.add_node`.
The queue is first-in-first-out, which means that
it keeps track of the items yielded from the tasks of the source nodes
and feeds them one by one in the order in which it received them.
An item leaves a queue when a task of the destination node becomes
available to process it.

.. mermaid::

   flowchart LR

   start1[ ] -.- queue1((" "))
   subgraph box1 [ ]
   queue1 --> node1[" "]
   end
   style start1 fill-opacity:0, stroke-opacity:0;

   start2[ ] -.- queue2((" "))
   subgraph box2 [ ]
   queue2 --> node2[" "]
   end
   style start2 fill-opacity:0, stroke-opacity:0;

   subgraph node and its associated queue
   queue3((queue)) --> node3[task 1, task 2,\ntask 3, ...]
   end

   node1 --> |yields\nitems| queue3
   node2 --> |yields\nitems| queue3
   node3 -.-> |yields\nitems| STOP[ ]
   style STOP fill-opacity:0, stroke-opacity:0;

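The relationship between a node's queue and its tasks can be approximated with plain ``asyncio``. The following is a minimal sketch rather than the package's actual implementation: the ``worker`` and ``run`` helpers are hypothetical, and the ``max_tasks`` and ``queue_size`` arguments merely mirror the parameter names mentioned above.

```python
import asyncio


async def source(n):
    # Source node: yields items one at a time.
    for i in range(n):
        yield i


async def double(item):
    # Destination node's function: an async generator.
    await asyncio.sleep(0)
    yield item * 2


async def worker(queue, results):
    # One task of the destination node: take an item when available.
    while True:
        item = await queue.get()
        try:
            async for out in double(item):
                results.append(out)
        finally:
            queue.task_done()


async def run(n_items, max_tasks, queue_size):
    queue = asyncio.Queue(maxsize=queue_size)  # bounded FIFO queue
    results = []
    workers = [
        asyncio.create_task(worker(queue, results)) for _ in range(max_tasks)
    ]
    async for item in source(n_items):
        await queue.put(item)  # waits while the queue is full
    await queue.join()  # wait until every item has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


results = asyncio.run(run(n_items=5, max_tasks=2, queue_size=2))
```

Items leave the queue in arrival order, but with more than one task the order in which results are produced is not guaranteed in general.
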
Example
-------

The following sample script uses ``async-graph-data-flow`` to process real data,
bringing together some of the components discussed above.
It pulls data from `Open Brewery DB <https://www.openbrewerydb.org/>`_
into a local CSV file.

.. code-block:: python

   # This Python script was tested with Python 3.11.
   # Apart from async-graph-data-flow, it requires several other third-party dependencies,
   # which can be installed by `pip install aiocsv aiofile aiohttp`.

   import aiocsv
   import aiofile
   import aiohttp
   from async_graph_data_flow import AsyncGraph, AsyncExecutor

   # API doc: https://www.openbrewerydb.org/documentation
   URL = "https://api.openbrewerydb.org/v1/breweries"

   CSV_HEADER = [
       "id",
       "name",
       "brewery_type",
       "address_1",
       "address_2",
       "address_3",
       "city",
       "state_province",
       "postal_code",
       "country",
       "longitude",
       "latitude",
       "phone",
       "website_url",
       "state",
       "street",
   ]
   OUTPUT_FILENAME = "breweries_us_async.csv"

   has_written_csv_header = False


   async def get_open_brewery_data():
       page = 1
       async with aiohttp.ClientSession() as session:
           while True:
               params = {
                   "by_country": "United States",
                   "page": page,
                   "per_page": 200,
               }
               async with session.get(URL, params=params) as response:
                   response.raise_for_status()
                   data = await response.json()
                   if not data:
                       break
                   else:
                       yield data
                       page += 1


   async def write_to_csv(data: list[dict[str, str]]):
       global has_written_csv_header
       async with aiofile.async_open(OUTPUT_FILENAME, mode="a", encoding="utf8") as f:
           csv_writer = aiocsv.AsyncDictWriter(f, CSV_HEADER)
           if not has_written_csv_header:
               await csv_writer.writeheader()
               has_written_csv_header = True
           await csv_writer.writerows(data)
       yield


   def main():
       graph = AsyncGraph()
       graph.add_node(get_open_brewery_data)
       graph.add_node(write_to_csv)
       graph.add_edge(get_open_brewery_data, write_to_csv)
       executor = AsyncExecutor(graph)
       executor.execute()
       print("data downloaded:", OUTPUT_FILENAME)


   if __name__ == "__main__":
       main()

In this code, ``main()`` defines a graph and executes it.
The graph has two connected nodes.
The source node, with the asynchronous generator function ``get_open_brewery_data()``,
yields items to the destination node with ``write_to_csv()``:

.. mermaid::

   flowchart LR

   A[get_open_brewery_data] --> B[write_to_csv]

For the source node,
the following shows an abridged version of ``get_open_brewery_data()``
to highlight what the function yields:

.. code-block:: python

   async def get_open_brewery_data():
       page = 1
       ...
       while True:
           params = {"page": page, ...}
           ...
           yield data
           page += 1

Because the Open Brewery DB API paginates its data,
``get_open_brewery_data()`` makes an API call for one page of data,
yields this data to the destination node (``write_to_csv()``),
repeats this process, and stops once all pages of data have been retrieved.

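The pagination loop can be tried offline by swapping the HTTP call for an in-memory stand-in. In this sketch, ``FAKE_PAGES``, ``fetch_page``, ``get_pages``, and ``collect`` are all hypothetical names introduced for illustration; only the loop shape (request a page, stop on an empty page, otherwise yield and advance) follows the example above.

```python
import asyncio

# Hypothetical stand-in for the paginated API: page number -> records.
FAKE_PAGES = {
    1: [{"id": "a"}, {"id": "b"}],
    2: [{"id": "c"}],
}


async def fetch_page(page):
    # Pretend to perform an HTTP request; unknown pages come back empty.
    await asyncio.sleep(0)
    return FAKE_PAGES.get(page, [])


async def get_pages():
    page = 1
    while True:
        data = await fetch_page(page)
        if not data:
            break  # an empty page signals that all data was retrieved
        yield data  # one item per page, each a list of records
        page += 1


async def collect():
    # Gather every yielded page so the result can be inspected.
    return [data async for data in get_pages()]


pages = asyncio.run(collect())
```

Each yielded item is a whole page, so the destination function receives a list of records per call rather than a single record.
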
The destination node with ``write_to_csv()`` receives its inputs from its
associated queue, which holds the items yielded by ``get_open_brewery_data()``.

.. mermaid::

   flowchart LR

   Q(("Queue items:\n[{'col1': 'val1', ...}, ...]\n[{'col1': 'val1', ...}, ...]\n...\n"))
   A[get_open_brewery_data]
   B[write_to_csv]
   A --> |yields\nitems| Q
   Q --> B

``get_open_brewery_data()`` yields a page of the Open Brewery DB data,
which is a list of records where each record is a dictionary of column names
mapped to values. The function signature of ``write_to_csv()`` expects exactly
such a list of dictionaries:

.. code-block:: python

   async def write_to_csv(data: list[dict[str, str]]):
       ...

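To see why a list of dictionaries is a convenient input, and why the header has to be written only once across pages, here is a synchronous sketch that uses the standard library's ``csv`` module instead of ``aiocsv``. The ``write_pages`` helper and the sample records are hypothetical; the example script above does the equivalent asynchronously with a module-level flag.

```python
import csv
import io


def write_pages(pages, fieldnames):
    # Write every page to one in-memory CSV, emitting the header once.
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames)
    header_written = False
    for page in pages:
        if not header_written:
            writer.writeheader()
            header_written = True
        writer.writerows(page)  # each page is a list of dicts
    return buffer.getvalue()


# Hypothetical sample data: two pages with one record each.
sample_pages = [
    [{"id": "1", "name": "A"}],
    [{"id": "2", "name": "B"}],
]
text = write_pages(sample_pages, ["id", "name"])
lines = text.splitlines()
```
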