Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flowmachine unique location count #979

Merged
merged 17 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Added

- The dev provisioning Ansible playbook now automatically generates an SSH key pair for the `flowkit` user. [#892](https://github.com/Flowminder/FlowKit/issues/892)
- FlowAPI's 'joined_spatial_aggregate' endpoint now exposes unique location counts. [#949](https://github.com/Flowminder/FlowKit/issues/949)

### Changed

Expand Down
22 changes: 17 additions & 5 deletions flowapi/flowapi/user_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,24 @@ def _get_query_kinds_and_aggregation_units(
location_spec = self._get_query_kinds_and_aggregation_units(
query_json=query_json["locations"]
)[0]
# Because metric specs don't have an aggregation unit, we stitch on the one
# from the locations
# If the metric spec doesn't have an aggregation unit, we stitch on the one
# from the locations.
# TODO: This is a bit of a hack, we should tidy this up soon!
metric_query_json = query_json["metric"].copy()
location_aggregation_unit = location_spec[1]
if "aggregation_unit" not in metric_query_json.keys():
metric_query_json["aggregation_unit"] = location_aggregation_unit
else:
if (
metric_query_json["aggregation_unit"]
!= location_aggregation_unit
):
# TODO: add support for different aggregation units
raise ValueError(
"Different aggregation units for metric and location are not currently supported."
)
metric_spec = self._get_query_kinds_and_aggregation_units(
query_json=dict(
**query_json["metric"], aggregation_unit=location_spec[1]
)
query_json=metric_query_json
)[0]
return [location_spec, metric_spec]
except (KeyError, SyntaxError):
Expand Down
2 changes: 2 additions & 0 deletions flowclient/flowclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
spatial_aggregate,
joined_spatial_aggregate,
radius_of_gyration,
unique_location_counts,
)

__all__ = [
Expand Down Expand Up @@ -62,4 +63,5 @@
"spatial_aggregate",
"joined_spatial_aggregate",
"radius_of_gyration",
"unique_location_counts",
]
36 changes: 34 additions & 2 deletions flowclient/flowclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,12 @@ def get_geography(*, connection: Connection, aggregation_unit: str) -> dict:
API connection to use
aggregation_unit : str
aggregation unit, e.g. 'admin3'

Returns
-------
dict
geography data as a GeoJSON FeatureCollection

"""
logger.info(
f"Getting {connection.url}/api/{connection.api_version}/geography/{aggregation_unit}"
Expand Down Expand Up @@ -1249,3 +1249,35 @@ def aggregate_network_objects(
"statistic": statistic,
"aggregate_by": aggregate_by,
}


def unique_location_counts(
    *,
    start_date: str,
    end_date: str,
    aggregation_unit: str,
    subscriber_subset=None,
) -> dict:
    """
    Return query spec for a unique location count query.

    Parameters
    ----------
    start_date : str
        ISO format date of the first day of the count, e.g. "2016-01-01"
    end_date : str
        ISO format date of the day _after_ the final date of the count, e.g. "2016-01-08"
    aggregation_unit : str
        Unit of aggregation, e.g. "admin3"
    subscriber_subset : dict or None, default None
        Subset of subscribers to include in event counts. Must be None
        (= all subscribers) or a dictionary with the specification of a
        subset query.

    Returns
    -------
    dict
        Dict which functions as the query specification
    """
    # The spec is consumed server-side by the matching marshmallow schema,
    # so the keys here must mirror the schema's field names exactly.
    return {
        "query_kind": "unique_location_counts",
        "start_date": start_date,
        "end_date": end_date,
        "aggregation_unit": aggregation_unit,
        "subscriber_subset": subscriber_subset,
    }
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from flowmachine.core.server.query_schemas.radius_of_gyration import (
RadiusOfGyrationSchema,
)
from flowmachine.core.server.query_schemas.unique_location_counts import (
UniqueLocationCountsSchema,
)
from flowmachine.core.server.query_schemas.spatial_aggregate import (
InputToSpatialAggregate,
)
Expand All @@ -21,7 +24,10 @@

class JoinableMetrics(OneOfSchema):
type_field = "query_kind"
type_schemas = {"radius_of_gyration": RadiusOfGyrationSchema}
type_schemas = {
"radius_of_gyration": RadiusOfGyrationSchema,
"unique_location_counts": UniqueLocationCountsSchema,
}


class JoinedSpatialAggregateSchema(Schema):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from marshmallow import Schema, fields, post_load
from marshmallow.validate import OneOf, Length

from flowmachine.features import UniqueLocationCounts
from .base_exposed_query import BaseExposedQuery
from .custom_fields import SubscriberSubset, AggregationUnit

__all__ = ["UniqueLocationCountsSchema", "UniqueLocationCountsExposed"]


class UniqueLocationCountsSchema(Schema):
    # Marshmallow schema validating the JSON parameters of a
    # "unique_location_counts" query as exposed through the API.
    query_kind = fields.String(validate=OneOf(["unique_location_counts"]))
    start_date = fields.Date(required=True)
    end_date = fields.Date(required=True)
    aggregation_unit = AggregationUnit()
    subscriber_subset = SubscriberSubset()

    @post_load
    def make_query_object(self, params):
        # After validation, wrap the parameter dict in the exposed-query
        # object that knows how to build the flowmachine query.
        return UniqueLocationCountsExposed(**params)


class UniqueLocationCountsExposed(BaseExposedQuery):
    """API-facing wrapper around a flowmachine UniqueLocationCounts query."""

    def __init__(
        self, *, start_date, end_date, aggregation_unit, subscriber_subset=None
    ):
        # Every constructor argument must be stored as an attribute on
        # `self`: marshmallow reads these attributes back when serialising
        # the query spec.
        self.start_date = start_date
        self.end_date = end_date
        self.aggregation_unit = aggregation_unit
        self.subscriber_subset = subscriber_subset

    @property
    def _flowmachine_query_obj(self):
        """
        The flowmachine UniqueLocationCounts object described by this spec.

        Returns
        -------
        Query
        """
        return UniqueLocationCounts(
            start=self.start_date,
            stop=self.end_date,
            level=self.aggregation_unit,
            subscriber_subset=self.subscriber_subset,
        )
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def __init__(

@property
def column_names(self) -> List[str]:
return ["subscriber", "unique_location_counts"]
return ["subscriber", "value"]

def _make_query(self):
"""
Expand All @@ -134,13 +134,13 @@ def _make_query(self):
get_columns_for_level(self.ul.level, self.ul.column_name)
)
sql = """
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change this to an f-string while we're here?

SELECT
subscriber,
COUNT(*) as unique_location_counts
SELECT
subscriber,
COUNT(*) as value
FROM
(SELECT DISTINCT subscriber, {rc}
(SELECT DISTINCT subscriber, {rc}
FROM ({all_locs}) AS all_locs) AS _
GROUP BY subscriber
GROUP BY subscriber
""".format(
all_locs=self.ul.get_query(), rc=relevant_columns
)
Expand Down
6 changes: 1 addition & 5 deletions flowmachine/tests/test_unique_location_counts.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ def test_correct_counts(get_dataframe):
dful = get_dataframe(
subscriber_locations("2016-01-01", "2016-01-02", level="cell", hours=(5, 17))
)
assert [
df["unique_location_counts"][0],
df["unique_location_counts"][1],
df["unique_location_counts"][2],
] == [
assert [df["value"][0], df["value"][1], df["value"][2]] == [
len(dful[dful["subscriber"] == df["subscriber"][0]]["location_id"].unique()),
len(dful[dful["subscriber"] == df["subscriber"][1]]["location_id"].unique()),
len(dful[dful["subscriber"] == df["subscriber"][2]]["location_id"].unique()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,17 @@
"JoinableMetrics": {
"discriminator": {
"mapping": {
"radius_of_gyration": "#/components/schemas/RadiusOfGyration"
"radius_of_gyration": "#/components/schemas/RadiusOfGyration",
"unique_location_counts": "#/components/schemas/UniqueLocationCounts"
},
"propertyName": "query_kind"
},
"oneOf": [
{
"$ref": "#/components/schemas/RadiusOfGyration"
},
{
"$ref": "#/components/schemas/UniqueLocationCounts"
}
]
},
Expand Down Expand Up @@ -884,6 +888,47 @@
],
"type": "object"
},
"UniqueLocationCounts": {
"properties": {
"aggregation_unit": {
"enum": [
"admin0",
"admin1",
"admin2",
"admin3"
],
"type": "string"
},
"end_date": {
"format": "date",
"type": "string"
},
"query_kind": {
"enum": [
"unique_location_counts"
],
"type": "string"
},
"start_date": {
"format": "date",
"type": "string"
},
"subscriber_subset": {
"enum": [
null
],
"nullable": true,
"type": "string"
}
},
"required": [
"aggregation_unit",
"end_date",
"query_kind",
"start_date"
],
"type": "object"
},
"UniqueSubscriberCounts": {
"properties": {
"aggregation_unit": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@
"oneOf": [
{
"$ref": "#/components/schemas/RadiusOfGyration"
},
{
"$ref": "#/components/schemas/UniqueLocationCounts"
}
]
},
Expand Down Expand Up @@ -833,6 +836,47 @@
],
"type": "object"
},
"UniqueLocationCounts": {
"properties": {
"aggregation_unit": {
"enum": [
"admin0",
"admin1",
"admin2",
"admin3"
],
"type": "string"
},
"end_date": {
"format": "date",
"type": "string"
},
"query_kind": {
"enum": [
"unique_location_counts"
],
"type": "string"
},
"start_date": {
"format": "date",
"type": "string"
},
"subscriber_subset": {
"enum": [
null
],
"nullable": true,
"type": "string"
}
},
"required": [
"aggregation_unit",
"end_date",
"query_kind",
"start_date"
],
"type": "object"
},
"UniqueSubscriberCounts": {
"properties": {
"aggregation_unit": {
Expand Down
Loading