Expose random sampling via API #1168

Merged
merged 50 commits into from
Aug 22, 2019
Changes from 44 commits

Commits (50 commits)
f90e226
Define random sample schema
jc-harrison Jul 30, 2019
48c59df
Update Random docstring
jc-harrison Aug 2, 2019
0e01212
Change 'seed' value validation
jc-harrison Aug 2, 2019
a69e10e
Add post-load random sample object with method to create flowmachine …
jc-harrison Aug 2, 2019
41c4e04
Add missing import
jc-harrison Aug 6, 2019
c644fb7
Use a OneOffSchema for RandomSampleSchema
jc-harrison Aug 6, 2019
d696245
Add sampling parameter to daily_location
jc-harrison Aug 6, 2019
83178f9
Small fixes
jc-harrison Aug 7, 2019
ccb2e53
Re-implement random_ints to return the specified number of ints
jc-harrison Aug 7, 2019
8ee3404
Fix incorrect calling signature in random.py
jc-harrison Aug 7, 2019
a7b3614
Relax seed validity condition in query.py
jc-harrison Aug 7, 2019
972736b
No longer need a size_buffer for random_ids
jc-harrison Aug 7, 2019
7849440
Use random_sample method in random_sample schema
jc-harrison Aug 7, 2019
be641b2
Add tests for random sampling schema
jc-harrison Aug 7, 2019
1b1ffa8
Update docstring
jc-harrison Aug 7, 2019
1e792bd
Move seed check from Query class to Random class
jc-harrison Aug 8, 2019
84102d7
Refactor Random classes
jc-harrison Aug 8, 2019
c5f0ef2
Fix tests
jc-harrison Aug 9, 2019
90d5afd
Add sampling_method field for API spec visibility
jc-harrison Aug 9, 2019
8ea9a23
Approve integration tests
jc-harrison Aug 9, 2019
fa55c60
Allow sampling=None
jc-harrison Aug 9, 2019
12b7945
Add random_sample function to FlowClient
jc-harrison Aug 9, 2019
8c73a19
Fix default values in flowclient.handset
jc-harrison Aug 9, 2019
a8d955d
Add docstring for random_sample
jc-harrison Aug 9, 2019
2871525
Approve integration tests
jc-harrison Aug 9, 2019
ec3a4a0
Add integration tests for random sampling
jc-harrison Aug 9, 2019
c18226c
Remove _db_store_cache_metadata method
jc-harrison Aug 15, 2019
46414d8
Make random samples picklable
jc-harrison Aug 16, 2019
619c582
Add test for pickling Random objects
jc-harrison Aug 16, 2019
9e80c19
Fix tests that were skipped due to bad names
jc-harrison Aug 16, 2019
8c558b6
Merge branch 'master' of github.com:Flowminder/FlowKit into expose-ra…
jc-harrison Aug 16, 2019
f2e88a6
Move if/else logic for applying sampling into a helper function
jc-harrison Aug 16, 2019
95dc07f
Add sampling parameter to all non-aggregate query schemas
jc-harrison Aug 16, 2019
892f73b
Update CHANGELOG.md
jc-harrison Aug 16, 2019
2d676c9
Approve integration tests
jc-harrison Aug 16, 2019
1b4c107
Merge branch 'master' of github.com:Flowminder/FlowKit into expose-ra…
jc-harrison Aug 20, 2019
0102a7a
Pass query to random_sample in FlowClient
jc-harrison Aug 20, 2019
8eff4ec
Use dict() instead of copy()
jc-harrison Aug 20, 2019
847a0c1
Merge branch 'master' of github.com:Flowminder/FlowKit into expose-ra…
jc-harrison Aug 20, 2019
1c7b426
Move sampling stuff into parent classes
jc-harrison Aug 20, 2019
9fda795
Merge branch 'master' into expose-random-sampling
jc-harrison Aug 21, 2019
d3af3b4
Merge branch 'master' of github.com:Flowminder/FlowKit into expose-ra…
jc-harrison Aug 21, 2019
ad275e0
Add SeededRandom class
jc-harrison Aug 21, 2019
7f2e06d
Merge branch 'master' of github.com:Flowminder/FlowKit into expose-ra…
jc-harrison Aug 21, 2019
95ec982
Fix __init__
jc-harrison Aug 21, 2019
ae0a353
Rename SeededRandom to SeedableRandom
jc-harrison Aug 21, 2019
d7b87b4
Don't duplicate _sample_params
jc-harrison Aug 21, 2019
91db44d
Type annotations
jc-harrison Aug 21, 2019
de922fb
Merge branch 'master' of github.com:Flowminder/FlowKit into expose-ra…
jc-harrison Aug 21, 2019
3bc6eda
Merge branch 'master' into expose-random-sampling
jc-harrison Aug 22, 2019
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- FlowAuth now makes version information available at `/version` and displays it in the web ui. [#835](https://github.com/Flowminder/FlowKit/issues/835)
- FlowETL now comes with a deployment example (in `flowetl/deployment_example/`). [#1126](https://github.com/Flowminder/FlowKit/issues/1126)
- FlowETL now allows to run supplementary post-ETL queries. [#989](https://github.com/Flowminder/FlowKit/issues/989)
- Random sampling is now exposed via the API, for all non-aggregated query kinds. [#1007](https://github.com/Flowminder/FlowKit/issues/1007)

### Changed
- FlowDB is now based on PostgreSQL 11.5 and PostGIS 2.5.3
2 changes: 2 additions & 0 deletions flowclient/flowclient/__init__.py
@@ -44,6 +44,7 @@
pareto_interactions,
nocturnal_events,
handset,
random_sample,
)

__all__ = [
@@ -80,4 +81,5 @@
"pareto_interactions",
"nocturnal_events",
"handset",
"random_sample",
]
93 changes: 81 additions & 12 deletions flowclient/flowclient/client.py
@@ -1100,7 +1100,7 @@ def location_introversion(
Unit of aggregation, e.g. "admin3"
direction : {"in", "out", "both"}, default "both"
Optionally, include only ingoing or outbound calls/texts can be one of "in", "out" or "both"
>

Returns
-------
dict
@@ -1131,6 +1131,7 @@ def total_network_objects(
Unit of aggregation, e.g. "admin3"
total_by : {"second", "minute", "hour", "day", "month", "year"}
Time period to bucket by one of "second", "minute", "hour", "day", "month" or "year"

Returns
-------
dict
@@ -1276,6 +1277,7 @@ def unique_location_counts(
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
subset query.

Returns
-------
dict
@@ -1349,6 +1351,7 @@ def subscriber_degree(
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
subset query.

Returns
-------
dict
@@ -1385,6 +1388,7 @@ def topup_amount(
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
subset query.

Returns
-------
dict
@@ -1425,6 +1429,7 @@ def event_count(
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
subset query.

Returns
-------
dict
@@ -1465,6 +1470,7 @@ def displacement(
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
subset query.

Returns
-------
dict
@@ -1502,6 +1508,7 @@ def pareto_interactions(
Subset of subscribers to include in result. Must be None
(= all subscribers) or a dictionary with the specification of a
subset query.

Returns
-------
dict
@@ -1520,7 +1527,7 @@ def nocturnal_events(
*,
start: str,
stop: str,
hours: tuple((int, int)),
hours: Tuple[int, int],
subscriber_subset: Union[dict, None] = None,
) -> dict:
"""
@@ -1538,6 +1545,7 @@
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
subset query.

Returns
-------
dict
@@ -1557,14 +1565,8 @@ def handset(
*,
start_date: str,
end_date: str,
characteristic: str = [
"hnd_type",
"brand",
"model",
"software_os_name",
"software_os_vendor",
],
method: str = ["last", "most-common"],
characteristic: str = "hnd_type",
method: str = "last",
subscriber_subset: Union[dict, None] = None,
) -> dict:
"""
@@ -1576,14 +1578,15 @@
ISO format date of the first day for which to count handsets, e.g. "2016-01-01"
stop : str
ISO format date of the day _after_ the final date for which to count handsets, e.g. "2016-01-08"
characteristic: ["hnd_type", "brand", "model", "software_os_name", "software_os_vendor"], default "hnd_type"
characteristic: {"hnd_type", "brand", "model", "software_os_name", "software_os_vendor"}, default "hnd_type"
The required handset characteristic.
method: ["last", "most-common"], default "last"
method: {"last", "most-common"}, default "last"
Method for choosing a handset to associate with subscriber.
subscriber_subset : dict or None, default None
Subset of subscribers to include in event counts. Must be None
(= all subscribers) or a dictionary with the specification of a
subset query.

Returns
-------
dict
@@ -1597,3 +1600,69 @@ def handset(
"method": method,
"subscriber_subset": subscriber_subset,
}


def random_sample(
*,
query: Dict[str, Union[str, dict]],
sampling_method: str = "system_rows",
size: Union[int, None] = None,
fraction: Union[float, None] = None,
estimate_count: bool = True,
seed: Union[float, None] = None,
) -> dict:
"""
Return spec for a random sample from a query result.

Parameters
----------
query : dict
Specification of the query to be sampled.
sampling_method : {'system_rows', 'system', 'bernoulli', 'random_ids'}, default 'system_rows'
Specifies the method used to select the random sample.
'system_rows': performs block-level sampling by randomly sampling
each physical storage page of the underlying relation. This
sampling method is guaranteed to provide a sample of the specified
size.
'system': performs block-level sampling by randomly sampling each
physical storage page of the underlying relation. This
sampling method is not guaranteed to generate a sample of the
specified size, only an approximation. It may not
produce a sample at all, so it might be worth running it again
if it returns an empty dataframe.
'bernoulli': samples directly on each row of the underlying
relation. This sampling method is slower and is not guaranteed to
generate a sample of the specified size, only an approximation.
'random_ids': samples rows by randomly sampling the row number.
size : int, optional
The number of rows to draw.
Exactly one of the 'size' or 'fraction' arguments must be provided.
fraction : float, optional
Fraction of rows to draw.
Exactly one of the 'size' or 'fraction' arguments must be provided.
estimate_count : bool, default True
Whether to estimate the number of rows in the table using
information contained in `pg_class`, or to perform an
actual count of the number of rows.
seed : float, optional
Optionally provide a seed for repeatable random samples.
If using the 'random_ids' method, seed must be between -1 and +1.
Not available in combination with the 'system_rows' method.

Returns
-------
dict
Dict which functions as the query specification.
"""
sampled_query = dict(query)
sampling = {
"sampling_method": sampling_method,
"size": size,
"fraction": fraction,
"estimate_count": estimate_count,
}
if seed is not None:
# The 'system_rows' method doesn't accept a seed parameter, so 'seed' is only included in the spec when one is provided
sampling["seed"] = seed
sampled_query["sampling"] = sampling
return sampled_query
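
For context (not part of the diff), here is a minimal usage sketch of the new helper. It assumes flowclient's daily_location helper and its date/aggregation_unit/method parameters; treat the exact call signature as an assumption.

```python
import flowclient

# Query specification to be sampled (daily_location helper and its
# parameter names are assumed here for illustration).
dl_spec = flowclient.daily_location(
    date="2016-01-01", aggregation_unit="admin3", method="last"
)

# Wrap the specification with random sampling: draw roughly 10% of rows
# using Bernoulli sampling, seeded so the sample is repeatable.
sampled_spec = flowclient.random_sample(
    query=dl_spec,
    sampling_method="bernoulli",
    fraction=0.1,
    seed=0.5,
)

# sampled_spec is the original dict plus a "sampling" key, e.g.
# {"sampling": {"sampling_method": "bernoulli", "size": None,
#               "fraction": 0.1, "estimate_count": True, "seed": 0.5}, ...}
```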
9 changes: 1 addition & 8 deletions flowdb/sql/functions_001_utilities.sql
@@ -197,18 +197,11 @@ CREATE OR REPLACE FUNCTION random_ints (seed DOUBLE PRECISION, n_samples INT, ma
RETURNS TABLE (id INT)
AS $$
DECLARE new_seed NUMERIC;
DECLARE samples double precision[] := array[]::double precision[];
BEGIN
new_seed = random();
PERFORM setseed(seed);
FOR i in 1..n_samples LOOP
samples := array_append(samples, random());
END LOOP;
RETURN QUERY SELECT generate_series AS id FROM generate_series(1, max_val) ORDER BY random() LIMIT n_samples;
PERFORM setseed(new_seed);
RETURN QUERY SELECT
round(samples[generate_series] * max_val)::integer as id
FROM generate_series(1, n_samples)
GROUP BY id;
END; $$

LANGUAGE plpgsql
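As a rough guide to what the reimplemented PL/pgSQL function now does, here is an illustrative Python equivalent (a sketch, not code from this PR): seed the generator, draw n_samples distinct ids from 1..max_val, then re-seed with a fresh random value so later queries are unaffected.

```python
import random


def random_ints_sketch(seed: float, n_samples: int, max_val: int) -> list:
    # Save a fresh seed to restore afterwards (mirrors "new_seed = random()").
    new_seed = random.random()
    # Seed the generator (mirrors "PERFORM setseed(seed)").
    rng = random.Random(seed)
    # Draw n_samples distinct ids from 1..max_val (mirrors
    # "SELECT generate_series ... ORDER BY random() LIMIT n_samples").
    ids = rng.sample(range(1, max_val + 1), n_samples)
    # Restore an unpredictable seed (mirrors "PERFORM setseed(new_seed)").
    random.seed(new_seed)
    return ids
```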
12 changes: 11 additions & 1 deletion flowdb/tests/test_utility_functions.py
@@ -93,8 +93,18 @@ def test_seeded_random_ints(cursor):
"""Seeded random integers should return some predictable outputs."""
sql = "SELECT * from random_ints(0, 5, 10)"
cursor.execute(sql)
first_vals = [x["id"] for x in cursor.fetchall()]
cursor.execute(sql)
second_vals = [x["id"] for x in cursor.fetchall()]
assert first_vals == second_vals


def test_random_ints_n_samples(cursor):
"""random_ints should return the requested number of random integers."""
sql = "SELECT * from random_ints(0, 5, 10)"
cursor.execute(sql)
vals = [x["id"] for x in cursor.fetchall()]
assert [9, 4, 8] == vals
assert len(vals) == 5
Review comment (Member): 🥇

def test_seeded_random_ints_seed_reset(cursor):
2 changes: 1 addition & 1 deletion flowmachine/flowmachine/core/cache.py
@@ -166,7 +166,7 @@ def write_cache_metadata(
try:
self_storage = pickle.dumps(query)
except Exception as e:
logger.debug("Can't pickle ({e}), attempting to cache anyway.")
logger.debug(f"Can't pickle ({e}), attempting to cache anyway.")
pass

try: