Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose 'event_types' parameters #2632

Merged
merged 17 commits into from
Jun 3, 2020
Merged
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased]

### Added
- Queries run through FlowAPI can now be run on only a subset of the available CDR types, by supplying an `event_types` parameter. [#2631](https://github.com/Flowminder/FlowKit/issues/2631)
- FlowETL now includes QA checks for the earliest and latest timestamps in the ingested data. [#2627](https://github.com/Flowminder/FlowKit/issues/2627)

### Changed
76 changes: 71 additions & 5 deletions flowclient/flowclient/aggregates.py

Large diffs are not rendered by default.

67 changes: 60 additions & 7 deletions flowclient/flowclient/query_specs.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ def unique_locations_spec(
start_date: str,
end_date: str,
aggregation_unit: str,
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
mapping_table: Optional[str] = None,
geom_table: Optional[str] = None,
@@ -27,6 +28,9 @@ def unique_locations_spec(
ISO format dates between which to get unique locations, e.g. "2016-01-01"
aggregation_unit : str
Unit of aggregation, e.g. "admin3"
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None
Subset of subscribers to retrieve unique locations for. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -43,6 +47,7 @@ def unique_locations_spec(
start_date=start_date,
end_date=end_date,
aggregation_unit=aggregation_unit,
event_types=event_types,
subscriber_subset=subscriber_subset,
mapping_table=mapping_table,
geom_table=geom_table,
@@ -55,6 +60,7 @@ def daily_location_spec(
date: str,
aggregation_unit: str,
method: str,
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
mapping_table: Optional[str] = None,
geom_table: Optional[str] = None,
@@ -72,6 +78,9 @@ def daily_location_spec(
Unit of aggregation, e.g. "admin3"
method : str
Method to use for daily location, one of 'last' or 'most-common'
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None
Subset of subscribers to retrieve daily locations for. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -88,6 +97,7 @@ def daily_location_spec(
"date": date,
"aggregation_unit": aggregation_unit,
"method": method,
"event_types": event_types,
"subscriber_subset": subscriber_subset,
"mapping_table": mapping_table,
"geom_table": geom_table,
@@ -126,6 +136,7 @@ def modal_location_from_dates_spec(
end_date: str,
aggregation_unit: str,
method: str,
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
mapping_table: Optional[str] = None,
geom_table: Optional[str] = None,
@@ -145,6 +156,9 @@ def modal_location_from_dates_spec(
Unit of aggregation, e.g. "admin3"
method : str
Method to use for daily locations, one of 'last' or 'most-common'
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None
Subset of subscribers to retrieve modal locations for. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -165,6 +179,7 @@ def modal_location_from_dates_spec(
date=date,
aggregation_unit=aggregation_unit,
method=method,
event_types=event_types,
subscriber_subset=subscriber_subset,
mapping_table=mapping_table,
geom_table=geom_table,
@@ -176,7 +191,11 @@ def modal_location_from_dates_spec(


def radius_of_gyration_spec(
*, start_date: str, end_date: str, subscriber_subset: Union[dict, None] = None
*,
start_date: str,
end_date: str,
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
) -> dict:
"""
Return query spec for radius of gyration
@@ -187,6 +206,9 @@ def radius_of_gyration_spec(
ISO format date of the first day of the count, e.g. "2016-01-01"
end_date : str
ISO format date of the day _after_ the final date of the count, e.g. "2016-01-08"
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None, default None
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -201,6 +223,7 @@ def radius_of_gyration_spec(
"query_kind": "radius_of_gyration",
"start_date": start_date,
"end_date": end_date,
"event_types": event_types,
"subscriber_subset": subscriber_subset,
}

@@ -210,6 +233,7 @@ def unique_location_counts_spec(
start_date: str,
end_date: str,
aggregation_unit: str,
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
mapping_table: Optional[str] = None,
geom_table: Optional[str] = None,
@@ -226,6 +250,9 @@ def unique_location_counts_spec(
ISO format date of the day _after_ the final date of the count, e.g. "2016-01-08"
aggregation_unit : str
Unit of aggregation, e.g. "admin3"
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None, default None
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -241,6 +268,7 @@ def unique_location_counts_spec(
"start_date": start_date,
"end_date": end_date,
"aggregation_unit": aggregation_unit,
"event_types": event_types,
"subscriber_subset": subscriber_subset,
"mapping_table": mapping_table,
"geom_table": geom_table,
@@ -290,6 +318,7 @@ def subscriber_degree_spec(
start: str,
stop: str,
direction: str = "both",
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
) -> dict:
"""
@@ -303,6 +332,9 @@ def subscriber_degree_spec(
ISO format date of the day _after_ the final date of the count, e.g. "2016-01-08"
direction : {"in", "out", "both"}, default "both"
Optionally, include only incoming or outgoing calls/texts. Can be one of "in", "out" or "both".
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None, default None
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -318,6 +350,7 @@ def subscriber_degree_spec(
"start": start,
"stop": stop,
"direction": direction,
"event_types": event_types,
"subscriber_subset": subscriber_subset,
}

@@ -378,9 +411,9 @@ def event_count_spec(
ISO format date of the day _after_ the final date of the count, e.g. "2016-01-08"
direction : {"in", "out", "both"}, default "both"
Optionally, include only incoming or outgoing calls/texts. Can be one of "in", "out" or "both".
event_types : list of str, optional
The event types to include in the count (for example: ["calls", "sms"]).
If None, include all event types in the count.
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None, default None
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -407,6 +440,7 @@ def displacement_spec(
stop: str,
statistic: str,
reference_location: Dict[str, str],
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
) -> dict:
"""
@@ -420,8 +454,12 @@ def displacement_spec(
ISO format date of the day _after_ the final date of the count, e.g. "2016-01-08"
statistic : {"avg", "max", "min", "median", "mode", "stddev", "variance"}
Statistic type one of "avg", "max", "min", "median", "mode", "stddev" or "variance".
reference_location:

reference_location : dict
Query specification for the locations (daily or modal location) from which to
calculate displacement.
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None, default None
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -438,6 +476,7 @@ def displacement_spec(
"stop": stop,
"statistic": statistic,
"reference_location": reference_location,
"event_types": event_types,
"subscriber_subset": subscriber_subset,
}

@@ -447,6 +486,7 @@ def pareto_interactions_spec(
start: str,
stop: str,
proportion: float,
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
) -> dict:
"""
@@ -460,6 +500,9 @@ def pareto_interactions_spec(
ISO format date of the day _after_ the final date of the time interval to be considered, e.g. "2016-01-08"
proportion : float
proportion to track below
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None, default None
Subset of subscribers to include in result. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -475,6 +518,7 @@ def pareto_interactions_spec(
"start": start,
"stop": stop,
"proportion": proportion,
"event_types": event_types,
"subscriber_subset": subscriber_subset,
}

@@ -484,6 +528,7 @@ def nocturnal_events_spec(
start: str,
stop: str,
hours: Tuple[int, int],
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
) -> dict:
"""
@@ -497,7 +542,9 @@ def nocturnal_events_spec(
ISO format date of the day _after_ the final date for which to count nocturnal events, e.g. "2016-01-08"
hours : tuple(int, int)
Tuple (night start hour, night end hour) defining the beginning and end of the night

event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None, default None
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -515,6 +562,7 @@ def nocturnal_events_spec(
"stop": stop,
"night_start_hour": hours[0],
"night_end_hour": hours[1],
"event_types": event_types,
"subscriber_subset": subscriber_subset,
}

@@ -525,6 +573,7 @@ def handset_spec(
end_date: str,
characteristic: str = "hnd_type",
method: str = "last",
event_types: Optional[List[str]] = None,
subscriber_subset: Union[dict, None] = None,
) -> dict:
"""
@@ -540,6 +589,9 @@ def handset_spec(
The required handset characteristic.
method : {"last", "most-common"}, default "last"
Method for choosing a handset to associate with subscriber.
event_types : list of {"calls", "sms", "mds", "topups"}, optional
Optionally, include only a subset of event types (for example: ["calls", "sms"]).
If None, include all event types in the query.
subscriber_subset : dict or None, default None
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
@@ -556,6 +608,7 @@ def handset_spec(
"end_date": end_date,
"characteristic": characteristic,
"method": method,
"event_types": event_types,
"subscriber_subset": subscriber_subset,
}

Original file line number Diff line number Diff line change
@@ -14,21 +14,28 @@
)
from . import BaseExposedQuery
from .base_schema import BaseSchema
from .custom_fields import SubscriberSubset, ISODateTime
from .custom_fields import EventTypes, SubscriberSubset, ISODateTime
from .aggregation_unit import AggregationUnitMixin

__all__ = ["ConsecutiveTripsODMatrixSchema", "ConsecutiveTripsODMatrixExposed"]


class ConsecutiveTripsODMatrixExposed(AggregationUnitMixin, BaseExposedQuery):
def __init__(
self, start_date, end_date, *, aggregation_unit, subscriber_subset=None,
self,
start_date,
end_date,
*,
aggregation_unit,
event_types,
subscriber_subset=None,
):
# Note: all input parameters need to be defined as attributes on `self`
# so that marshmallow can serialise the object correctly.
self.start_date = start_date
self.end_date = end_date
self.aggregation_unit = aggregation_unit
self.event_types = event_types
self.subscriber_subset = subscriber_subset

@property
@@ -46,6 +53,7 @@ def _flowmachine_query_obj(self):
self.start_date,
self.end_date,
spatial_unit=self.aggregation_unit,
table=self.event_types,
subscriber_subset=self.subscriber_subset,
)
)
@@ -57,6 +65,7 @@ class ConsecutiveTripsODMatrixSchema(AggregationUnitMixin, BaseSchema):
query_kind = fields.String(validate=OneOf(["consecutive_trips_od_matrix"]))
start_date = ISODateTime(required=True)
end_date = ISODateTime(required=True)
event_types = EventTypes()
subscriber_subset = SubscriberSubset()

__model__ = ConsecutiveTripsODMatrixExposed
Original file line number Diff line number Diff line change
@@ -35,7 +35,9 @@ class EventTypes(fields.List):
When deserialised, will be deduped, and prefixed with "events."
"""

def __init__(self, required=False, validate=None, **kwargs):
def __init__(
self, required=False, validate=None, allow_none=True, missing=None, **kwargs
):
if validate is not None:
raise ValueError(
"The EventTypes field provides its own validation "
@@ -46,7 +48,8 @@ def __init__(self, required=False, validate=None, **kwargs):
fields.String(validate=OneOf(["calls", "sms", "mds", "topups"])),
required=required,
validate=Length(min=1),
allow_none=True,
allow_none=allow_none,
missing=missing,
**kwargs,
)

Loading