Skip to content

Commit

Permalink
Refactoring HistogramMetics to NumericSubscriberMetricsSchema + other…
Browse files Browse the repository at this point in the history
… PR comments
  • Loading branch information
Thingus committed Nov 23, 2021
1 parent 333b460 commit 0ae659e
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 84 deletions.
3 changes: 1 addition & 2 deletions flowclient/flowclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
unique_locations_spec,
most_frequent_location_spec,
total_active_periods_spec,
per_subscriber_aggregate_spec,
)
from . import aggregates
from .aggregates import (
Expand All @@ -67,7 +68,6 @@
unmoving_counts,
consecutive_trips_od_matrix,
trips_od_matrix,
per_subscriber_aggregate,
)

__all__ = [
Expand Down Expand Up @@ -102,5 +102,4 @@
"unmoving_counts",
"consecutive_trips_od_matrix",
"trips_od_matrix",
"per_subscriber_aggregate",
]
40 changes: 0 additions & 40 deletions flowclient/flowclient/aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -1544,43 +1544,3 @@ def histogram_aggregate(*, connection: Connection, **kwargs) -> APIQuery:
Histogram aggregate query
"""
return connection.make_api_query(parameters=histogram_aggregate_spec(**kwargs))


def per_subscriber_aggregate_spec(
    *, subscriber_query: Dict, agg_method: str = "avg"
):
    """
    Build the specification of a per-subscriber aggregate query.

    The query performs per-subscriber aggregation of a subscriber query. It
    returns a column 'subscriber' containing unique subscribers and a column
    'value' containing the aggregation.

    Parameters
    ----------
    subscriber_query : dict
        Specification of a query with a `subscriber` column
    agg_method : {"count", "sum", "avg", "max", "min", "median", "stddev", "variance"}, default "avg"
        The method of aggregation to perform

    Returns
    -------
    dict
        Query specification with the keys "query_kind", "subscriber_query"
        and "agg_method"
    """
    # The values are passed through untouched; no validation happens here.
    return {
        "query_kind": "per_subscriber_aggregate",
        "subscriber_query": subscriber_query,
        "agg_method": agg_method,
    }


def per_subscriber_aggregate(*, connection: Connection, **kwargs) -> APIQuery:
    """
    Create a per-subscriber aggregate query on the given connection.

    The query performs per-subscriber aggregation and returns a column
    'subscriber' containing unique subscribers and a column 'value' containing
    the aggregation.

    Parameters
    ----------
    connection : Connection
        Connection used to create the API query
    **kwargs
        Keyword arguments forwarded unchanged to `per_subscriber_aggregate_spec`
        (`subscriber_query` and `agg_method`)

    Returns
    -------
    APIQuery
        The object returned by `connection.make_api_query` for the built spec
    """
    query_spec = per_subscriber_aggregate_spec(**kwargs)
    return connection.make_api_query(parameters=query_spec)
20 changes: 20 additions & 0 deletions flowclient/flowclient/query_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,23 @@ def random_sample_spec(
)
sampled_query["sampling"] = sampling
return sampled_query


def per_subscriber_aggregate_spec(
    *, subscriber_query: Dict, agg_method: str = "avg"
):
    """
    Build the specification of a per-subscriber aggregate query.

    The query performs per-subscriber aggregation of a table. It returns a
    column 'subscriber' containing unique subscribers and a column 'value'
    containing the aggregation.

    Parameters
    ----------
    subscriber_query : dict
        Specification of a query with a `subscriber` column
    agg_method : {"count", "sum", "avg", "max", "min", "median", "stddev", "variance"}, default "avg"
        The method of aggregation to perform

    Returns
    -------
    dict
        Query specification with the keys "query_kind", "subscriber_query"
        and "agg_method"
    """
    # The values are passed through untouched; no validation happens here.
    return {
        "query_kind": "per_subscriber_aggregate",
        "subscriber_query": subscriber_query,
        "agg_method": agg_method,
    }
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,8 @@

from marshmallow import Schema, fields, post_load, validates_schema, ValidationError
from marshmallow.validate import OneOf
from marshmallow_oneofschema import OneOfSchema

from flowmachine.core.server.query_schemas.custom_fields import Bounds
from flowmachine.core.server.query_schemas.radius_of_gyration import (
RadiusOfGyrationSchema,
)
from flowmachine.core.server.query_schemas.subscriber_degree import (
SubscriberDegreeSchema,
)
from flowmachine.core.server.query_schemas.topup_amount import TopUpAmountSchema
from flowmachine.core.server.query_schemas.event_count import EventCountSchema
from flowmachine.core.server.query_schemas.nocturnal_events import NocturnalEventsSchema
from flowmachine.core.server.query_schemas.unique_location_counts import (
UniqueLocationCountsSchema,
)
from flowmachine.core.server.query_schemas.displacement import DisplacementSchema
from flowmachine.core.server.query_schemas.pareto_interactions import (
ParetoInteractionsSchema,
)
from flowmachine.core.server.query_schemas.topup_balance import TopUpBalanceSchema

from flowmachine.features import HistogramAggregation
from .base_exposed_query import BaseExposedQuery
Expand All @@ -32,23 +14,7 @@
__all__ = ["HistogramAggregateSchema", "HistogramAggregateExposed"]

from .base_schema import BaseSchema
from .total_active_periods import TotalActivePeriodsSchema


class HistogrammableMetrics(OneOfSchema):
    """
    Polymorphic schema for the subscriber metrics listed in `type_schemas`.

    The concrete schema is chosen from the value of the "query_kind" field.
    """

    # Name of the field whose value selects the schema to use
    type_field = "query_kind"
    # Maps each accepted "query_kind" value to the schema for that query kind
    type_schemas = {
        "radius_of_gyration": RadiusOfGyrationSchema,
        "unique_location_counts": UniqueLocationCountsSchema,
        "topup_balance": TopUpBalanceSchema,
        "subscriber_degree": SubscriberDegreeSchema,
        "topup_amount": TopUpAmountSchema,
        "event_count": EventCountSchema,
        "pareto_interactions": ParetoInteractionsSchema,
        "nocturnal_events": NocturnalEventsSchema,
        "displacement": DisplacementSchema,
        "total_active_periods": TotalActivePeriodsSchema,
    }
from .numeric_subscriber_metrics import NumericSubscriberMetricsSchema


class HistogramBins(Schema):
Expand Down Expand Up @@ -99,7 +65,7 @@ def _flowmachine_query_obj(self):
class HistogramAggregateSchema(BaseSchema):
# query_kind parameter is required here for claims validation
query_kind = fields.String(validate=OneOf(["histogram_aggregate"]))
metric = fields.Nested(HistogrammableMetrics, required=True)
metric = fields.Nested(NumericSubscriberMetricsSchema, required=True)
range = fields.Nested(Bounds)
bins = fields.Nested(HistogramBins)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from marshmallow_oneofschema import OneOfSchema

from flowmachine.core.server.query_schemas.displacement import DisplacementSchema
from flowmachine.core.server.query_schemas.event_count import EventCountSchema
from flowmachine.core.server.query_schemas.nocturnal_events import NocturnalEventsSchema
from flowmachine.core.server.query_schemas.pareto_interactions import (
ParetoInteractionsSchema,
)
from flowmachine.core.server.query_schemas.radius_of_gyration import (
RadiusOfGyrationSchema,
)
from flowmachine.core.server.query_schemas.subscriber_degree import (
SubscriberDegreeSchema,
)
from flowmachine.core.server.query_schemas.topup_amount import TopUpAmountSchema
from flowmachine.core.server.query_schemas.topup_balance import TopUpBalanceSchema
from flowmachine.core.server.query_schemas.total_active_periods import (
TotalActivePeriodsSchema,
)
from flowmachine.core.server.query_schemas.unique_location_counts import (
UniqueLocationCountsSchema,
)


class NumericSubscriberMetricsSchema(OneOfSchema):
    """
    Polymorphic schema for the subscriber metrics listed in `type_schemas`.

    The concrete schema is chosen from the value of the "query_kind" field.
    """

    # Name of the field whose value selects the schema to use
    type_field = "query_kind"
    # Maps each accepted "query_kind" value to the schema for that query kind
    type_schemas = {
        "radius_of_gyration": RadiusOfGyrationSchema,
        "unique_location_counts": UniqueLocationCountsSchema,
        "topup_balance": TopUpBalanceSchema,
        "subscriber_degree": SubscriberDegreeSchema,
        "topup_amount": TopUpAmountSchema,
        "event_count": EventCountSchema,
        "pareto_interactions": ParetoInteractionsSchema,
        "nocturnal_events": NocturnalEventsSchema,
        "displacement": DisplacementSchema,
        "total_active_periods": TotalActivePeriodsSchema,
    }
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from functools import reduce

from marshmallow import fields
from marshmallow.validate import OneOf
from marshmallow.validate import OneOf, Length

from flowmachine.core.server.query_schemas import BaseExposedQuery
from flowmachine.core.server.query_schemas.subscriber_subset import SubscriberSubset
from flowmachine.core.server.query_schemas.numeric_subscriber_metrics import (
NumericSubscriberMetricsSchema,
)
from flowmachine.core.server.query_schemas.base_schema import BaseSchema
from flowmachine.features.subscriber.per_subscriber_aggregate import (
PerSubscriberAggregate,
Expand All @@ -17,12 +19,16 @@

class PerSubscriberAggregateExposed(BaseExposedQuery):
    """
    Exposed query that unions one or more subscriber metric queries and
    aggregates the combined 'value' column per subscriber.
    """

    def __init__(self, subscriber_queries, agg_method):
        """
        Parameters
        ----------
        subscriber_queries : list
            Exposed query objects, each providing `_flowmachine_query_obj`
        agg_method : str
            Name of the aggregation method passed to PerSubscriberAggregate
        """
        self.subscriber_queries = subscriber_queries
        self.agg_method = agg_method

    @property
    def _flowmachine_query_obj(self):
        """
        Return the underlying flowmachine PerSubscriberAggregate object.

        Returns
        -------
        PerSubscriberAggregate
        """
        # Resolve every exposed query to its flowmachine object *before*
        # folding. Folding the exposed wrappers directly returns a wrapper
        # (not a query) for a single-element list, and breaks for three or
        # more elements because the accumulator is already a flowmachine
        # query after the first union.
        flowmachine_queries = [
            exposed_query._flowmachine_query_obj
            for exposed_query in self.subscriber_queries
        ]
        # reduce raises TypeError on an empty list.
        subscriber_query = reduce(
            lambda combined, query: combined.union(query),
            flowmachine_queries,
        )
        return PerSubscriberAggregate(
            subscriber_query=subscriber_query,
            agg_column="value",
            agg_method=self.agg_method,
        )
Expand All @@ -31,7 +37,9 @@ def _flomachine_query_obj(self):
class PerSubscriberAggregateSchema(BaseSchema):
    """
    Schema for "per_subscriber_aggregate" query parameters; its model class is
    PerSubscriberAggregateExposed.
    """

    # Only the literal "per_subscriber_aggregate" is accepted as query kind
    query_kind = fields.String(validate=OneOf(["per_subscriber_aggregate"]))
    # TODO: Can we make this a list or a single query?
    # NOTE(review): this first assignment is immediately overridden by the one
    # below; it looks like a leftover from before the change - confirm and remove.
    subscriber_query = fields.List(SubscriberSubset)
    # A non-empty list of nested subscriber metric queries.
    # NOTE(review): the field is named `subscriber_query`, but
    # PerSubscriberAggregateExposed.__init__ takes `subscriber_queries` - verify
    # how the loaded parameters are passed to the model.
    subscriber_query = fields.List(
        fields.Nested(NumericSubscriberMetricsSchema), validate=Length(min=1)
    )
    # Aggregation method name; must be one of `agg_methods`
    agg_method = fields.String(validate=OneOf(agg_methods))

    __model__ = PerSubscriberAggregateExposed
15 changes: 14 additions & 1 deletion integration_tests/tests/query_tests/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,20 @@
event_types=["calls", "sms"],
),
partial(
flowclient.per_subscriber_aggregate,
flowclient.per_subscriber_aggregate_spec,
subscriber_queries=[
flowclient.total_active_periods_spec(
start_date="2016-01-01",
total_periods=1,
event_types=["calls", "sms"],
),
flowclient.total_active_periods_spec(
start_date="2016-01-02",
total_periods=1,
event_types=["calls", "sms"],
),
],
agg_method="min",
),
]

Expand Down

0 comments on commit 0ae659e

Please sign in to comment.