Merge branch 'master' into quickstart_update
Thingus authored Jan 4, 2023
2 parents 1b09c68 + c7cc1ea commit e4a907d
Showing 12 changed files with 412 additions and 127 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Added
- Added views `etl.ingested_state`, `etl.available_dates` and `etl.deduped_post_etl_queries` in FlowDB, for convenient extraction of relevant information from the ETL tables. [#5641](https://github.com/Flowminder/FlowKit/issues/5641)
- Added `MajorityLocationWithUnlocatable` query class and `majority_location` function. [#5720](https://github.com/Flowminder/FlowKit/issues/5720)

### Changed
- *Important*; tokens issued by previous versions of Flowauth are not compatible with this version. Users will need to regenerate tokens using the updated Flowauth.
@@ -16,15 +17,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- RoleScopePicker component redesigned and reimplemented.
- Docs now recommend creating a separate bind mount for airflow scheduler logs, and include this in the secrets quickstart. [#3622](https://github.com/Flowminder/FlowKit/issues/3622)
- `jwt` tokens now use `sub` instead of `identity` for `JWT_IDENTITY_CLAIM`.
- A `majority_location` query with `include_unlocatable=True` will now include rows for all subscribers in the `subscriber_location_weights` sub-query, including those for whom all weights are negative (previously subscribers with only negative weights were excluded).


### Fixed
- Fixed a potential deadlock when using a small connection pool and `store`-ing queries
- AutoFlow can now be run in a docker container with non-default user. [#5574](https://github.com/Flowminder/FlowKit/issues/5574)
- Passing an empty list of events tables when creating a query now raises `ValueError: Empty tables list.` instead of a `MissingDateError`. [#436](https://github.com/Flowminder/FlowKit/issues/436)
- Flowmachine now looks at only the most recent state (per CDR type per CDR date) in `etl.etl_records` to determine available dates. [#5641](https://github.com/Flowminder/FlowKit/issues/5641)
- It is now possible to run API queries that include multiple different aggregation units (e.g. `joined_spatial_aggregate` with `displacement` metric). [#4649](https://github.com/Flowminder/FlowKit/issues/4649)

### Removed
- Removed the `include_unlocatable` parameter from `MajorityLocation` class (the `majority_location` function should be used instead if `include_unlocatable` is required). [#5720](https://github.com/Flowminder/FlowKit/issues/5720)

## [1.17.1]

38 changes: 30 additions & 8 deletions flowclient/flowclient/query_specs.py
@@ -845,31 +845,53 @@ def random_sample_spec(


def majority_location_spec(
*, subscriber_location_weights: Dict, include_unlocatable=False
*,
subscriber_location_weights: Dict[str, Union[str, dict]],
minimum_total_weight: float = 0.0,
include_unlocatable: bool = False,
) -> dict:
"""
A class for producing a list of subscribers along with the location (derived from `spatial_unit')
that they visited more than half the time.
A query for producing a list of subscribers along with the location that they visited
more than half the time. Takes a 'subscriber location weights' query that assigns weights
to the locations visited by each subscriber (currently, only `location_visits_spec` queries are supported).
A subscriber will only be assigned a location if that location represents more than half
of the total weight for that subscriber. This means that each subscriber can be assigned at most one location.
Subscribers for whom there is no single location with an outright majority will either be excluded from the
query result (if `include_unlocatable==False`), or included in the result with `NULL` value in the location ID
column(s) (if `include_unlocatable==True`).
Parameters
----------
subscriber_location_weights: dict
A `location_visits_spec` query specification
include_unlocatable: bool default False
subscriber_location_weights : dict
A `location_visits_spec` query specification
minimum_total_weight : float, default 0
If the summed weight for a subscriber is less than `minimum_total_weight`,
that subscriber will only be assigned a location with weight greater than `minimum_total_weight/2`.
This is useful if, for example, `subscriber_location_weights` is a count of the number of days
a location was a subscriber's daily location over one week - if a subscriber was not active every day,
their total weight would be less than 7, which would lower the threshold for a majority.
Setting `minimum_total_weight=7` in this case ensures that a subscriber must have the same
daily location on a majority of _all_ days during the week, not just a majority of their _active_ days.
include_unlocatable : bool, default False
If `True`, returns every unique subscriber in the `subscriber_location_weights` query, with
the location column as `NULL` if no majority is reached.
the location column(s) as `NULL` if no majority is reached.
If `False`, returns only subscribers that have achieved a majority location
Returns
-------
dict
A dictionary of the query specification
Notes
-----
Any rows with value < 0 in the `subscriber_location_weights` query will be dropped.
This is necessary to ensure the query can return at most one location per subscriber.
"""
return {
"query_kind": "majority_location",
"subscriber_location_weights": subscriber_location_weights,
"minimum_total_weight": minimum_total_weight,
"include_unlocatable": include_unlocatable,
}
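
The majority rule and the effect of `minimum_total_weight` described in the docstring above can be illustrated with a minimal pure-Python sketch. This is not FlowKit code: `assign_majority_location` and the `days_at_location` data are hypothetical names invented for the illustration, and the function only mirrors the behaviour the docstring describes (negative weights dropped, total floored at `minimum_total_weight`, strict majority required).

```python
from typing import Dict, Optional


def assign_majority_location(
    weights: Dict[str, float], minimum_total_weight: float = 0.0
) -> Optional[str]:
    """Return the location holding more than half of the total weight, or None.

    Hypothetical helper for illustration only; it is not part of flowclient.
    """
    if minimum_total_weight < 0:
        raise ValueError("minimum_total_weight cannot be negative")
    # Rows with negative weight are dropped before summing.
    kept = {loc: w for loc, w in weights.items() if w >= 0}
    if not kept:
        return None
    # The total is floored at minimum_total_weight, which raises the
    # majority threshold for subscribers with little recorded activity.
    total = max(sum(kept.values()), minimum_total_weight)
    for loc, w in kept.items():
        if w > total / 2.0:
            return loc  # at most one location can exceed half the total
    return None


# Hypothetical subscriber who was active on only 4 days of a 7-day week.
days_at_location = {"A": 3, "B": 1}

# Majority of active days: 3 > 4 / 2, so "A" is assigned.
print(assign_majority_location(days_at_location))

# Majority of all days: 3 is not > 7 / 2, so no location is assigned.
print(assign_majority_location(days_at_location, minimum_total_weight=7))
```

With `include_unlocatable=True`, the second subscriber would still appear in the result, with `NULL` in the location column(s), rather than being omitted.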

@@ -11,7 +11,7 @@
BaseExposedQueryWithSampling,
)
from flowmachine.core.server.query_schemas.location_visits import LocationVisitsSchema
from flowmachine.features.subscriber.majority_location import MajorityLocation
from flowmachine.features.subscriber.majority_location import majority_location

from .one_of_query import OneOfQuerySchema

@@ -21,18 +21,25 @@ class MajorityLocationExposed(BaseExposedQueryWithSampling):
query_kind = "majority_location"

def __init__(
self, *, subscriber_location_weights, include_unlocatable, sampling=None
self,
*,
subscriber_location_weights,
minimum_total_weight,
include_unlocatable,
sampling=None
):
self.subscriber_location_weights = subscriber_location_weights
self.minimum_total_weight = minimum_total_weight
self.include_unlocatable = include_unlocatable
self.sampling = sampling
self.aggregation_unit = subscriber_location_weights.aggregation_unit

@property
def _unsampled_query_obj(self):
return MajorityLocation(
return majority_location(
subscriber_location_weights=self.subscriber_location_weights._flowmachine_query_obj,
weight_column="value",
minimum_total_weight=self.minimum_total_weight,
include_unlocatable=self.include_unlocatable,
)

@@ -47,4 +54,5 @@ class MajorityLocationSchema(BaseQueryWithSamplingSchema):
# query_kind parameter is required here for claims validation
query_kind = fields.String(validate=OneOf([__model__.query_kind]), required=True)
subscriber_location_weights = fields.Nested(WeightedLocationQueries, required=True)
include_unlocatable = fields.Boolean(missing=False)
minimum_total_weight = fields.Float(load_default=0.0)
include_unlocatable = fields.Boolean(load_default=False)
166 changes: 131 additions & 35 deletions flowmachine/flowmachine/features/subscriber/majority_location.py
@@ -4,42 +4,49 @@

# -*- coding: utf-8 -*-

from typing import List
from typing import List, Union

from flowmachine.core import Query
from flowmachine.core.mixins import GeoDataMixin
from flowmachine.features.utilities.subscriber_locations import BaseLocation
from flowmachine.core.errors import InvalidSpatialUnitError


class MajorityLocation(BaseLocation, Query):
"""
A class for producing a list of subscribers along with the location (derived from `spatial_unit')
that they visited more than half the time. Takes a query that includes a 'subscribers' column,
a 'spatial_unit' attribute and a column to be used as weighting for locations (`location_count` for example)
A query for producing a list of subscribers along with the location that they visited
more than half the time. Takes a 'subscriber location weights' query that includes a 'subscriber' column,
location ID column(s) and a column to be used as weighting for locations (e.g. a `LocationVisits` query).
A subscriber will only be assigned a location if that location represents more than half
of the total weight for that subscriber. This means that each subscriber can be assigned at most one location.
Parameters
----------
subscriber_location_weights: Query
subscriber_location_weights : Query
The query object containing subscribers, locations, and weights.
weight_column: str
weight_column : str
The column, when summed, that will produce the count used to threshold the majority
include_unlocatable: bool default False
If `True`, returns every unique subscriber in the `subscriber_location_weights` query, with
the location column as `NULL` if no majority is reached.
If `False`, returns only subscribers that have achieved a majority location
minimum_total_weight : float, default 0
If the summed weight for a subscriber is less than `minimum_total_weight`,
that subscriber will only be assigned a location with weight greater than `minimum_total_weight/2`.
This is useful if, for example, `subscriber_location_weights` is a count of the number of days
a location was a subscriber's daily location over one week - if a subscriber was not active every day,
their total weight would be less than 7, which would lower the threshold for a majority.
Setting `minimum_total_weight=7` in this case ensures that a subscriber must have the same
daily location on a majority of _all_ days during the week, not just a majority of their _active_ days.
Notes
-----
Any rows where weight < 0 will be dropped
Any rows where weight < 0 in the `subscriber_location_weights` query will be dropped.
This is necessary to ensure the query can return at most one location per subscriber.
"""

def __init__(
self,
*,
subscriber_location_weights: Query,
weight_column: str,
include_unlocatable: bool = False,
minimum_total_weight: float = 0.0,
):
if "subscriber" not in subscriber_location_weights.column_names:
raise ValueError("`subscriber` not in subscriber_location_weights query")
@@ -49,10 +56,12 @@ def __init__(
raise InvalidSpatialUnitError(
"subscriber_location_weights query needs a spatial_unit attribute"
)
if minimum_total_weight < 0:
raise ValueError("minimum_total_weight cannot be negative")

self.subscriber_location_weights = subscriber_location_weights
self.weight_column = weight_column
self.include_unlocatable = include_unlocatable
self.minimum_total_weight = minimum_total_weight
self.spatial_unit = subscriber_location_weights.spatial_unit
super().__init__()

@@ -63,26 +72,113 @@ def column_names(self) -> List[str]:
def _make_query(self):
loc_id_columns_string = ",".join(self.spatial_unit.location_id_columns)
sql = f"""
WITH subscriber_subset AS (
{self.subscriber_location_weights.get_query()}
), summed_weights AS (
SELECT subscriber, sum({self.weight_column}) AS total_weight
FROM subscriber_subset
WHERE {self.weight_column} >= 0
GROUP BY subscriber
), seen_subs AS (
SELECT subscriber, {loc_id_columns_string}
FROM summed_weights JOIN subscriber_subset USING(subscriber)
WHERE {self.weight_column} > total_weight/2.0
)
"""

if self.include_unlocatable:
sql += f"""
SELECT subscriber, seen_subs.{loc_id_columns_string}
FROM seen_subs RIGHT OUTER JOIN summed_weights USING(subscriber)
"""
else:
sql += f"""SELECT subscriber, {loc_id_columns_string} FROM seen_subs"""
WITH summed_weights AS (
SELECT subscriber,
greatest(sum({self.weight_column}), {self.minimum_total_weight}) AS total_weight
FROM ({self.subscriber_location_weights.get_query()}) subscriber_location_weights
WHERE {self.weight_column} >= 0
GROUP BY subscriber
)
SELECT subscriber, {loc_id_columns_string}
FROM summed_weights
INNER JOIN ({self.subscriber_location_weights.get_query()}) subscriber_location_weights
USING (subscriber)
WHERE {self.weight_column} > total_weight/2.0
"""

return sql


class MajorityLocationWithUnlocatable(BaseLocation, Query):
"""
A query for producing a list of subscribers along with the location that they visited
more than half the time. Similar to MajorityLocation, except that subscribers with
no majority location will be included in the query result (with `NULL` location).
Parameters
----------
majority_location : MajorityLocation
MajorityLocation query whose result will be augmented with unlocatable subscribers
"""

def __init__(self, *, majority_location: MajorityLocation):
self.majority_location = majority_location
self.spatial_unit = majority_location.spatial_unit
super().__init__()

@property
def column_names(self) -> List[str]:
return self.majority_location.column_names

def _make_query(self):
columns_string = ",".join(self.majority_location.column_names)
sql = f"""
WITH all_subscribers AS (
SELECT subscriber
FROM ({self.majority_location.subscriber_location_weights.get_query()}) subscriber_location_weights
GROUP BY subscriber
)
SELECT {columns_string}
FROM ({self.majority_location.get_query()}) majority_locations
RIGHT JOIN all_subscribers
USING (subscriber)
"""
return sql


def majority_location(
*,
subscriber_location_weights: Query,
weight_column: str,
minimum_total_weight: float = 0.0,
include_unlocatable: bool = False,
) -> Union[MajorityLocation, MajorityLocationWithUnlocatable]:
"""
A query for producing a list of subscribers along with the location that they visited
more than half the time. Takes a 'subscriber location weights' query that includes a 'subscriber' column,
location ID column(s) and a column to be used as weighting for locations (e.g. a `LocationVisits` query).
A subscriber will only be assigned a location if that location represents more than half
of the total weight for that subscriber. This means that each subscriber can be assigned at most one location.
Subscribers for whom there is no single location with an outright majority will either be excluded from the
query result (if `include_unlocatable==False`), or included in the result with `NULL` value in the location ID
column(s) (if `include_unlocatable==True`).
Parameters
----------
subscriber_location_weights : Query
The query object containing subscribers, locations, and weights.
weight_column : str
The column in `subscriber_location_weights` which, when summed per subscriber, gives the total weight used to set the majority threshold
minimum_total_weight : float, default 0
If the summed weight for a subscriber is less than `minimum_total_weight`,
that subscriber will only be assigned a location with weight greater than `minimum_total_weight/2`.
This is useful if, for example, `subscriber_location_weights` is a count of the number of days
a location was a subscriber's daily location over one week - if a subscriber was not active every day,
their total weight would be less than 7, which would lower the threshold for a majority.
Setting `minimum_total_weight=7` in this case ensures that a subscriber must have the same
daily location on a majority of _all_ days during the week, not just a majority of their _active_ days.
include_unlocatable : bool, default False
If `True`, returns every unique subscriber in the `subscriber_location_weights` query, with
the location column(s) as `NULL` if no majority is reached.
If `False`, returns only subscribers that have achieved a majority location
Returns
-------
MajorityLocation or MajorityLocationWithUnlocatable
Majority location query object
Notes
-----
Any rows where weight < 0 in the `subscriber_location_weights` query will be dropped.
This is necessary to ensure the query can return at most one location per subscriber.
"""
ml = MajorityLocation(
subscriber_location_weights=subscriber_location_weights,
weight_column=weight_column,
minimum_total_weight=minimum_total_weight,
)
if include_unlocatable:
return MajorityLocationWithUnlocatable(majority_location=ml)
else:
return ml
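
The two SQL queries above can be tried on toy data with a sketch like the following, which uses Python's built-in `sqlite3` module rather than FlowDB. Several things here are assumptions made for the illustration: the table contents, the `pcod` location ID column name, and the week-long `MINIMUM_TOTAL_WEIGHT`. The SQL is also adapted for SQLite: the scalar two-argument `max` stands in for `greatest`, and the `RIGHT JOIN` is written as an equivalent `LEFT JOIN` from `all_subscribers`, since older SQLite versions lack `RIGHT JOIN`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Toy stand-in for the subscriber_location_weights query result.
# "pcod" is a hypothetical location ID column name.
conn.execute(
    "CREATE TABLE subscriber_location_weights "
    "(subscriber TEXT, pcod TEXT, value REAL)"
)
conn.executemany(
    "INSERT INTO subscriber_location_weights VALUES (?, ?, ?)",
    [
        ("s1", "A", 5), ("s1", "B", 2),  # A holds 5 of 7: majority
        ("s2", "A", 2), ("s2", "B", 2),  # tie: no majority
        ("s3", "A", 3),                  # single location, but 3 is not > 7/2
    ],
)

MINIMUM_TOTAL_WEIGHT = 7.0  # assumed: one week of daily locations

# Adapted from MajorityLocation._make_query (greatest -> scalar max).
majority_sql = f"""
WITH summed_weights AS (
    SELECT subscriber,
           max(sum(value), {MINIMUM_TOTAL_WEIGHT}) AS total_weight
    FROM subscriber_location_weights
    WHERE value >= 0
    GROUP BY subscriber
)
SELECT subscriber, pcod
FROM summed_weights
INNER JOIN subscriber_location_weights USING (subscriber)
WHERE value > total_weight / 2.0
"""

# Adapted from MajorityLocationWithUnlocatable._make_query
# (RIGHT JOIN rewritten as a LEFT JOIN from all_subscribers).
with_unlocatable_sql = f"""
WITH all_subscribers AS (
    SELECT subscriber FROM subscriber_location_weights GROUP BY subscriber
)
SELECT subscriber, pcod
FROM all_subscribers
LEFT JOIN ({majority_sql}) majority_locations USING (subscriber)
ORDER BY subscriber
"""

located = conn.execute(majority_sql).fetchall()
everyone = conn.execute(with_unlocatable_sql).fetchall()
print(located)   # only subscribers with a majority location
print(everyone)  # all subscribers; pcod is None where no majority exists
```

In this toy data only `s1` has a majority location; `s2` and `s3` appear in the second result with a `NULL` (`None`) location, which corresponds to the `include_unlocatable=True` behaviour.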