Prune TOI adapted to postgres and standalone tileserver #204
File: config.yaml.sample
```diff
@@ -188,9 +188,12 @@ toi-store:
 
 # Configuration for the tiles of interest prune/garden command.
 toi-prune:
-  # Connection and query configuration for a RedShift database containing
+  # location of tileserver logs
+  tile-traffic-log-path: ../tileserver/nohup.out
+
+  # Connection and query configuration for a database containing
   # request information for tiles.
-  redshift:
+  tile-history:
     database-uri: postgresql://user:password@localhost:5439/database
     # The number of days of history to query for.
     days: 30
```
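To make the shape of the new keys concrete, here is a minimal sketch of reading them with PyYAML. The file name config.yaml is an assumption for illustration; tilequeue's actual loader (the cfg.yml object used in the code below) wraps this differently.

```python
# Sketch only: load the toi-prune section shown above and pull out the
# new keys. Assumes the YAML lives in a file named config.yaml.
import yaml

with open('config.yaml') as fh:
    cfg = yaml.safe_load(fh)

prune_cfg = cfg.get('toi-prune', {})
log_path = prune_cfg.get('tile-traffic-log-path')  # e.g. ../tileserver/nohup.out
tile_history = prune_cfg.get('tile-history', {})
database_uri = tile_history.get('database-uri')    # any postgres-compatible URI
days = tile_history.get('days')                    # e.g. 30
```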
```diff
@@ -210,6 +213,10 @@ toi-prune:
     path: osm
     layer: all
     format: zip
+  # a reduced version of prev s3 entity
+  store:
+    layer: all
+    format: zip
 
   always-include:
     # Sets of tiles to always include in the tiles of interest.
     # For more information about options here, see the code:
```

Review comment: Can you make this comment more generic (so it's less about what was and more about what the following config is for)?

Reply: @nvkelso yeah, actually forgot to change the .sample... done.

Review comment: My thoughts about the configuration:

What do you think @iandees ?

Reply: What does moments of transition mean here? But different commands still operate on the same store, right? They can differ only like in:

```yaml
store:
  seed:
    type: directory
    name: tiles
    path: ../tiles
  prune-tiles-of-interest:
    type: s3
    name: tiles
    bucket: mapzen-tiles-dev-us-east
    reduced-redundancy: true
    date-prefix: 20170322
```

Reply: Production deployments basically. I think this has to do with the mechanics of our deployments more than anything else. The problem is that during a deployment we have some services use new store / database configurations, and others use the previous ones. It just ended up being easier to manage the configuration this way for some cases. To give you a more concrete idea, typically we have tilequeue (offline tile generation) updated first to use newer code and configuration, and do a seed run to put tiles in a new store location. Then when that is more or less finished, tileserver/tapalcatl/others will get updated to point to this location, as well as any code updates to correspond with the new tiles. We can probably clean this up more, but we're planning on re-evaluating our running infrastructure shortly anyway, so we'll probably end up considering different configuration changes in that light.
File: tilequeue/command.py
```diff
@@ -34,12 +34,16 @@
 from tilequeue.toi import save_set_to_fp
 from tilequeue.top_tiles import parse_top_tiles
 from tilequeue.utils import grouper
+from tilequeue.utils import parse_log_file
+from tilequeue.utils import mimic_prune_tiles_of_interest_sql_structure
+from tilequeue.utils import postgres_add_compat_date_utils
 from tilequeue.worker import DataFetch
 from tilequeue.worker import ProcessAndFormatData
 from tilequeue.worker import QueuePrint
 from tilequeue.worker import S3Storage
 from tilequeue.worker import SqsQueueReader
 from tilequeue.worker import SqsQueueWriter
+from tilequeue.postgresql import DBAffinityConnectionsNoLimit
 from urllib2 import urlopen
 from zope.dottedname.resolve import resolve
 import argparse
```
```diff
@@ -949,6 +953,43 @@ def tilequeue_enqueue_tiles_of_interest(cfg, peripherals):
     logger.info('%d tiles of interest processed' % n_toi)
 
 
+def tilequeue_consume_tile_traffic(cfg, peripherals):
+    logger = make_logger(cfg, 'consume_tile_traffic')
+    logger.info('Consuming tile traffic logs ...')
+
+    tile_log_records = None
+    with open(cfg.tile_traffic_log_path, 'r') as log_file:
+        tile_log_records = parse_log_file(log_file)
+
+    if not tile_log_records:
+        logger.info("Couldn't parse log file")
+        sys.exit(1)
+
+    conn_info = dict(cfg.postgresql_conn_info)
+    dbnames = conn_info.pop('dbnames')
+    sql_conn_pool = DBAffinityConnectionsNoLimit(dbnames, conn_info, False)
+    sql_conn = sql_conn_pool.get_conns(1)[0]
+    with sql_conn.cursor() as cursor:
+        mimic_prune_tiles_of_interest_sql_structure(cursor)
+        postgres_add_compat_date_utils(cursor)
+
+        # insert the log records after the latest_date
+        cursor.execute('SELECT max(date) from tile_traffic_v4')
+        max_timestamp = cursor.fetchone()[0]
+
+        n_coords_inserted = 0
+        for host, timestamp, coord_int in tile_log_records:
+            if not max_timestamp or timestamp > max_timestamp:
+                coord = coord_unmarshall_int(coord_int)
+                cursor.execute("INSERT into tile_traffic_v4 (date, z, x, y, tilesize, service, host) VALUES ('%s', %d, %d, %d, %d, '%s', '%s')"
+                               % (timestamp, coord.zoom, coord.column, coord.row, 512, 'vector-tiles', host))
+                n_coords_inserted += 1
+
+        logger.info('Inserted %d records' % n_coords_inserted)
+
+    sql_conn_pool.put_conns([sql_conn])
+
+
 def emit_toi_stats(toi_set, peripherals):
     """
     Calculates new TOI stats and emits them via statsd.
```
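The INSERT above builds its SQL by interpolating values with Python's % operator. For comparison, here is a sketch (not part of this PR) of the same insert using psycopg2's parameter binding, which handles quoting and escaping itself:

```python
# Sketch only: equivalent insert with psycopg2 parameter binding.
# psycopg2 uses %s placeholders for all value types.
insert_sql = ('INSERT INTO tile_traffic_v4 '
              '(date, z, x, y, tilesize, service, host) '
              'VALUES (%s, %s, %s, %s, %s, %s, %s)')
cursor.execute(insert_sql, (timestamp, coord.zoom, coord.column,
                            coord.row, 512, 'vector-tiles', host))
```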
```diff
@@ -968,7 +1009,6 @@ def emit_toi_stats(toi_set, peripherals):
         count
     )
 
-
 def tilequeue_prune_tiles_of_interest(cfg, peripherals):
     logger = make_logger(cfg, 'prune_tiles_of_interest')
     logger.info('Pruning tiles of interest ...')
```
```diff
@@ -982,28 +1022,36 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
 
     prune_cfg = cfg.yml.get('toi-prune', {})
 
-    redshift_cfg = prune_cfg.get('redshift', {})
-    redshift_uri = redshift_cfg.get('database-uri')
-    assert redshift_uri, ("A redshift connection URI must "
+    tile_history_cfg = prune_cfg.get('tile-history', {})
+    db_conn_info = tile_history_cfg.get('database-uri')
+    assert db_conn_info, ("A postgres-compatible connection URI must "
                           "be present in the config yaml")
 
-    redshift_days_to_query = redshift_cfg.get('days')
+    redshift_days_to_query = tile_history_cfg.get('days')
     assert redshift_days_to_query, ("Number of days to query "
                                     "redshift is not specified")
 
-    redshift_zoom_cutoff = int(redshift_cfg.get('max-zoom', '16'))
+    redshift_zoom_cutoff = int(tile_history_cfg.get('max-zoom', '16'))
 
-    s3_parts = prune_cfg.get('s3')
-    assert s3_parts, ("The name of an S3 bucket containing tiles "
-                      "to delete must be specified")
+    # flag indicating that s3 entry in toi-prune is used for s3 store
+    legacy_fallback = 's3' in prune_cfg
+    store_parts = prune_cfg.get('s3') or prune_cfg.get('store')
+    assert store_parts, ("The configuration of a store containing tiles "
+                         "to delete must be specified under toi-prune:store or toi-prune:s3")
+    # explicitly override the store configuration with values provided in toi-prune:s3
+    if legacy_fallback:
+        cfg.store_type = 's3'
+        cfg.s3_bucket = store_parts['bucket']
+        cfg.s3_date_prefix = store_parts['date-prefix']
+        cfg.s3_path = store_parts['path']
 
     redshift_results = defaultdict(int)
-    with psycopg2.connect(redshift_uri) as conn:
+    with psycopg2.connect(db_conn_info) as conn:
         with conn.cursor() as cur:
             cur.execute("""
                 select x, y, z, tilesize, count(*)
                 from tile_traffic_v4
-                where (date >= dateadd(day, -{days}, getdate()))
+                where (date >= dateadd('day', -{days}, getdate()))
                 and (z between 0 and {max_zoom})
                 and (x between 0 and pow(2,z)-1)
                 and (y between 0 and pow(2,z)-1)
```

Review comment: This will definitely work, but related to the configuration discussion, I'd rather we normalized that where possible to avoid needing this kind of code.

Reply: sure, waiting for what approach is decided regarding configuration.

Reply: @rmarianski Do these changes work for you now?

Reply: Yea, LGTM 👍

```diff
@@ -1012,7 +1060,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
                 order by z, x, y, tilesize
                 """.format(
                     days=redshift_days_to_query,
-                    max_zoom=redshift_zoom_cutoff,
+                    max_zoom=redshift_zoom_cutoff
                 ))
             for (x, y, z, tile_size, count) in cur:
                 coord = create_coord(x, y, z)
```
```diff
@@ -1097,25 +1145,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
                     len(toi_to_remove))
         peripherals.stats.gauge('gardener.removed', len(toi_to_remove))
 
-    def delete_from_s3(s3_parts, coord_ints):
-        # Remove from S3
-        s3 = boto.connect_s3(cfg.aws_access_key_id, cfg.aws_secret_access_key)
-        buk = s3.get_bucket(s3_parts['bucket'], validate=False)
-        keys = [
-            s3_tile_key(
-                s3_parts['date-prefix'],
-                s3_parts['path'],
-                s3_parts['layer'],
-                coord_unmarshall_int(coord_int),
-                s3_parts['format']
-            )
-            for coord_int in coord_ints
-        ]
-        del_result = buk.delete_keys(keys)
-        removed = len(del_result.deleted)
-
-        logger.info('Removed %s tiles from S3', removed)
-
+    store = make_store(cfg.store_type, cfg.s3_bucket, cfg)
     if not toi_to_remove:
         logger.info('Skipping TOI remove step because there are '
                     'no tiles to remove')
```

```diff
@@ -1124,7 +1154,9 @@ def delete_from_s3(s3_parts, coord_ints):
                     len(toi_to_remove))
 
         for coord_ints in grouper(toi_to_remove, 1000):
-            delete_from_s3(s3_parts, coord_ints)
+            removed = store.delete_tiles(map(coord_unmarshall_int, coord_ints),
+                lookup_format_by_extension(store_parts['format']), store_parts['layer'])
+            logger.info('Removed %s tiles from S3', removed)
 
         logger.info('Removing %s tiles from TOI and S3 ... done',
                     len(toi_to_remove))
```
```diff
@@ -1564,6 +1596,8 @@ def tilequeue_main(argv_args=None):
             tilequeue_process_wof_neighbourhoods)),
         ('wof-load-initial-neighbourhoods', create_command_parser(
             tilequeue_initial_load_wof_neighbourhoods)),
+        ('consume-tile-traffic', create_command_parser(
+            tilequeue_consume_tile_traffic))
     )
     for parser_name, parser_func in parser_config:
         subparser = subparsers.add_parser(parser_name)
```
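With the parser registered, the new command should be runnable like the other subcommands, presumably as `tilequeue consume-tile-traffic --config config.yaml` (the exact flags depend on what create_command_parser sets up).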
File: tilequeue/utils.py
```diff
@@ -1,7 +1,10 @@
 import sys
 import traceback
+import re
 from itertools import islice
 
+from datetime import datetime
+from tilequeue.tile import coord_marshall_int
+from tilequeue.tile import create_coord
 
 def format_stacktrace_one_line(exc_info=None):
     # exc_info is expected to be an exception tuple from sys.exc_info()
```
```diff
@@ -23,3 +26,51 @@ def grouper(iterable, n):
         if not chunk:
             return
         yield chunk
+
+
+def parse_log_file(log_file):
+    ip_pattern = '(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
+    # didn't match against explicit date pattern, in case it changes
+    date_pattern = '\[([\d\w\s\/:]+)\]'
+    tile_id_pattern = '\/([\w]+)\/([\d]+)\/([\d]+)\/([\d]+)\.([\d\w]*)'
+
+    log_pattern = '%s - - %s "([\w]+) %s.*' % (ip_pattern, date_pattern, tile_id_pattern)
+
+    tile_log_records = []
+    for log_string in log_file:
+        match = re.search(log_pattern, log_string)
+        if match and len(match.groups()) == 8:
+            tile_log_records.append((match.group(1),
+                datetime.strptime(match.group(2), '%d/%B/%Y %H:%M:%S'),
+                coord_marshall_int(create_coord(match.group(6), match.group(7), match.group(5)))))
+
+    return tile_log_records
```
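As a quick illustration of what parse_log_file extracts, here is a hypothetical access-log line in the format the regex expects (the line itself is invented for this example):

```python
# Sketch only: parse_log_file iterates over lines, so any iterable of
# strings works, not just an open file handle.
sample_lines = [
    '127.0.0.1 - - [06/May/2017 15:12:20] '
    '"GET /all/10/163/395.json HTTP/1.1" 200 -',
]
records = parse_log_file(sample_lines)
host, timestamp, coord_int = records[0]
# host == '127.0.0.1'
# timestamp == datetime(2017, 5, 6, 15, 12, 20)
# coord_int encodes tile z=10, x=163, y=395 via coord_marshall_int
```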
```diff
+def mimic_prune_tiles_of_interest_sql_structure(cursor):
+    cursor.execute('''CREATE TABLE IF NOT EXISTS tile_traffic_v4 (
+        id bigserial primary key,
+        date timestamp(6) not null,
+        z integer not null,
+        x integer not null,
+        y integer not null,
+        tilesize integer not null,
+        service varchar(32),
+        host inet not null
+        )''')
+
+
+def postgres_add_compat_date_utils(cursor):
+    cursor.execute('''
+        CREATE OR REPLACE FUNCTION DATEADD(interval_kind VARCHAR(20), interval_offset INTEGER, dt DATE)
+            RETURNS TIMESTAMP AS $$
+        BEGIN
+            RETURN (SELECT dt + (interval_offset || ' ' || interval_kind)::INTERVAL);
+        END;
+        $$ language plpgsql
+    ''')
+    cursor.execute('''
+        CREATE OR REPLACE FUNCTION GETDATE()
+            RETURNS DATE AS $$
+        BEGIN
+            RETURN (SELECT current_date);
+        END;
+        $$ language plpgsql
+    ''')
```

Review comment: Why not use the built-in postgres-compatible date functions?

Reply: I didn't want to alter your redshift query or have two distinct queries in prune_tiles_of_interest. Since standard Postgres doesn't have dateadd() and getdate(), I just added their implementations so there is no need to alter the query or differentiate between Postgres and redshift.

Review comment: I'd prefer to use date functions that are shared between the two so that wrapping functions like this aren't required.

Reply: Alright, I haven't used redshift before, though; can you suggest a preferable way?

Review comment: I thought we had found something that worked on both platforms in a previous discussion, but looking back I see that's not the case. It looks like both RedShift and PostgreSQL support interval literals:

On PostgreSQL:

```
iandees=# select current_timestamp - interval '30 days' as dateplus;
           dateplus
-------------------------------
 2017-05-06 11:12:11.184408-04
(1 row)
```

On RedShift:

```
analytics=# select current_timestamp - interval '30 days' as dateplus;
           dateplus
-------------------------------
 2017-05-06 15:12:20.733685+00
(1 row)
```

Can you change to using this interval literal notation in both cases and remove the code to add a wrapping function?

Reply: Amazing, sure. On it.
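For reference, a sketch of what the prune query's date filter could look like once it is switched to the interval literal suggested above, which would make both compat functions unnecessary. This follows the reviewer's proposal and is not code from this diff:

```python
# Sketch only: the history query using an interval literal, which both
# PostgreSQL and RedShift accept, instead of dateadd()/getdate().
query = """
    select x, y, z, tilesize, count(*)
    from tile_traffic_v4
    where (date >= current_timestamp - interval '{days} days')
    and (z between 0 and {max_zoom})
    and (x between 0 and pow(2,z)-1)
    and (y between 0 and pow(2,z)-1)
    group by z, x, y, tilesize
    order by z, x, y, tilesize
""".format(days=redshift_days_to_query, max_zoom=redshift_zoom_cutoff)
cur.execute(query)
```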
Review comment: I'm not sure what this means.

Reply: The primary store configuration specifies all entries from toi-prune:s3 except layer and format. The reduced version is the portion of the store-related configuration that specifies only the remaining layer and format.
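To make that concrete, here is a hypothetical illustration (bucket name invented) of the two config shapes this code accepts and how they resolve, mirroring the legacy_fallback logic in tilequeue_prune_tiles_of_interest above:

```python
# Sketch only: the legacy toi-prune:s3 entry carries the full store
# location, while the new toi-prune:store entry repeats only layer and
# format and leaves the location to the global store configuration.
legacy_prune_cfg = {
    's3': {'bucket': 'tiles-example-bucket', 'date-prefix': '20170322',
           'path': 'osm', 'layer': 'all', 'format': 'zip'},
}
new_prune_cfg = {
    'store': {'layer': 'all', 'format': 'zip'},
}

def resolve_store_parts(prune_cfg):
    # prefer the legacy s3 entry when present, as the PR code does
    legacy_fallback = 's3' in prune_cfg
    store_parts = prune_cfg.get('s3') or prune_cfg.get('store')
    return legacy_fallback, store_parts

assert resolve_store_parts(legacy_prune_cfg)[0] is True
assert resolve_store_parts(new_prune_cfg)[0] is False
```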