From 6ee584d861c79c37db3fa3a2670d4a0972920086 Mon Sep 17 00:00:00 2001 From: john Date: Mon, 22 Nov 2021 16:17:03 +0000 Subject: [PATCH 1/8] Exposing PerSubscriberAggregate --- .../query_schemas/per_subscriber_aggregate.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py diff --git a/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py new file mode 100644 index 0000000000..71a5305b34 --- /dev/null +++ b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py @@ -0,0 +1,33 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from marshmallow import Schema, fields +from marshmallow.validate import OneOf + +from flowmachine.core.server.query_schemas import BaseExposedQuery +from flowmachine.core.server.query_schemas.subscriber_subset import SubscriberSubset +from flowmachine.core.server.query_schemas.base_schema import BaseSchema +from flowmachine.features.subscriber.per_subscriber_aggregate import ( + PerSubscriberAggregate, + agg_methods, +) + + +class PerSubscriberAggregateExposed(BaseExposedQuery): + def __init__(self, subscriber_query, agg_method): + self.subscriber_query = subscriber_query + self.agg_method = agg_method + + def _flomachine_query_obj(self): + return PerSubscriberAggregate( + subscriber_query=self.subscriber_query, + agg_column="value", + agg_method=self.agg_method, + ) + + +class PerSubscriberAggregateSchema(BaseSchema): + query_kind = fields.String(validate=OneOf(["per_subscriber_aggregate"])) + subscriber_query = fields.Nested(SubscriberSubset) + agg_method = fields.String(validate=OneOf(agg_methods)) From a568b85f05e8b2412cc512f497c2a9a5782cd994 Mon Sep 17 00:00:00 2001 From: john Date: Mon, 22 Nov 2021 16:33:43 +0000 Subject: [PATCH 2/8] Adding to flowclient --- flowclient/flowclient/query_specs.py | 22 +++++++++++++++++++ .../query_schemas/per_subscriber_aggregate.py | 4 +++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/flowclient/flowclient/query_specs.py b/flowclient/flowclient/query_specs.py index bfcb47ba95..1dbff591d9 100644 --- a/flowclient/flowclient/query_specs.py +++ b/flowclient/flowclient/query_specs.py @@ -840,3 +840,25 @@ def random_sample_spec( ) sampled_query["sampling"] = sampling return sampled_query + + +def per_subscriber_aggregate_spec(*, subscriber_query: Dict, agg_method: str): + """ + Query that performs per-subscriber aggregation of a column. Returns a column + 'subscriber' containing unique subscribers and a column 'value' containing the + aggregration. + + Parameters + ---------- + subscriber_query: SubscriberFeature + A query with a `subscriber` column + agg_column: str + The name of the column in `subscriber_query` to aggregate. Cannot be 'subscriber'. + agg_method: {"count", "sum", "avg", "max", "min", "median", "stddev", "variance"} default "avg" + The method of aggregation to perform + """ + return { + "query_kind": "per_subscriber_aggregate", + "subscriber_subset": subscriber_query, + "agg_method": agg_method, + } diff --git a/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py index 71a5305b34..71a0e82171 100644 --- a/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py +++ b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py @@ -2,7 +2,7 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. -from marshmallow import Schema, fields +from marshmallow import fields from marshmallow.validate import OneOf from flowmachine.core.server.query_schemas import BaseExposedQuery @@ -31,3 +31,5 @@ class PerSubscriberAggregateSchema(BaseSchema): query_kind = fields.String(validate=OneOf(["per_subscriber_aggregate"])) subscriber_query = fields.Nested(SubscriberSubset) agg_method = fields.String(validate=OneOf(agg_methods)) + + __model__ = PerSubscriberAggregateExposed From 916789d9cc216d6fee5a00df2529bd1e5032201c Mon Sep 17 00:00:00 2001 From: john Date: Mon, 22 Nov 2021 16:49:10 +0000 Subject: [PATCH 3/8] Moving to aggregates --- flowclient/flowclient/aggregates.py | 22 ++++++++++++++++++++++ flowclient/flowclient/query_specs.py | 22 ---------------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/flowclient/flowclient/aggregates.py b/flowclient/flowclient/aggregates.py index 1a91568ac9..58a48ef409 100644 --- a/flowclient/flowclient/aggregates.py +++ b/flowclient/flowclient/aggregates.py @@ -1544,3 +1544,25 @@ def histogram_aggregate(*, connection: Connection, **kwargs) -> APIQuery: Histogram aggregate query """ return connection.make_api_query(parameters=histogram_aggregate_spec(**kwargs)) + + +def per_subscriber_aggregate_spec(*, subscriber_query: Dict, agg_method: str): + """ + Query that performs per-subscriber aggregation of a column. Returns a column + 'subscriber' containing unique subscribers and a column 'value' containing the + aggregration. + + Parameters + ---------- + subscriber_query: SubscriberFeature + A query with a `subscriber` column + agg_column: str + The name of the column in `subscriber_query` to aggregate. Cannot be 'subscriber'. + agg_method: {"count", "sum", "avg", "max", "min", "median", "stddev", "variance"} default "avg" + The method of aggregation to perform + """ + return { + "query_kind": "per_subscriber_aggregate", + "subscriber_subset": subscriber_query, + "agg_method": agg_method, + } diff --git a/flowclient/flowclient/query_specs.py b/flowclient/flowclient/query_specs.py index 1dbff591d9..bfcb47ba95 100644 --- a/flowclient/flowclient/query_specs.py +++ b/flowclient/flowclient/query_specs.py @@ -840,25 +840,3 @@ def random_sample_spec( ) sampled_query["sampling"] = sampling return sampled_query - - -def per_subscriber_aggregate_spec(*, subscriber_query: Dict, agg_method: str): - """ - Query that performs per-subscriber aggregation of a column. Returns a column - 'subscriber' containing unique subscribers and a column 'value' containing the - aggregration. - - Parameters - ---------- - subscriber_query: SubscriberFeature - A query with a `subscriber` column - agg_column: str - The name of the column in `subscriber_query` to aggregate. Cannot be 'subscriber'. - agg_method: {"count", "sum", "avg", "max", "min", "median", "stddev", "variance"} default "avg" - The method of aggregation to perform - """ - return { - "query_kind": "per_subscriber_aggregate", - "subscriber_subset": subscriber_query, - "agg_method": agg_method, - } From e5dcba390c844058f78fc6fe94a6eb400ee98930 Mon Sep 17 00:00:00 2001 From: john Date: Mon, 22 Nov 2021 17:04:57 +0000 Subject: [PATCH 4/8] Moving to aggregates --- .../core/server/query_schemas/per_subscriber_aggregate.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py index 71a0e82171..4fc3f10efc 100644 --- a/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py +++ b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py @@ -1,6 +1,7 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +from functools import reduce from marshmallow import fields from marshmallow.validate import OneOf @@ -15,8 +16,8 @@ class PerSubscriberAggregateExposed(BaseExposedQuery): - def __init__(self, subscriber_query, agg_method): - self.subscriber_query = subscriber_query + def __init__(self, subscriber_queries, agg_method): + self.subscriber_query = reduce(lambda x, y: x.join(y), subscriber_queries) self.agg_method = agg_method def _flomachine_query_obj(self): @@ -29,7 +30,8 @@ def _flomachine_query_obj(self): class PerSubscriberAggregateSchema(BaseSchema): query_kind = fields.String(validate=OneOf(["per_subscriber_aggregate"])) - subscriber_query = fields.Nested(SubscriberSubset) + # TODO: Can we make this a list or a single query? + subscriber_query = fields.List(SubscriberSubset) agg_method = fields.String(validate=OneOf(agg_methods)) __model__ = PerSubscriberAggregateExposed From 858f53d1b6629c3085888f54d513e8e3d8807ea7 Mon Sep 17 00:00:00 2001 From: john Date: Tue, 23 Nov 2021 11:06:29 +0000 Subject: [PATCH 5/8] in_progress --- flowclient/flowclient/__init__.py | 2 ++ flowclient/flowclient/aggregates.py | 18 ++++++++++++++++++ .../tests/query_tests/test_queries.py | 3 +++ 3 files changed, 23 insertions(+) diff --git a/flowclient/flowclient/__init__.py b/flowclient/flowclient/__init__.py index 55b61c57bf..b92a7468fa 100644 --- a/flowclient/flowclient/__init__.py +++ b/flowclient/flowclient/__init__.py @@ -67,6 +67,7 @@ unmoving_counts, consecutive_trips_od_matrix, trips_od_matrix, + per_subscriber_aggregate, ) __all__ = [ @@ -101,4 +102,5 @@ "unmoving_counts", "consecutive_trips_od_matrix", "trips_od_matrix", + "per_subscriber_aggregate", ] diff --git a/flowclient/flowclient/aggregates.py b/flowclient/flowclient/aggregates.py index 58a48ef409..dc9240396d 100644 --- a/flowclient/flowclient/aggregates.py +++ b/flowclient/flowclient/aggregates.py @@ -1566,3 +1566,21 @@ def per_subscriber_aggregate_spec(*, subscriber_query: Dict, agg_method: str): "subscriber_subset": subscriber_query, "agg_method": agg_method, } + + +def per_subscriber_aggregate(*, connection: Connection, **kwargs) -> APIQuery: + """ + Query that performs per-subscriber aggregation of a column. Returns a column + 'subscriber' containing unique subscribers and a column 'value' containing the + aggregration. + + Parameters + ---------- + subscriber_query: SubscriberFeature + A query with a `subscriber` column + agg_column: str + The name of the column in `subscriber_query` to aggregate. Cannot be 'subscriber'. + agg_method: {"count", "sum", "avg", "max", "min", "median", "stddev", "variance"} default "avg" + The method of aggregation to perform + """ + return connection.make_api_query(parameters=per_subscriber_aggregate_spec(**kwargs)) diff --git a/integration_tests/tests/query_tests/test_queries.py b/integration_tests/tests/query_tests/test_queries.py index c35b1b5fcb..c55ba62389 100644 --- a/integration_tests/tests/query_tests/test_queries.py +++ b/integration_tests/tests/query_tests/test_queries.py @@ -706,6 +706,9 @@ aggregation_unit="admin3", event_types=["calls", "sms"], ), + partial( + flowclient.per_subscriber_aggregate, + ), ] From ef90f5f5c9130e19b9d70d2b95696665fe0e701d Mon Sep 17 00:00:00 2001 From: Thingus Date: Tue, 23 Nov 2021 13:57:13 +0000 Subject: [PATCH 6/8] Update flowclient/flowclient/aggregates.py Co-authored-by: James Harrison --- flowclient/flowclient/aggregates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowclient/flowclient/aggregates.py b/flowclient/flowclient/aggregates.py index 58a48ef409..3931ec1d06 100644 --- a/flowclient/flowclient/aggregates.py +++ b/flowclient/flowclient/aggregates.py @@ -1563,6 +1563,6 @@ def per_subscriber_aggregate_spec(*, subscriber_query: Dict, agg_method: str): """ return { "query_kind": "per_subscriber_aggregate", - "subscriber_subset": subscriber_query, + "subscriber_query": subscriber_query, "agg_method": agg_method, } From 0ae659e2e3acb2b500bc26e952de8301b175505d Mon Sep 17 00:00:00 2001 From: john Date: Tue, 23 Nov 2021 16:19:58 +0000 Subject: [PATCH 7/8] Refactoring HistogramMetics to NumericSubscriberMetricsSchema + other PR comments --- flowclient/flowclient/__init__.py | 3 +- flowclient/flowclient/aggregates.py | 40 ------------------- flowclient/flowclient/query_specs.py | 20 ++++++++++ .../query_schemas/histogram_aggregate.py | 38 +----------------- .../numeric_subscriber_metrics.py | 38 ++++++++++++++++++ .../query_schemas/per_subscriber_aggregate.py | 18 ++++++--- .../tests/query_tests/test_queries.py | 15 ++++++- 7 files changed, 88 insertions(+), 84 deletions(-) create mode 100644 flowmachine/flowmachine/core/server/query_schemas/numeric_subscriber_metrics.py diff --git a/flowclient/flowclient/__init__.py b/flowclient/flowclient/__init__.py index b92a7468fa..369cbfb120 100644 --- a/flowclient/flowclient/__init__.py +++ b/flowclient/flowclient/__init__.py @@ -47,6 +47,7 @@ unique_locations_spec, most_frequent_location_spec, total_active_periods_spec, + per_subscriber_aggregate_spec, ) from . import aggregates from .aggregates import ( @@ -67,7 +68,6 @@ unmoving_counts, consecutive_trips_od_matrix, trips_od_matrix, - per_subscriber_aggregate, ) __all__ = [ @@ -102,5 +102,4 @@ "unmoving_counts", "consecutive_trips_od_matrix", "trips_od_matrix", - "per_subscriber_aggregate", ] diff --git a/flowclient/flowclient/aggregates.py b/flowclient/flowclient/aggregates.py index c466abb578..1a91568ac9 100644 --- a/flowclient/flowclient/aggregates.py +++ b/flowclient/flowclient/aggregates.py @@ -1544,43 +1544,3 @@ def histogram_aggregate(*, connection: Connection, **kwargs) -> APIQuery: Histogram aggregate query """ return connection.make_api_query(parameters=histogram_aggregate_spec(**kwargs)) - - -def per_subscriber_aggregate_spec(*, subscriber_query: Dict, agg_method: str): - """ - Query that performs per-subscriber aggregation of a column. Returns a column - 'subscriber' containing unique subscribers and a column 'value' containing the - aggregration. - - Parameters - ---------- - subscriber_query: SubscriberFeature - A query with a `subscriber` column - agg_column: str - The name of the column in `subscriber_query` to aggregate. Cannot be 'subscriber'. - agg_method: {"count", "sum", "avg", "max", "min", "median", "stddev", "variance"} default "avg" - The method of aggregation to perform - """ - return { - "query_kind": "per_subscriber_aggregate", - "subscriber_query": subscriber_query, - "agg_method": agg_method, - } - - -def per_subscriber_aggregate(*, connection: Connection, **kwargs) -> APIQuery: - """ - Query that performs per-subscriber aggregation of a column. Returns a column - 'subscriber' containing unique subscribers and a column 'value' containing the - aggregration. - - Parameters - ---------- - subscriber_query: SubscriberFeature - A query with a `subscriber` column - agg_column: str - The name of the column in `subscriber_query` to aggregate. Cannot be 'subscriber'. - agg_method: {"count", "sum", "avg", "max", "min", "median", "stddev", "variance"} default "avg" - The method of aggregation to perform - """ - return connection.make_api_query(parameters=per_subscriber_aggregate_spec(**kwargs)) diff --git a/flowclient/flowclient/query_specs.py b/flowclient/flowclient/query_specs.py index bfcb47ba95..c825afbf9a 100644 --- a/flowclient/flowclient/query_specs.py +++ b/flowclient/flowclient/query_specs.py @@ -840,3 +840,23 @@ def random_sample_spec( ) sampled_query["sampling"] = sampling return sampled_query + + +def per_subscriber_aggregate_spec(*, subscriber_query: Dict, agg_method: str): + """ + Query that performs per-subscriber aggregation of a table. Returns a column + 'subscriber' containing unique subscribers and a column 'value' containing the + aggregration. + + Parameters + ---------- + subscriber_query: SubscriberFeature + A query with a `subscriber` column + agg_method: {"count", "sum", "avg", "max", "min", "median", "stddev", "variance"} default "avg" + The method of aggregation to perform + """ + return { + "query_kind": "per_subscriber_aggregate", + "subscriber_query": subscriber_query, + "agg_method": agg_method, + } diff --git a/flowmachine/flowmachine/core/server/query_schemas/histogram_aggregate.py b/flowmachine/flowmachine/core/server/query_schemas/histogram_aggregate.py index 29383781a3..e80df522fc 100644 --- a/flowmachine/flowmachine/core/server/query_schemas/histogram_aggregate.py +++ b/flowmachine/flowmachine/core/server/query_schemas/histogram_aggregate.py @@ -4,26 +4,8 @@ from marshmallow import Schema, fields, post_load, validates_schema, ValidationError from marshmallow.validate import OneOf -from marshmallow_oneofschema import OneOfSchema from flowmachine.core.server.query_schemas.custom_fields import Bounds -from flowmachine.core.server.query_schemas.radius_of_gyration import ( - RadiusOfGyrationSchema, -) -from flowmachine.core.server.query_schemas.subscriber_degree import ( - SubscriberDegreeSchema, -) -from flowmachine.core.server.query_schemas.topup_amount import TopUpAmountSchema -from flowmachine.core.server.query_schemas.event_count import EventCountSchema -from flowmachine.core.server.query_schemas.nocturnal_events import NocturnalEventsSchema -from flowmachine.core.server.query_schemas.unique_location_counts import ( - UniqueLocationCountsSchema, -) -from flowmachine.core.server.query_schemas.displacement import DisplacementSchema -from flowmachine.core.server.query_schemas.pareto_interactions import ( - ParetoInteractionsSchema, -) -from flowmachine.core.server.query_schemas.topup_balance import TopUpBalanceSchema from flowmachine.features import HistogramAggregation from .base_exposed_query import BaseExposedQuery @@ -32,23 +14,7 @@ __all__ = ["HistogramAggregateSchema", "HistogramAggregateExposed"] from .base_schema import BaseSchema -from .total_active_periods import TotalActivePeriodsSchema - - -class HistogrammableMetrics(OneOfSchema): - type_field = "query_kind" - type_schemas = { - "radius_of_gyration": RadiusOfGyrationSchema, - "unique_location_counts": UniqueLocationCountsSchema, - "topup_balance": TopUpBalanceSchema, - "subscriber_degree": SubscriberDegreeSchema, - "topup_amount": TopUpAmountSchema, - "event_count": EventCountSchema, - "pareto_interactions": ParetoInteractionsSchema, - "nocturnal_events": NocturnalEventsSchema, - "displacement": DisplacementSchema, - "total_active_periods": TotalActivePeriodsSchema, - } +from .numeric_subscriber_metrics import NumericSubscriberMetricsSchema class HistogramBins(Schema): @@ -99,7 +65,7 @@ def _flowmachine_query_obj(self): class HistogramAggregateSchema(BaseSchema): # query_kind parameter is required here for claims validation query_kind = fields.String(validate=OneOf(["histogram_aggregate"])) - metric = fields.Nested(HistogrammableMetrics, required=True) + metric = fields.Nested(NumericSubscriberMetricsSchema, required=True) range = fields.Nested(Bounds) bins = fields.Nested(HistogramBins) diff --git a/flowmachine/flowmachine/core/server/query_schemas/numeric_subscriber_metrics.py b/flowmachine/flowmachine/core/server/query_schemas/numeric_subscriber_metrics.py new file mode 100644 index 0000000000..7d09f2ae44 --- /dev/null +++ b/flowmachine/flowmachine/core/server/query_schemas/numeric_subscriber_metrics.py @@ -0,0 +1,38 @@ +from marshmallow_oneofschema import OneOfSchema + +from flowmachine.core.server.query_schemas.displacement import DisplacementSchema +from flowmachine.core.server.query_schemas.event_count import EventCountSchema +from flowmachine.core.server.query_schemas.nocturnal_events import NocturnalEventsSchema +from flowmachine.core.server.query_schemas.pareto_interactions import ( + ParetoInteractionsSchema, +) +from flowmachine.core.server.query_schemas.radius_of_gyration import ( + RadiusOfGyrationSchema, +) +from flowmachine.core.server.query_schemas.subscriber_degree import ( + SubscriberDegreeSchema, +) +from flowmachine.core.server.query_schemas.topup_amount import TopUpAmountSchema +from flowmachine.core.server.query_schemas.topup_balance import TopUpBalanceSchema +from flowmachine.core.server.query_schemas.total_active_periods import ( + TotalActivePeriodsSchema, +) +from flowmachine.core.server.query_schemas.unique_location_counts import ( + UniqueLocationCountsSchema, +) + + +class NumericSubscriberMetricsSchema(OneOfSchema): + type_field = "query_kind" + type_schemas = { + "radius_of_gyration": RadiusOfGyrationSchema, + "unique_location_counts": UniqueLocationCountsSchema, + "topup_balance": TopUpBalanceSchema, + "subscriber_degree": SubscriberDegreeSchema, + "topup_amount": TopUpAmountSchema, + "event_count": EventCountSchema, + "pareto_interactions": ParetoInteractionsSchema, + "nocturnal_events": NocturnalEventsSchema, + "displacement": DisplacementSchema, + "total_active_periods": TotalActivePeriodsSchema, + } diff --git a/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py index 4fc3f10efc..f558f49564 100644 --- a/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py +++ b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py @@ -4,10 +4,12 @@ from functools import reduce from marshmallow import fields -from marshmallow.validate import OneOf +from marshmallow.validate import OneOf, Length from flowmachine.core.server.query_schemas import BaseExposedQuery -from flowmachine.core.server.query_schemas.subscriber_subset import SubscriberSubset +from flowmachine.core.server.query_schemas.numeric_subscriber_metrics import ( + NumericSubscriberMetricsSchema, +) from flowmachine.core.server.query_schemas.base_schema import BaseSchema from flowmachine.features.subscriber.per_subscriber_aggregate import ( PerSubscriberAggregate, @@ -17,12 +19,16 @@ class PerSubscriberAggregateExposed(BaseExposedQuery): def __init__(self, subscriber_queries, agg_method): - self.subscriber_query = reduce(lambda x, y: x.join(y), subscriber_queries) + self.subscriber_queries = subscriber_queries self.agg_method = agg_method def _flomachine_query_obj(self): + subscriber_query = reduce( + lambda x, y: x._flowmachine_query_obj.union(y._flowmachine_query_obj), + self.subscriber_queries, + ) return PerSubscriberAggregate( - subscriber_query=self.subscriber_query, + subscriber_query=subscriber_query, agg_column="value", agg_method=self.agg_method, ) @@ -31,7 +37,9 @@ def _flomachine_query_obj(self): class PerSubscriberAggregateSchema(BaseSchema): query_kind = fields.String(validate=OneOf(["per_subscriber_aggregate"])) # TODO: Can we make this a list or a single query? - subscriber_query = fields.List(SubscriberSubset) + subscriber_query = fields.List( + fields.Nested(NumericSubscriberMetricsSchema), validate=Length(min=1) + ) agg_method = fields.String(validate=OneOf(agg_methods)) __model__ = PerSubscriberAggregateExposed diff --git a/integration_tests/tests/query_tests/test_queries.py b/integration_tests/tests/query_tests/test_queries.py index c55ba62389..f0ab81d612 100644 --- a/integration_tests/tests/query_tests/test_queries.py +++ b/integration_tests/tests/query_tests/test_queries.py @@ -707,7 +707,20 @@ event_types=["calls", "sms"], ), partial( - flowclient.per_subscriber_aggregate, + flowclient.per_subscriber_aggregate_spec, + subscriber_queries=[ + flowclient.total_active_periods_spec( + start_date="2016-01-01", + total_periods=1, + event_types=["calls", "sms"], + ), + flowclient.total_active_periods_spec( + start_date="2016-01-02", + total_periods=1, + event_types=["calls", "sms"], + ), + ], + agg_method="min", ), ] From 36d4811590484bf694cdb7c5cb9352477bb5b6aa Mon Sep 17 00:00:00 2001 From: john Date: Tue, 23 Nov 2021 16:55:31 +0000 Subject: [PATCH 8/8] per_susbscriber_aggregate now in approvals for flowmachine --- .../server/query_schemas/flowmachine_query.py | 2 ++ .../query_schemas/per_subscriber_aggregate.py | 7 +++--- .../tests/test_query_object_construction.py | 18 ++++++++++++++ ...truction.test_construct_query.approved.txt | 24 +++++++++++++++++++ 4 files changed, 48 insertions(+), 3 deletions(-) diff --git a/flowmachine/flowmachine/core/server/query_schemas/flowmachine_query.py b/flowmachine/flowmachine/core/server/query_schemas/flowmachine_query.py index 57bd9a1b01..a101ddd3c9 100644 --- a/flowmachine/flowmachine/core/server/query_schemas/flowmachine_query.py +++ b/flowmachine/flowmachine/core/server/query_schemas/flowmachine_query.py @@ -30,6 +30,7 @@ from .geography import GeographySchema from .location_event_counts import LocationEventCountsSchema from .most_frequent_location import MostFrequentLocationSchema +from .per_subscriber_aggregate import PerSubscriberAggregateSchema from .trips_od_matrix import TripsODMatrixSchema from .unique_subscriber_counts import UniqueSubscriberCountsSchema from .location_introversion import LocationIntroversionSchema @@ -66,6 +67,7 @@ class FlowmachineQuerySchema(OneOfSchema): "unmoving_counts": UnmovingCountsSchema, "unmoving_at_reference_location_counts": UnmovingAtReferenceLocationCountsSchema, "trips_od_matrix": TripsODMatrixSchema, + "per_subscriber_aggregate": PerSubscriberAggregateSchema, } diff --git a/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py index f558f49564..00ed67e732 100644 --- a/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py +++ b/flowmachine/flowmachine/core/server/query_schemas/per_subscriber_aggregate.py @@ -22,8 +22,10 @@ def __init__(self, subscriber_queries, agg_method): self.subscriber_queries = subscriber_queries self.agg_method = agg_method - def _flomachine_query_obj(self): + @property + def _flowmachine_query_obj(self): subscriber_query = reduce( + # TODO: Replace with Jono's new list input to union lambda x, y: x._flowmachine_query_obj.union(y._flowmachine_query_obj), self.subscriber_queries, ) @@ -36,8 +38,7 @@ def _flomachine_query_obj(self): class PerSubscriberAggregateSchema(BaseSchema): query_kind = fields.String(validate=OneOf(["per_subscriber_aggregate"])) - # TODO: Can we make this a list or a single query? - subscriber_query = fields.List( + subscriber_queries = fields.List( fields.Nested(NumericSubscriberMetricsSchema), validate=Length(min=1) ) agg_method = fields.String(validate=OneOf(agg_methods)) diff --git a/flowmachine/tests/test_query_object_construction.py b/flowmachine/tests/test_query_object_construction.py index 73e9db9e51..8818d82f94 100644 --- a/flowmachine/tests/test_query_object_construction.py +++ b/flowmachine/tests/test_query_object_construction.py @@ -269,6 +269,24 @@ def test_construct_query(diff_reporter): "event_types": ["calls", "sms"], "subscriber_subset": None, }, + { + "query_kind": "per_subscriber_aggregate", + "subscriber_queries": [ + { + "query_kind": "total_active_periods", + "start_date": "2016-01-01", + "total_periods": 1, + "event_types": ["calls", "sms"], + }, + { + "query_kind": "total_active_periods", + "start_date": "2016-01-02", + "total_periods": 1, + "event_types": ["calls", "sms"], + }, + ], + "agg_method": "min", + }, ] def get_query_id_for_query_spec(query_spec): diff --git a/flowmachine/tests/test_query_object_construction.test_construct_query.approved.txt b/flowmachine/tests/test_query_object_construction.test_construct_query.approved.txt index ce33d069b2..e32a23daec 100644 --- a/flowmachine/tests/test_query_object_construction.test_construct_query.approved.txt +++ b/flowmachine/tests/test_query_object_construction.test_construct_query.approved.txt @@ -354,5 +354,29 @@ "sms" ], "subscriber_subset": null + }, + "ce4e263fd59363ef7c4632f87b924d02": { + "query_kind": "per_subscriber_aggregate", + "subscriber_queries": [ + { + "query_kind": "total_active_periods", + "start_date": "2016-01-01", + "total_periods": 1, + "event_types": [ + "calls", + "sms" + ] + }, + { + "query_kind": "total_active_periods", + "start_date": "2016-01-02", + "total_periods": 1, + "event_types": [ + "calls", + "sms" + ] + } + ], + "agg_method": "min" } }