Skip to content

Commit

Permalink
Merge pull request #562 from Flowminder/unique_subscriber_counts
Browse files Browse the repository at this point in the history
Unique subscriber counts
  • Loading branch information
maxalbert authored Apr 5, 2019
2 parents c330e5c + 3e50c9b commit 5c283c6
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [Unreleased]
### Added
- [#562] Implemented new flowclient API entrypoint, unique_subscriber_counts(), to access (with simplified parameters) equivalent flowmachine query

- New schema `aggregates` and table `aggregates.aggregates` have been created for maintaining a record of the process and completion of scheduled aggregates.

Expand Down
4 changes: 4 additions & 0 deletions docs/notebook_preamble.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def format_dict(x):
"permissions": {"run": True, "poll": True, "get_result": True},
"spatial_aggregation": ["admin3", "admin2", "admin1"],
},
"unique_subscriber_counts": {
"permissions": {"run": True, "poll": True, "get_result": True},
"spatial_aggregation": ["admin3", "admin2", "admin1"],
},
}

TOKEN = make_token(
Expand Down
1 change: 1 addition & 0 deletions flowauth/backend/flowauth/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ def make_demodata(): # pragma: no cover
"meaningful_locations_aggregate",
"meaningful_locations_od_matrix",
"geography",
"unique_subscriber_counts",
):
c = Capability(name=c)
db.session.add(c)
Expand Down
2 changes: 2 additions & 0 deletions flowclient/flowclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
query_is_ready,
run_query,
get_available_dates,
unique_subscriber_counts,
)

__all__ = [
Expand All @@ -48,4 +49,5 @@
"query_is_ready",
"run_query",
"get_available_dates",
"unique_subscriber_counts",
]
28 changes: 28 additions & 0 deletions flowclient/flowclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,3 +935,31 @@ def flows(
"to_location": to_location,
"aggregation_unit": aggregation_unit,
}


def unique_subscriber_counts(
start_date: str, end_date: str, aggregation_unit: str
) -> dict:
"""
Return query spec for unique subscriber counts
Parameters
----------
start_date : str
ISO format date of the first day of the count, e.g. "2016-01-01"
end_date : str
ISO format date of the day _after_ the final date of the count, e.g. "2016-01-08"
aggregation_unit : str
Unit of aggregation, e.g. "admin3"
Returns
-------
dict
Dict which functions as the query specification
"""
return {
"query_kind": "unique_subscriber_counts",
"start_date": start_date,
"end_date": end_date,
"aggregation_unit": aggregation_unit,
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
)
from .geography import GeographySchema, GeographyExposed
from .location_event_counts import LocationEventCountsSchema, LocationEventCountsExposed
from .unique_subscriber_counts import (
UniqueSubscriberCountsSchema,
UniqueSubscriberCountsExposed,
)
from .dfs_metric_total_amount import (
DFSTotalMetricAmountSchema,
DFSTotalMetricAmountExposed,
Expand All @@ -36,6 +40,7 @@ class FlowmachineQuerySchema(OneOfSchema):
"meaningful_locations_between_dates_od_matrix": MeaningfulLocationsBetweenDatesODMatrixSchema,
"geography": GeographySchema,
"location_event_counts": LocationEventCountsSchema,
"unique_subscriber_counts": UniqueSubscriberCountsSchema,
"dfs_metric_total_amount": DFSTotalMetricAmountSchema,
}

Expand All @@ -58,6 +63,8 @@ def get_obj_type(self, obj):
return "geography"
elif isinstance(obj, LocationEventCountsExposed):
return "location_event_counts"
elif isinstance(obj, UniqueSubscriberCountsExposed):
return "unique_subscriber_counts"
elif isinstance(obj, DFSTotalMetricAmountExposed):
return "dfs_metric_total_amount"
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from marshmallow import Schema, fields, post_load
from marshmallow.validate import OneOf, Length

from flowmachine.features import UniqueSubscriberCounts
from .base_exposed_query import BaseExposedQuery
from .custom_fields import AggregationUnit

__all__ = ["UniqueSubscriberCountsSchema", "UniqueSubscriberCountsExposed"]


class UniqueSubscriberCountsSchema(Schema):

start_date = fields.Date(required=True)
end_date = fields.Date(required=True)
aggregation_unit = AggregationUnit()

@post_load
def make_query_object(self, params):
return UniqueSubscriberCountsExposed(**params)


class UniqueSubscriberCountsExposed(BaseExposedQuery):
def __init__(self, *, start_date, end_date, aggregation_unit):
# Note: all input parameters need to be defined as attributes on `self`
# so that marshmallow can serialise the object correctly.
self.start_date = start_date
self.end_date = end_date
self.aggregation_unit = aggregation_unit

@property
def _flowmachine_query_obj(self):
"""
Return the underlying flowmachine UniqueSubscriberCounts object.
Returns
-------
Query
"""
return UniqueSubscriberCounts(
start=self.start_date, stop=self.end_date, level=self.aggregation_unit
)
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def test_get_query_params(params, zmq_port, zmq_host):
Running 'get_query_params' against an existing query_id returns the expected parameters with which the query was run.
"""
#
# Run daily_location query.
# Run query.
#
msg = {"action": "run_query", "params": params, "request_id": "DUMMY_ID"}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def test_get_query_kind(params, zmq_port, zmq_host):
Running 'get_query_kind' against an existing query_id returns the expected query kind.
"""
#
# Run daily_location query.
# Run query.
#
msg = {"action": "run_query", "params": params, "request_id": "DUMMY_ID"}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def test_get_available_queries(send_zmq_message_and_receive_reply):
"meaningful_locations_between_dates_od_matrix",
"geography",
"location_event_counts",
"unique_subscriber_counts",
"dfs_metric_total_amount",
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@
"meaningful_locations_aggregate": "#/components/schemas/MeaningfulLocationsAggregate",
"meaningful_locations_between_dates_od_matrix": "#/components/schemas/MeaningfulLocationsBetweenDatesODMatrix",
"meaningful_locations_between_label_od_matrix": "#/components/schemas/MeaningfulLocationsBetweenLabelODMatrix",
"modal_location": "#/components/schemas/ModalLocation"
"modal_location": "#/components/schemas/ModalLocation",
"unique_subscriber_counts": "#/components/schemas/UniqueSubscriberCounts"
},
"propertyName": "query_kind"
},
Expand Down Expand Up @@ -130,6 +131,9 @@
},
{
"$ref": "#/components/schemas/ModalLocation"
},
{
"$ref": "#/components/schemas/UniqueSubscriberCounts"
}
]
},
Expand Down Expand Up @@ -538,5 +542,32 @@
"aggregation_unit"
],
"type": "object"
},
"UniqueSubscriberCounts": {
"properties": {
"aggregation_unit": {
"enum": [
"admin0",
"admin1",
"admin2",
"admin3"
],
"type": "string"
},
"end_date": {
"format": "date",
"type": "string"
},
"start_date": {
"format": "date",
"type": "string"
}
},
"required": [
"aggregation_unit",
"end_date",
"start_date"
],
"type": "object"
}
}
8 changes: 8 additions & 0 deletions integration_tests/tests/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
"subscriber_subset": None,
},
),
(
"unique_subscriber_counts",
{
"start_date": "2016-01-01",
"end_date": "2016-01-02",
"aggregation_unit": "admin3",
},
),
# (
# # TODO: currently flowclient.modal_location() doesn't accept a 'locations'
# # argument but rather expects a list of location objects. We can't test this
Expand Down

0 comments on commit 5c283c6

Please sign in to comment.