Skip to content

Commit

Permalink
Merge pull request #590 from Flowminder/total_network_objects
Browse files Browse the repository at this point in the history
Total Network Objects
  • Loading branch information
mergify[bot] authored Apr 9, 2019
2 parents a030995 + e5771e1 commit 5adf2f1
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [Unreleased]
### Added
- [#581] Implemented new flowclient API entrypoint, total_network_objects(), to access (with simplified parameters) equivalent flowmachine query

- [#577] Implemented new flowclient API entrypoint, location_introversion(), to access (with simplified parameters) equivalent flowmachine query

- [#562] Implemented new flowclient API entrypoint, unique_subscriber_counts(), to access (with simplified parameters) equivalent flowmachine query
Expand Down
4 changes: 4 additions & 0 deletions docs/notebook_preamble.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ def format_dict(x):
"permissions": {"run": True, "poll": True, "get_result": True},
"spatial_aggregation": ["admin3", "admin2", "admin1"],
},
"total_network_objects": {
"permissions": {"run": True, "poll": True, "get_result": True},
"spatial_aggregation": ["admin3", "admin2", "admin1"],
},
}

TOKEN = make_token(
Expand Down
2 changes: 1 addition & 1 deletion flowapi/flowapi/stream_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def stream_result_as_json(
logger.debug(f"Running {sql_query}", request_id=request.request_id)
try:
async for row in connection.cursor(sql_query):
yield f"{prepend}{json.dumps(dict(row.items()), number_mode=json.NM_DECIMAL)}".encode()
yield f"{prepend}{json.dumps(dict(row.items()), number_mode=json.NM_DECIMAL, datetime_mode=json.DM_ISO8601)}".encode()
prepend = ", "
logger.debug("Finishing up.", request_id=request.request_id)
yield b"]}"
Expand Down
1 change: 1 addition & 0 deletions flowauth/backend/flowauth/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ def make_demodata(): # pragma: no cover
"geography",
"unique_subscriber_counts",
"location_introversion",
"total_network_objects",
):
c = Capability(name=c)
db.session.add(c)
Expand Down
2 changes: 2 additions & 0 deletions flowclient/flowclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
get_available_dates,
unique_subscriber_counts,
location_introversion,
total_network_objects,
)

__all__ = [
Expand All @@ -52,4 +53,5 @@
"get_available_dates",
"unique_subscriber_counts",
"location_introversion",
"total_network_objects",
]
30 changes: 29 additions & 1 deletion flowclient/flowclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ def location_introversion(
Unit of aggregation, e.g. "admin3"
direction : {"in", "out", "both"}, default "both"
Optionally, include only ingoing or outbound calls/texts
>
Returns
-------
dict
Expand All @@ -994,3 +994,31 @@ def location_introversion(
"aggregation_unit": aggregation_unit,
"direction": direction,
}


def total_network_objects(
start_date: str, end_date: str, aggregation_unit: str
) -> dict:
"""
Return query spec for total network objects
Parameters
----------
start_date : str
ISO format date of the first day of the count, e.g. "2016-01-01"
end_date : str
ISO format date of the day _after_ the final date of the count, e.g. "2016-01-08"
aggregation_unit : str
Unit of aggregation, e.g. "admin3"
Returns
-------
dict
Dict which functions as the query specification
"""
return {
"query_kind": "total_network_objects",
"start_date": start_date,
"end_date": end_date,
"aggregation_unit": aggregation_unit,
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
LocationIntroversionSchema,
LocationIntroversionExposed,
)
from .total_network_objects import TotalNetworkObjectsSchema, TotalNetworkObjectsExposed
from .dfs_metric_total_amount import (
DFSTotalMetricAmountSchema,
DFSTotalMetricAmountExposed,
Expand All @@ -46,6 +47,7 @@ class FlowmachineQuerySchema(OneOfSchema):
"location_event_counts": LocationEventCountsSchema,
"unique_subscriber_counts": UniqueSubscriberCountsSchema,
"location_introversion": LocationIntroversionSchema,
"total_network_objects": TotalNetworkObjectsSchema,
"dfs_metric_total_amount": DFSTotalMetricAmountSchema,
}

Expand All @@ -72,6 +74,8 @@ def get_obj_type(self, obj):
return "unique_subscriber_counts"
elif isinstance(obj, LocationIntroversionExposed):
return "location_introversion"
elif isinstance(obj, TotalNetworkObjectsExposed):
return "total_network_objects"
elif isinstance(obj, DFSTotalMetricAmountExposed):
return "dfs_metric_total_amount"
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from marshmallow import Schema, fields, post_load
from marshmallow.validate import OneOf, Length

from flowmachine.features import TotalNetworkObjects
from .base_exposed_query import BaseExposedQuery
from .custom_fields import AggregationUnit

__all__ = ["TotalNetworkObjectsSchema", "TotalNetworkObjectsExposed"]


class TotalNetworkObjectsSchema(Schema):

start_date = fields.Date(required=True)
end_date = fields.Date(required=True)
aggregation_unit = AggregationUnit()

@post_load
def make_query_object(self, params):
return TotalNetworkObjectsExposed(**params)


class TotalNetworkObjectsExposed(BaseExposedQuery):
def __init__(self, *, start_date, end_date, aggregation_unit):
# Note: all input parameters need to be defined as attributes on `self`
# so that marshmallow can serialise the object correctly.
self.start_date = start_date
self.end_date = end_date
self.aggregation_unit = aggregation_unit

@property
def _flowmachine_query_obj(self):
"""
Return the underlying flowmachine daily_location object.
Returns
-------
Query
"""
return TotalNetworkObjects(
start=self.start_date, stop=self.end_date, level=self.aggregation_unit
)
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def test_get_available_queries(send_zmq_message_and_receive_reply):
"location_event_counts",
"unique_subscriber_counts",
"location_introversion",
"total_network_objects",
"dfs_metric_total_amount",
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"meaningful_locations_between_dates_od_matrix": "#/components/schemas/MeaningfulLocationsBetweenDatesODMatrix",
"meaningful_locations_between_label_od_matrix": "#/components/schemas/MeaningfulLocationsBetweenLabelODMatrix",
"modal_location": "#/components/schemas/ModalLocation",
"total_network_objects": "#/components/schemas/TotalNetworkObjects",
"unique_subscriber_counts": "#/components/schemas/UniqueSubscriberCounts"
},
"propertyName": "query_kind"
Expand Down Expand Up @@ -136,6 +137,9 @@
{
"$ref": "#/components/schemas/ModalLocation"
},
{
"$ref": "#/components/schemas/TotalNetworkObjects"
},
{
"$ref": "#/components/schemas/UniqueSubscriberCounts"
}
Expand Down Expand Up @@ -582,6 +586,33 @@
],
"type": "object"
},
"TotalNetworkObjects": {
"properties": {
"aggregation_unit": {
"enum": [
"admin0",
"admin1",
"admin2",
"admin3"
],
"type": "string"
},
"end_date": {
"format": "date",
"type": "string"
},
"start_date": {
"format": "date",
"type": "string"
}
},
"required": [
"aggregation_unit",
"end_date",
"start_date"
],
"type": "object"
},
"UniqueSubscriberCounts": {
"properties": {
"aggregation_unit": {
Expand Down
8 changes: 8 additions & 0 deletions integration_tests/tests/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@
"aggregation_unit": "admin3",
},
),
(
"total_network_objects",
{
"start_date": "2016-01-01",
"end_date": "2016-01-02",
"aggregation_unit": "admin3",
},
),
(
"location_introversion",
{
Expand Down

0 comments on commit 5adf2f1

Please sign in to comment.