Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asyncio flowclient #2365

Merged
merged 21 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ python:
before_install:
- cd flowclient
install:
- pip install .
- pip install .[test]
# command to run tests
script:
- pytest
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Added new Flows type query to FlowAPI `unique_locations`, which produces the paired regional connectivity [COVID-19 indicator](https://github.com/Flowminder/COVID-19/blob/master/od_matrix_undirected_all_pairs.md)
- Added FlowClient function `unique_locations_spec`, which can be used on either side of a `flows` query
- Added FlowClient functions: `unique_visitor_counts`, `active_at_reference_location_counts`, `unmoving_counts`, `unmoving_at_reference_location_counts`, `trips_od_matrix`, and `consecutive_trips_od_matrix`. [#2333](https://github.com/Flowminder/FlowKit/issues/2333)
- FlowClient now has an asyncio API. Use `connect_async` instead of `connect` to create an `ASyncConnection`, and `await` methods on `APIQuery` objects. [#2199](https://github.com/Flowminder/FlowKit/issues/2199)

### Fixed
- Fixed FlowMachine server becoming deadlocked under load. [#2390](https://github.com/Flowminder/FlowKit/issues/2390)
Expand Down
4 changes: 3 additions & 1 deletion flowclient/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ tqdm = "*"

[dev-packages]
pytest = "*"
"pytest-cov" = "*"
pytest-asyncio = "*"
pytest-cov = "*"
asynctest = "*"
versioneer = "*"
black = "==19.10b0"
ipython = "*"
Expand Down
18 changes: 17 additions & 1 deletion flowclient/Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 17 additions & 22 deletions flowclient/flowclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@
__version__ = get_versions()["version"]
del get_versions

from flowclient.api_query import APIQuery
from .connection import Connection
from flowclient.client import connect

from flowclient.async_api_query import ASyncAPIQuery
from .async_connection import ASyncConnection
from flowclient.async_client import connect_async


from .client import (
Connection,
connect,
daily_location_spec,
modal_location_spec,
modal_location_from_dates_spec,
get_geography,
get_result,
get_result_by_query_id,
Expand All @@ -24,11 +28,16 @@
query_is_ready,
run_query,
get_available_dates,
)
from .query_specs import (
daily_location_spec,
modal_location_spec,
modal_location_from_dates_spec,
radius_of_gyration_spec,
unique_location_counts_spec,
topup_balance_spec,
subscriber_degree_spec,
topup_amount_spec,
topup_balance_spec,
event_count_spec,
displacement_spec,
pareto_interactions_spec,
Expand All @@ -37,7 +46,6 @@
random_sample_spec,
unique_locations_spec,
)
from .api_query import APIQuery
from . import aggregates
from .aggregates import (
location_event_counts,
Expand All @@ -61,11 +69,8 @@

__all__ = [
"aggregates",
"Connection",
"connect_async",
"connect",
"daily_location_spec",
"modal_location_spec",
"modal_location_from_dates_spec",
"get_geography",
"get_result",
"get_result_by_query_id",
Expand All @@ -74,18 +79,8 @@
"query_is_ready",
"run_query",
"get_available_dates",
"radius_of_gyration_spec",
"unique_location_counts_spec",
"subscriber_degree_spec",
"topup_amount_spec",
"topup_balance_spec",
"event_count_spec",
"displacement_spec",
"pareto_interactions_spec",
"nocturnal_events_spec",
"handset_spec",
"random_sample_spec",
"APIQuery",
"ASyncAPIQuery",
"location_event_counts",
"meaningful_locations_aggregate",
"meaningful_locations_between_label_od_matrix",
Expand Down
58 changes: 21 additions & 37 deletions flowclient/flowclient/aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from merge_args import merge_args

from flowclient.client import Connection
from flowclient import Connection
from flowclient.api_query import APIQuery


Expand Down Expand Up @@ -93,9 +93,7 @@ def location_event_counts(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
Location event counts query
"""
return APIQuery(
connection=connection, parameters=location_event_counts_spec(**kwargs)
)
return connection.make_api_query(parameters=location_event_counts_spec(**kwargs))


def meaningful_locations_aggregate_spec(
Expand Down Expand Up @@ -266,8 +264,8 @@ def meaningful_locations_aggregate(*, connection: Connection, **kwargs) -> APIQu
-----
Does not return any value below 15.
"""
return APIQuery(
connection=connection, parameters=meaningful_locations_aggregate_spec(**kwargs)
return connection.make_api_query(
parameters=meaningful_locations_aggregate_spec(**kwargs)
)


Expand Down Expand Up @@ -443,8 +441,7 @@ def meaningful_locations_between_label_od_matrix(
.. [1] S. Isaacman et al., "Identifying Important Places in People's Lives from Cellular Network Data", International Conference on Pervasive Computing (2011), pp 133-151.
.. [2] Zagatti, Guilherme Augusto, et al. "A trip to work: Estimation of origin and destination of commuting patterns in the main metropolitan regions of Haiti using CDR." Development Engineering 3 (2018): 133-165.
"""
return APIQuery(
connection=connection,
return connection.make_api_query(
parameters=meaningful_locations_between_label_od_matrix_spec(**kwargs),
)

Expand Down Expand Up @@ -633,8 +630,7 @@ def meaningful_locations_between_dates_od_matrix(
.. [1] S. Isaacman et al., "Identifying Important Places in People's Lives from Cellular Network Data", International Conference on Pervasive Computing (2011), pp 133-151.
.. [2] Zagatti, Guilherme Augusto, et al. "A trip to work: Estimation of origin and destination of commuting patterns in the main metropolitan regions of Haiti using CDR." Development Engineering 3 (2018): 133-165.
"""
return APIQuery(
connection=connection,
return connection.make_api_query(
parameters=meaningful_locations_between_dates_od_matrix_spec(**kwargs),
)

Expand Down Expand Up @@ -687,7 +683,7 @@ def flows(*, connection: Connection, **kwargs) -> APIQuery:
Flows query

"""
return APIQuery(connection=connection, parameters=flows_spec(**kwargs))
return connection.make_api_query(parameters=flows_spec(**kwargs))


def unique_subscriber_counts_spec(
Expand Down Expand Up @@ -739,9 +735,7 @@ def unique_subscriber_counts(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
Unique subscriber counts query
"""
return APIQuery(
connection=connection, parameters=unique_subscriber_counts_spec(**kwargs)
)
return connection.make_api_query(parameters=unique_subscriber_counts_spec(**kwargs))


def location_introversion_spec(
Expand Down Expand Up @@ -798,9 +792,7 @@ def location_introversion(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
Location introversion query
"""
return APIQuery(
connection=connection, parameters=location_introversion_spec(**kwargs)
)
return connection.make_api_query(parameters=location_introversion_spec(**kwargs))


def total_network_objects_spec(
Expand Down Expand Up @@ -857,9 +849,7 @@ def total_network_objects(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
Total network objects query
"""
return APIQuery(
connection=connection, parameters=total_network_objects_spec(**kwargs)
)
return connection.make_api_query(parameters=total_network_objects_spec(**kwargs))


def aggregate_network_objects_spec(
Expand Down Expand Up @@ -913,8 +903,8 @@ def aggregate_network_objects(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
Aggregate network objects query
"""
return APIQuery(
connection=connection, parameters=aggregate_network_objects_spec(**kwargs)
return connection.make_api_query(
parameters=aggregate_network_objects_spec(**kwargs)
)


Expand Down Expand Up @@ -952,7 +942,7 @@ def spatial_aggregate(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
Spatial aggregate query
"""
return APIQuery(connection=connection, parameters=spatial_aggregate_spec(**kwargs))
return connection.make_api_query(parameters=spatial_aggregate_spec(**kwargs))


def consecutive_trips_od_matrix_spec(
Expand Down Expand Up @@ -1014,8 +1004,8 @@ def consecutive_trips_od_matrix(*, connection: Connection, **kwargs) -> APIQuery
APIQuery
consecutive_trips_od_matrix query
"""
return APIQuery(
connection=connection, parameters=consecutive_trips_od_matrix_spec(**kwargs),
return connection.make_api_query(
parameters=consecutive_trips_od_matrix_spec(**kwargs),
)


Expand Down Expand Up @@ -1078,7 +1068,7 @@ def trips_od_matrix(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
trips_od_matrix query
"""
return APIQuery(connection=connection, parameters=trips_od_matrix_spec(**kwargs),)
return connection.make_api_query(parameters=trips_od_matrix_spec(**kwargs),)


def unmoving_counts_spec(
Expand Down Expand Up @@ -1118,7 +1108,7 @@ def unmoving_counts(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
unmoving_counts query
"""
return APIQuery(connection=connection, parameters=unmoving_counts_spec(**kwargs),)
return connection.make_api_query(parameters=unmoving_counts_spec(**kwargs),)


def unmoving_at_reference_location_counts_spec(
Expand Down Expand Up @@ -1170,8 +1160,7 @@ def unmoving_at_reference_location_counts(
APIQuery
unmoving_at_reference_location_counts query
"""
return APIQuery(
connection=connection,
return connection.make_api_query(
parameters=unmoving_at_reference_location_counts_spec(**kwargs),
)

Expand Down Expand Up @@ -1225,8 +1214,7 @@ def active_at_reference_location_counts(
APIQuery
active_at_reference_location_counts query
"""
return APIQuery(
connection=connection,
return connection.make_api_query(
parameters=active_at_reference_location_counts_spec(**kwargs),
)

Expand Down Expand Up @@ -1284,9 +1272,7 @@ def joined_spatial_aggregate(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
Joined spatial aggregate query
"""
return APIQuery(
connection=connection, parameters=joined_spatial_aggregate_spec(**kwargs)
)
return connection.make_api_query(parameters=joined_spatial_aggregate_spec(**kwargs))


def histogram_aggregate_spec(
Expand Down Expand Up @@ -1344,6 +1330,4 @@ def histogram_aggregate(*, connection: Connection, **kwargs) -> APIQuery:
APIQuery
Histogram aggregate query
"""
return APIQuery(
connection=connection, parameters=histogram_aggregate_spec(**kwargs)
)
return connection.make_api_query(parameters=histogram_aggregate_spec(**kwargs))
3 changes: 1 addition & 2 deletions flowclient/flowclient/api_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
from typing import Union, Optional

from flowclient.client import (
FlowclientConnectionError,
Connection,
run_query,
get_status,
get_result_by_query_id,
get_geojson_result_by_query_id,
wait_for_query_to_be_ready,
)
from flowclient.connection import Connection


class APIQuery:
Expand Down
Loading