Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing labelling disaggregation #4677

Merged
merged 23 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
112f4b5
Adding labelling disaggregation to spatial_aggregate + initial test
Thingus Dec 7, 2021
4b90c62
Merge branch 'master' into labelled_aggs_and_flows
Thingus Dec 7, 2021
ffdca93
Spatial_aggregate returns labels if used
Thingus Dec 7, 2021
5e16eb8
Merge remote-tracking branch 'origin/labelled_aggs_and_flows' into la…
Thingus Dec 7, 2021
7c6b56c
Spatial_aggregate returns labels if used
Thingus Dec 7, 2021
96fa35f
Breaking labelled spatial aggregate into its own class + permitting m…
Thingus Dec 8, 2021
0d95848
Implementing specialised redaction + test
Thingus Dec 8, 2021
66be2f0
Fixing imports
Thingus Dec 8, 2021
f289b92
Validation + test
Thingus Dec 8, 2021
cddb3ee
Docs + tests + some renaming
Thingus Dec 8, 2021
1e718bb
Headers + more docs
Thingus Dec 8, 2021
dc9007a
Changelog
Thingus Dec 8, 2021
06e809d
Update flowmachine/flowmachine/features/location/labelled_spatial_agg…
Thingus Dec 8, 2021
dad684f
Update flowmachine/flowmachine/features/location/labelled_spatial_agg…
Thingus Dec 8, 2021
fc159d4
Update flowmachine/flowmachine/features/location/labelled_spatial_agg…
Thingus Dec 8, 2021
33a59fc
Changing to left join
Thingus Dec 8, 2021
8bc8835
More changes from review
Thingus Dec 8, 2021
c758cf8
Adding spatial_unit to redacted_labelled_spatial_aggregate.py
Thingus Dec 8, 2021
2d48f14
Extra tests
Thingus Dec 8, 2021
7cad644
More extra tests
Thingus Dec 8, 2021
0958ea1
ADR
Thingus Dec 8, 2021
6cff7cc
More test
Thingus Dec 8, 2021
0262577
Merge branch 'master' into labelled_aggs_and_flows
Thingus Dec 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Added `PerSubscriberAggregate` query. [#4559](https://github.com/Flowminder/FlowKit/issues/4559)
- Added FlowETL QA checks 'count_imeis', 'count_imsis', 'count_locatable_location_ids', 'count_null_imeis', 'count_null_imsis', 'count_null_location_ids', 'max_msisdns_per_imei', 'max_msisdns_per_imsi', 'count_added_rows_outgoing', 'count_null_counterparts', 'count_null_durations', 'count_onnet_msisdns_incoming', 'count_onnet_msisdns_outgoing', 'count_onnet_msisdns', 'max_duration' and 'median_duration'. [#4552](https://github.com/Flowminder/FlowKit/issues/4552)
- Added `FilteredReferenceLocation` query, which returns only rows where a subscriber visited a reference location the required number of times. [#4584](https://github.com/Flowminder/FlowKit/issues/4584)
- Added `LabelledSpatialAggregate` query and redaction, which sub-aggregates by subscriber labels. [#4668](https://github.com/Flowminder/FlowKit/issues/4668)

### Changed
- Harmonised FlowAPI parameter names for start and end dates. They are now all `start_date` and `end_date`
Expand Down
6 changes: 3 additions & 3 deletions flowmachine/flowmachine/core/spatial_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

from flowmachine.utils import get_name_and_alias
from flowmachine.core.errors import InvalidSpatialUnitError
from . import Query, Table
from .context import get_db
from .grid import Grid
from flowmachine.core import Query, Table
from flowmachine.core.context import get_db
from flowmachine.core.grid import Grid

# TODO: Currently most spatial units require a FlowDB connection at init time.
# It would be useful to remove this requirement wherever possible, and instead
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import List

from flowmachine.core import Query
from flowmachine.core.mixins import GeoDataMixin
from flowmachine.features.location.spatial_aggregate import SpatialAggregate


class LabelledSpatialAggregate(GeoDataMixin, Query):
    """
    Class representing a disaggregation of a SpatialAggregate by some label or set of labels

    Counts the rows of ``locations`` per location and per combination of label
    values, where labels are attached to locations by joining on ``subscriber``.

    Parameters
    ----------
    locations : Query
        Any query with a subscriber column and a ``spatial_unit`` attribute
    subscriber_labels : Query
        Any query with a subscriber column
    label_columns : list of str, default ("value",)
        Columns of subscriber_labels to disaggregate on. Each one appears in
        the output prefixed with ``label_``.

    Raises
    ------
    ValueError
        If any of label_columns is not a column of subscriber_labels, if
        locations has no subscriber column or no spatial_unit attribute, or
        if 'subscriber' is requested as a label.

    Examples
    --------
    >>> locations = locate_subscribers(
    ...     "2016-01-01",
    ...     "2016-01-02",
    ...     spatial_unit=make_spatial_unit("admin", level=3),
    ...     method="most-common",
    ... )
    >>> metric = SubscriberHandsetCharacteristic(
    ...     "2016-01-01", "2016-01-02", characteristic="hnd_type"
    ... )
    >>> labelled = LabelledSpatialAggregate(locations=locations, subscriber_labels=metric)
    >>> labelled.get_dataframe()
              pcod label_value  value
    0  524 3 08 44     Feature     36
    1  524 3 08 44       Smart     28
    2  524 4 12 62     Feature     44
    3  524 4 12 62       Smart     19
    """

    def __init__(
        self,
        locations: Query,
        subscriber_labels: Query,
        label_columns: List[str] = ("value",),
    ):
        if any(
            label_column not in subscriber_labels.column_names
            for label_column in label_columns
        ):
            raise ValueError(
                f"One of {label_columns} not a column of {subscriber_labels}"
            )
        if "subscriber" not in locations.column_names:
            raise ValueError("Locations query must have a subscriber column")
        if not hasattr(locations, "spatial_unit"):
            raise ValueError("Locations must have a spatial_unit attribute")
        if "subscriber" in label_columns:
            raise ValueError("'subscriber' cannot be a label")

        self.locations = locations
        self.subscriber_labels = subscriber_labels
        # Always store a list, whatever sequence type was passed in. This used
        # to be overwritten by a second assignment of the raw argument.
        self.label_columns = list(label_columns)
        self.spatial_unit = locations.spatial_unit
        # Output names are prefixed so that they cannot clash with the
        # aggregate's own "value" column.
        self.out_label_columns = [
            f"label_{label_col}" for label_col in self.label_columns
        ]
        super().__init__()

    @property
    def column_names(self) -> List[str]:
        """Location id columns, then the prefixed label columns, then "value"."""
        return (
            list(self.spatial_unit.location_id_columns)
            + list(self.out_label_columns)
            + ["value"]
        )

    @property
    def out_label_columns_as_string_list(self) -> str:
        """
        Returns the output label column names as a single comma-separated string
        """
        return ",".join(self.out_label_columns)

    def _make_query(self):
        aggregate_cols = ",".join(
            f"agg.{agg_col}" for agg_col in self.spatial_unit.location_id_columns
        )
        label_select = ",".join(
            f"labels.{label_col} AS {out_label_col}"
            for label_col, out_label_col in zip(
                self.label_columns, self.out_label_columns
            )
        )
        label_group = ",".join(
            f"labels.{label_col}" for label_col in self.label_columns
        )

        # Inner join: rows of `locations` with no matching row in
        # `subscriber_labels` do not contribute to any count.
        sql = f"""
        SELECT
            {aggregate_cols}, {label_select}, count(*) AS value
        FROM
            ({self.locations.get_query()}) AS agg
        JOIN
            ({self.subscriber_labels.get_query()}) AS labels USING (subscriber)
        GROUP BY
            {aggregate_cols}, {label_group}
        """
        return sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from typing import List

from flowmachine.features.location.labelled_spatial_aggregate import (
LabelledSpatialAggregate,
)
from flowmachine.core import Query
from flowmachine.core.mixins import GeoDataMixin


class RedactedLabelledSpatialAggregate(GeoDataMixin, Query):
    """
    Query that drops any locations that, when disaggregated by label, reveal a
    number of subscribers that does not exceed redaction_threshold

    Parameters
    ----------
    labelled_spatial_aggregate : LabelledSpatialAggregate
        The LabelledSpatialAggregate query to redact
    redaction_threshold : int, default 15
        A location is kept only if the value for every label within it is
        strictly greater than this number; otherwise the whole location
        (all of its labels) is dropped
    """

    def __init__(
        self,
        *,
        labelled_spatial_aggregate: LabelledSpatialAggregate,
        redaction_threshold: int = 15,
    ):
        self.labelled_spatial_aggregate = labelled_spatial_aggregate
        self.redaction_threshold = redaction_threshold
        # Expose the wrapped query's spatial unit on this query as well.
        # NOTE(review): presumably needed by GeoDataMixin's geography helpers;
        # confirm against the mixin.
        self.spatial_unit = labelled_spatial_aggregate.spatial_unit
        super().__init__()

    @property
    def column_names(self) -> List[str]:
        """Same columns as the wrapped labelled spatial aggregate."""
        return self.labelled_spatial_aggregate.column_names

    def _make_query(self):
        aggs = ",".join(
            self.labelled_spatial_aggregate.spatial_unit.location_id_columns
        )
        all_cols = self.labelled_spatial_aggregate.column_names_as_string_list

        # aggs_to_keep selects the locations whose smallest per-label count is
        # above the threshold; joining back to it removes every row belonging
        # to any other location.
        sql = f"""
        WITH lsa_query AS(
            SELECT *
            FROM({self.labelled_spatial_aggregate.get_query()}) AS t
        ), aggs_to_keep AS(
            SELECT {aggs}
            FROM lsa_query
            GROUP BY {aggs}
            HAVING min(value) > {self.redaction_threshold}
        )
        SELECT {all_cols}
        FROM lsa_query INNER JOIN aggs_to_keep USING ({aggs})
        """
        return sql
89 changes: 89 additions & 0 deletions flowmachine/tests/test_labelled_spatial_aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import pytest

from flowmachine.core.dummy_query import DummyQuery
from flowmachine.features import TotalLocationEvents
from flowmachine.core import make_spatial_unit
from flowmachine.features import SubscriberHandsetCharacteristic
from flowmachine.features.location.labelled_spatial_aggregate import (
LabelledSpatialAggregate,
)
from flowmachine.features.subscriber.daily_location import locate_subscribers


Thingus marked this conversation as resolved.
Show resolved Hide resolved
def test_labelled_spatial_aggregate(get_dataframe):
    """
    Disaggregate most-common admin3 locations by handset type and check the
    number of rows, distinct locations and distinct labels.
    """
    admin3 = make_spatial_unit("admin", level=3)
    subscriber_locations = locate_subscribers(
        "2016-01-01", "2016-01-02", spatial_unit=admin3, method="most-common"
    )
    handset_types = SubscriberHandsetCharacteristic(
        "2016-01-01", "2016-01-02", characteristic="hnd_type"
    )
    query = LabelledSpatialAggregate(
        locations=subscriber_locations, subscriber_labels=handset_types
    )

    result = get_dataframe(query)

    assert len(result) == 50
    assert len(result["pcod"].unique()) == 25
    assert len(result["label_value"].unique()) == 2


def test_label_validation():
    """
    Requesting label columns that the label query lacks raises a ValueError.
    """
    locations = locate_subscribers(
        "2016-01-01",
        "2016-01-02",
        spatial_unit=make_spatial_unit("admin", level=3),
        method="most-common",
    )
    metric = SubscriberHandsetCharacteristic(
        "2016-01-01", "2016-01-02", characteristic="hnd_type"
    )
    # Only the constructor under test sits inside the raises block, so an
    # error from building the inputs cannot make this test pass.
    with pytest.raises(ValueError, match="not a column of"):
        LabelledSpatialAggregate(
            locations=locations, subscriber_labels=metric, label_columns=["foo", "bar"]
        )


def test_loc_sub_validation():
    """
    A locations query without a subscriber column raises a ValueError.
    """
    locations = TotalLocationEvents("2016-01-01", "2016-01-02")
    labels = SubscriberHandsetCharacteristic(
        "2016-01-01", "2016-01-02", characteristic="hnd_type"
    )
    # Only the constructor under test sits inside the raises block, so an
    # error from building the inputs cannot make this test pass.
    with pytest.raises(ValueError, match="Locations query must have a subscriber"):
        LabelledSpatialAggregate(locations=locations, subscriber_labels=labels)


def test_col_sub_validation():
    """
    Using 'subscriber' as a label column raises a ValueError.
    """
    locations = locate_subscribers(
        "2016-01-01",
        "2016-01-02",
        spatial_unit=make_spatial_unit("admin", level=3),
        method="most-common",
    )
    labels = SubscriberHandsetCharacteristic(
        "2016-01-01", "2016-01-02", characteristic="hnd_type"
    )
    # Only the constructor under test sits inside the raises block, so an
    # error from building the inputs cannot make this test pass.
    with pytest.raises(ValueError, match="cannot be a label"):
        LabelledSpatialAggregate(
            locations=locations,
            subscriber_labels=labels,
            label_columns=["subscriber"],
        )


class SubHavingQuery(DummyQuery):
    """Dummy query whose only reported column is ``subscriber``."""

    @property
    def column_names(self):
        columns = ["subscriber"]
        return columns


def test_spatial_unit_validation():
    """
    A locations query with a subscriber column but no spatial_unit attribute
    raises a ValueError.
    """
    dq = SubHavingQuery("foo")
    labels = SubscriberHandsetCharacteristic(
        "2016-01-01", "2016-01-02", characteristic="hnd_type"
    )
    # Only the constructor under test sits inside the raises block, so an
    # error from building the inputs cannot make this test pass.
    with pytest.raises(ValueError, match="spatial_unit"):
        LabelledSpatialAggregate(locations=dq, subscriber_labels=labels)
47 changes: 47 additions & 0 deletions flowmachine/tests/test_redacted_labelled_spatial_aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import pytest

import pandas as pd
from pandas.testing import assert_frame_equal

from flowmachine.core import make_spatial_unit
from flowmachine.features import SubscriberHandsetCharacteristic
from flowmachine.features.location.labelled_spatial_aggregate import (
LabelledSpatialAggregate,
)
from flowmachine.features.location.redacted_labelled_spatial_aggregate import (
RedactedLabelledSpatialAggregate,
)
from flowmachine.features.subscriber.daily_location import locate_subscribers


def test_redaction(get_dataframe):
    """
    With the default threshold, only locations whose every handset-type count
    exceeds the threshold remain after redaction.
    """
    locations = locate_subscribers(
        "2016-01-01",
        "2016-01-02",
        spatial_unit=make_spatial_unit("admin", level=3),
        method="most-common",
    )
    metric = SubscriberHandsetCharacteristic(
        "2016-01-01", "2016-01-02", characteristic="hnd_type"
    )
    labelled = LabelledSpatialAggregate(locations=locations, subscriber_labels=metric)

    redacted = RedactedLabelledSpatialAggregate(labelled_spatial_aggregate=labelled)
    # The query has no ORDER BY, so row order is not guaranteed. Sort and
    # re-index before comparing against the (already sorted) target.
    redacted_df = (
        get_dataframe(redacted)
        .sort_values(["pcod", "label_value"])
        .reset_index(drop=True)
    )

    target = pd.DataFrame(
        [
            ["524 3 08 44", "Feature", 36],
            ["524 3 08 44", "Smart", 28],
            ["524 4 12 62", "Feature", 44],
            ["524 4 12 62", "Smart", 19],
        ],
        columns=["pcod", "label_value", "value"],
    )

    assert_frame_equal(redacted_df, target)