diff --git a/CHANGELOG.md b/CHANGELOG.md index 27dc355a5d..8f17bc9e83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Added `PerSubscriberAggregate` query. [#4559](https://github.com/Flowminder/FlowKit/issues/4559) - Added FlowETL QA checks 'count_imeis', 'count_imsis', 'count_locatable_location_ids', 'count_null_imeis', 'count_null_imsis', 'count_null_location_ids', 'max_msisdns_per_imei', 'max_msisdns_per_imsi', 'count_added_rows_outgoing', 'count_null_counterparts', 'count_null_durations', 'count_onnet_msisdns_incoming', 'count_onnet_msisdns_outgoing', 'count_onnet_msisdns', 'max_duration' and 'median_duration'. [#4552](https://github.com/Flowminder/FlowKit/issues/4552) - Added `FilteredReferenceLocation` query, which returns only rows where a subscriber visited a reference location the required number of times. [#4584](https://github.com/Flowminder/FlowKit/issues/4584) +- Added `LabelledSpatialAggregate` query and redaction, which sub-aggregates by subscriber labels. [#4668](https://github.com/Flowminder/FlowKit/issues/4668) ### Changed - Harmonised FlowAPI parameter names for start and end dates. They are now all `start_date` and `end_date` diff --git a/docs/source/developer/adr/0011-redaction-strategy-for-labelled-aggregates.md b/docs/source/developer/adr/0011-redaction-strategy-for-labelled-aggregates.md new file mode 100644 index 0000000000..4807f27b40 --- /dev/null +++ b/docs/source/developer/adr/0011-redaction-strategy-for-labelled-aggregates.md @@ -0,0 +1,25 @@ +# Redaction strategy for labelled aggregation + +Date: 08-12-2021 + +# Status + +Proposed + +## Context + +At present, any rows in a spatial query are dropped if they return an aggregate of 15 subscribers or less. With new +labelling disaggregation queries being added to Flowkit, there is an increased risk of deanonymization attacks +using the increased resolution of information - we need to consider further redaction strategies to mitigate this. + +## Decision + +For any labelled spatial aggregate queries, we drop any aggregation zone that contains any disaggregation less than 15 +(for consistency with the rest of the dissagregation strategy). + +## Consequences + +This disaggregation strategy could reveal that a given zone contains less than 15 of _something_, by comparing a +disaggregated query to it's non-disaggregated counterpart. +This could lead to zones being dropped prematurely in disaggregations if a label choice leads to a small number of +subscribers appearing in a corner-case disaggregation diff --git a/flowmachine/flowmachine/core/spatial_unit.py b/flowmachine/flowmachine/core/spatial_unit.py index 3ab85fc1d8..c4a2e11b24 100644 --- a/flowmachine/flowmachine/core/spatial_unit.py +++ b/flowmachine/flowmachine/core/spatial_unit.py @@ -12,9 +12,9 @@ from flowmachine.utils import get_name_and_alias from flowmachine.core.errors import InvalidSpatialUnitError -from . import Query, Table -from .context import get_db -from .grid import Grid +from flowmachine.core import Query, Table +from flowmachine.core.context import get_db +from flowmachine.core.grid import Grid # TODO: Currently most spatial units require a FlowDB connection at init time. # It would be useful to remove this requirement wherever possible, and instead diff --git a/flowmachine/flowmachine/features/location/labelled_spatial_aggregate.py b/flowmachine/flowmachine/features/location/labelled_spatial_aggregate.py new file mode 100644 index 0000000000..2c3fa7c858 --- /dev/null +++ b/flowmachine/flowmachine/features/location/labelled_spatial_aggregate.py @@ -0,0 +1,113 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +from typing import List + +from flowmachine.core import Query +from flowmachine.core.mixins import GeoDataMixin +from flowmachine.features.location.spatial_aggregate import SpatialAggregate + + +class LabelledSpatialAggregate(GeoDataMixin, Query): + """ + Class representing a disaggregation of a SpatialAggregate by some label or set of labels + + Parameters + ---------- + locations: Query + Any query with a subscriber and location columns + subscriber_labels: Query + Any query with a subscriber column + label_columns: List[str] + A list of columns in subscriber_labels to aggregate on + + Examples + -------- + + >>> locations = locate_subscribers( + ... "2016-01-01", + ... "2016-01-02", + ... spatial_unit=make_spatial_unit("admin", level=3), + ... method="most-common", + ... ) + >>> metric = SubscriberHandsetCharacteristic( + ... "2016-01-01", "2016-01-02", characteristic="hnd_type" + ... ) + >>> labelled = LabelledSpatialAggregate(locations=locations, subscriber_labels=metric) + >>> labelled.get_dataframe() + pcod label_value value + 0 524 3 08 44 Feature 36 + 1 524 3 08 44 Smart 28 + 2 524 4 12 62 Feature 44 + 3 524 4 12 62 Smart 19' + + """ + + def __init__( + self, + locations: Query, + subscriber_labels: Query, + label_columns: List[str] = ("value",), + ): + + for label_column in label_columns: + if label_column not in subscriber_labels.column_names: + raise ValueError(f"{label_column} not a column of {subscriber_labels}") + if "subscriber" not in locations.column_names: + raise ValueError(f"Locations query must have a subscriber column") + if not hasattr(locations, "spatial_unit"): + raise ValueError(f"Locations must have a spatial_unit attribute") + if "subscriber" in label_columns: + raise ValueError(f"'subscriber' cannot be a label") + + self.locations = locations + self.subscriber_labels = subscriber_labels + self.label_columns = list(label_columns) + self.spatial_unit = locations.spatial_unit + self.label_columns = label_columns + self.out_label_columns = [ + f"label_{label_col}" for label_col in self.label_columns + ] + super().__init__() + + @property + def column_names(self) -> List[str]: + return ( + list(self.spatial_unit.location_id_columns) + + self.out_label_columns + + ["value"] + ) + + @property + def out_label_columns_as_string_list(self): + """ + Returns the label column heading as a single string + """ + return ",".join(self.out_label_columns) + + def _make_query(self): + + aggregate_cols = ",".join( + f"agg.{agg_col}" for agg_col in self.spatial_unit.location_id_columns + ) + label_select = ",".join( + f"labels.{label_col} AS {out_label_col}" + for label_col, out_label_col in zip( + self.label_columns, self.out_label_columns + ) + ) + label_group = ",".join( + f"labels.{label_col}" for label_col in self.label_columns + ) + + sql = f""" + SELECT + {aggregate_cols}, {label_select}, count(*) AS value + FROM + ({self.locations.get_query()}) AS agg + LEFT JOIN + ({self.subscriber_labels.get_query()}) AS labels USING (subscriber) + GROUP BY + {aggregate_cols}, {label_group} + """ + return sql diff --git a/flowmachine/flowmachine/features/location/redacted_labelled_spatial_aggregate.py b/flowmachine/flowmachine/features/location/redacted_labelled_spatial_aggregate.py new file mode 100644 index 0000000000..b0ddd058ff --- /dev/null +++ b/flowmachine/flowmachine/features/location/redacted_labelled_spatial_aggregate.py @@ -0,0 +1,62 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from typing import List + +from flowmachine.features.location.labelled_spatial_aggregate import ( + LabelledSpatialAggregate, +) +from flowmachine.core import Query +from flowmachine.core.mixins import GeoDataMixin + + +class RedactedLabelledSpatialAggregate(GeoDataMixin, Query): + """ + Query that drops any locations that, when disaggregated by label, reveal a number of subscribers + less than redaction_threshold + Parameters + ---------- + labelled_spatial_aggregate: LabelledSpatialAggregate + The LabelledSpatialAggregate query to redact + redaction_threshold: int default 15 + If any labels within a location reveal less than this number of subscribers, that location is dropped + """ + + def __init__( + self, + *, + labelled_spatial_aggregate: LabelledSpatialAggregate, + redaction_threshold: int = 15, + ): + + self.labelled_spatial_aggregate = labelled_spatial_aggregate + self.redaction_threshold = redaction_threshold + self.spatial_unit = labelled_spatial_aggregate.spatial_unit + super().__init__() + + @property + def column_names(self) -> List[str]: + return self.labelled_spatial_aggregate.column_names + + def _make_query(self): + + aggs = ",".join( + self.labelled_spatial_aggregate.spatial_unit.location_id_columns + ) + all_cols = self.labelled_spatial_aggregate.column_names_as_string_list + + sql = f""" + WITH lsa_query AS( + SELECT * + FROM({self.labelled_spatial_aggregate.get_query()}) AS t + ), aggs_to_keep AS( + SELECT {aggs} + FROM lsa_query + GROUP BY {aggs} + HAVING min(value) > {self.redaction_threshold} + ) + SELECT {all_cols} + FROM lsa_query INNER JOIN aggs_to_keep USING ({aggs}) + """ + return sql diff --git a/flowmachine/tests/test_labelled_spatial_aggregate.py b/flowmachine/tests/test_labelled_spatial_aggregate.py new file mode 100644 index 0000000000..1a5a062b31 --- /dev/null +++ b/flowmachine/tests/test_labelled_spatial_aggregate.py @@ -0,0 +1,131 @@ +import pytest + +from pandas.testing import assert_series_equal + +from flowmachine.features.location.spatial_aggregate import SpatialAggregate +from flowmachine.core.dummy_query import DummyQuery +from flowmachine.features import TotalLocationEvents +from flowmachine.core import make_spatial_unit +from flowmachine.features import SubscriberHandsetCharacteristic +from flowmachine.features.location.labelled_spatial_aggregate import ( + LabelledSpatialAggregate, +) +from flowmachine.features.subscriber.daily_location import locate_subscribers + + +def test_one_label(get_dataframe): + """ + Tests disaggregation by a label query + """ + locations = locate_subscribers( + "2016-01-01", + "2016-01-02", + spatial_unit=make_spatial_unit("admin", level=3), + method="most-common", + ) + metric = SubscriberHandsetCharacteristic( + "2016-01-01", "2016-01-02", characteristic="hnd_type" + ) + labelled = LabelledSpatialAggregate(locations=locations, subscriber_labels=metric) + df = get_dataframe(labelled) + assert len(df) == 50 + assert len(df.pcod.unique()) == 25 + assert len(df.label_value.unique()) == 2 + # Total number of values should equal the initial number of subscribers + assert df.value.sum() == len(get_dataframe(locations)) + assert ( + df.groupby("pcod").value.sum().tolist() + == get_dataframe(SpatialAggregate(locations=locations)).value.tolist() + ) + + +def test_multiple_labels(get_dataframe): + """ + Tests disaggregation by multiple labels + """ + locations = locate_subscribers( + "2016-01-01", + "2016-01-02", + spatial_unit=make_spatial_unit("admin", level=3), + method="most-common", + ) + metric = SubscriberHandsetCharacteristic( + "2016-01-01", "2016-01-02", characteristic="hnd_type" + ).join( + SubscriberHandsetCharacteristic( + "2016-01-01", "2016-01-02", characteristic="model" + ), + on_left="subscriber", + left_append="_hnd_type", + right_append="_model", + ) + labelled = LabelledSpatialAggregate( + locations=locations, + subscriber_labels=metric, + label_columns=["value_hnd_type", "value_model"], + ) + df = get_dataframe(labelled) + assert all( + df.columns == ["pcod", "label_value_hnd_type", "label_value_model", "value"] + ) + assert all(df.columns == labelled.column_names) + assert len(df) > 300 + + +def test_label_validation(): + with pytest.raises(ValueError, match="not a column of"): + locations = locate_subscribers( + "2016-01-01", + "2016-01-02", + spatial_unit=make_spatial_unit("admin", level=3), + method="most-common", + ) + metric = SubscriberHandsetCharacteristic( + "2016-01-01", "2016-01-02", characteristic="hnd_type" + ) + labelled = LabelledSpatialAggregate( + locations=locations, subscriber_labels=metric, label_columns=["foo", "bar"] + ) + + +def test_loc_sub_validation(): + with pytest.raises(ValueError, match="Locations query must have a subscriber"): + _ = LabelledSpatialAggregate( + locations=TotalLocationEvents("2016-01-01", "2016-01-02"), + subscriber_labels=SubscriberHandsetCharacteristic( + "2016-01-01", "2016-01-02", characteristic="hnd_type" + ), + ) + + +def test_col_sub_validation(): + with pytest.raises(ValueError, match="cannot be a label"): + _ = LabelledSpatialAggregate( + locations=locate_subscribers( + "2016-01-01", + "2016-01-02", + spatial_unit=make_spatial_unit("admin", level=3), + method="most-common", + ), + subscriber_labels=SubscriberHandsetCharacteristic( + "2016-01-01", "2016-01-02", characteristic="hnd_type" + ), + label_columns=["subscriber"], + ) + + +class SubHavingQuery(DummyQuery): + @property + def column_names(self): + return ["subscriber"] + + +def test_spatial_unit_validation(): + dq = SubHavingQuery("foo") + with pytest.raises(ValueError, match="spatial_unit"): + _ = LabelledSpatialAggregate( + locations=dq, + subscriber_labels=SubscriberHandsetCharacteristic( + "2016-01-01", "2016-01-02", characteristic="hnd_type" + ), + ) diff --git a/flowmachine/tests/test_redacted_labelled_spatial_aggregate.py b/flowmachine/tests/test_redacted_labelled_spatial_aggregate.py new file mode 100644 index 0000000000..fb10b40d06 --- /dev/null +++ b/flowmachine/tests/test_redacted_labelled_spatial_aggregate.py @@ -0,0 +1,47 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import pytest + +import pandas as pd +from pandas.testing import assert_frame_equal + +from flowmachine.core import make_spatial_unit +from flowmachine.features import SubscriberHandsetCharacteristic +from flowmachine.features.location.labelled_spatial_aggregate import ( + LabelledSpatialAggregate, +) +from flowmachine.features.location.redacted_labelled_spatial_aggregate import ( + RedactedLabelledSpatialAggregate, +) +from flowmachine.features.subscriber.daily_location import locate_subscribers + + +def test_redaction(get_dataframe): + + locations = locate_subscribers( + "2016-01-01", + "2016-01-02", + spatial_unit=make_spatial_unit("admin", level=3), + method="most-common", + ) + metric = SubscriberHandsetCharacteristic( + "2016-01-01", "2016-01-02", characteristic="hnd_type" + ) + labelled = LabelledSpatialAggregate(locations=locations, subscriber_labels=metric) + + redacted = RedactedLabelledSpatialAggregate(labelled_spatial_aggregate=labelled) + redacted_df = get_dataframe(redacted) + + target = pd.DataFrame( + [ + ["524 3 08 44", "Feature", 36], + ["524 3 08 44", "Smart", 28], + ["524 4 12 62", "Feature", 44], + ["524 4 12 62", "Smart", 19], + ], + columns=["pcod", "label_value", "value"], + ) + + assert_frame_equal(redacted_df, target)