Prune TOI adapted to postgres and standalone tileserver #204

Merged 15 commits on Jun 5, 2017
Changes from 11 commits
11 changes: 9 additions & 2 deletions config.yaml.sample
@@ -188,9 +188,12 @@ toi-store:

# Configuration for the tiles of interest prune/garden command.
toi-prune:
# Connection and query configuration for a RedShift database containing
# location of tileserver logs
tile-traffic-log-path: ../tileserver/nohup.out

# Connection and query configuration for a database containing
# request information for tiles.
redshift:
tile-history:
database-uri: postgresql://user:password@localhost:5439/database
# The number of days of history to query for.
days: 30
@@ -210,6 +213,10 @@ toi-prune:
path: osm
layer: all
format: zip
# a reduced version of prev s3 entity
Member

I'm not sure what this means.

Contributor Author

The primary store configuration specifies all the entries from toi-prune:s3 except layer and format. The reduced version is the portion of the store-related configuration that specifies only the remaining layer and format.
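
A hypothetical sketch of the relationship (key names and values are illustrative, based on the sample config rather than any real deployment):

# the primary store configuration carries the connection-level entries
store:
  type: s3
  name: tiles
  bucket: mapzen-tiles-dev-us-east
  date-prefix: 20170322
  path: osm

# the reduced version under toi-prune carries only what's left over
toi-prune:
  store:
    layer: all
    format: zip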

Member

Can you make this comment more generic (so it's less about what was there before and more about what the following config is for)?

Member

@ambientlight ☝️

Contributor Author (@ambientlight) Jun 5, 2017

@nvkelso yeah, I actually forgot to change the .sample... done.

store:
layer: all
format: zip
Member

My thoughts about the configuration:

  • We can probably just use store throughout and grow any API needed to support operations across all backends.
  • Regarding the duplication: on the one hand, it's nice to just have it in one place. But in practice, it's very useful to have the configuration for the different operations separate, which allows us to handle production deploys and moments of transition more readily. YAML allows us to specify pointers, so maybe we can do something clever here to refer to other sections?
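
For example, a hypothetical sketch using YAML anchors, aliases, and merge keys (the key names and values are illustrative only):

common-store: &common-store
  type: s3
  name: tiles
  bucket: mapzen-tiles-dev-us-east
  reduced-redundancy: true
  date-prefix: 20170322

store:
  <<: *common-store

toi-prune:
  store:
    <<: *common-store
    layer: all
    format: zip

This relies on the YAML loader supporting anchors and merge keys (PyYAML does), so it would need a quick check against how tilequeue loads its config.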

What do you think @iandees ?

Contributor Author (@ambientlight) May 9, 2017

it's very useful to have the configuration for the different operations separate, which allows us to handle production deploys and moments of transition more readily

What does moments of transition mean here?

But different commands still operate on the same store, right? They would only differ across maturities like .alpha, .beta, .rc, and .release, where different stores might be used? In that case, why not have a separate configuration like config.alpha.yaml for each maturity? Or do you mean having something like:

store:
  seed:
    type: directory
    name: tiles
    path: ../tiles
  prune-tiles-of-interest:
    type: s3
    name: tiles
    bucket: mapzen-tiles-dev-us-east
    reduced-redundancy: true
    date-prefix: 20170322

Member

What does moments of transition mean here?

Production deployments basically.

But different commands still operate on the same store, right? They would only differ across maturities like .alpha, .beta, .rc, and .release, where different stores might be used? In that case, why not have a separate configuration like config.alpha.yaml for each maturity? Or do you mean having something like:

I think this has to do with the mechanics of our deployments more than anything else. The problem is that during a deployment we have some services use new store / database configurations, and others use the previous ones. It just ended up being easier to manage the configuration this way for some cases.

To give you a more concrete idea, typically we have tilequeue (offline tile generation) updated first to use newer code and configuration, and do a seed run to put tiles in a new store location. Then, when that is more or less finished, tileserver/tapalcatl/others get updated to point to this location, along with any code updates that correspond with the new tiles.

We can probably clean this up more, but we're planning on re-evaluating our running infrastructure shortly anyway, so we'll probably end up considering different configuration changes in that light.

always-include:
# Sets of tiles to always include in the tiles of interest.
# For more information about options here, see the code:
8 changes: 7 additions & 1 deletion logging.conf.sample
@@ -1,5 +1,5 @@
[loggers]
keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,dump_tiles_of_interest_from_redis,load_tiles_of_interest,wof_process_neighbourhoods,query
keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,dump_tiles_of_interest_from_redis,load_tiles_of_interest,wof_process_neighbourhoods,query,consume_tile_traffic

[handlers]
keys=consoleHandler
@@ -77,6 +77,12 @@ handlers=consoleHandler
qualName=query
propagate=0

[logger_consume_tile_traffic]
level=INFO
handlers=consoleHandler
qualName=consume_tile_traffic
propagate=0

[handler_consoleHandler]
class=StreamHandler
formatter=simpleFormatter
100 changes: 67 additions & 33 deletions tilequeue/command.py
@@ -34,12 +34,16 @@
from tilequeue.toi import save_set_to_fp
from tilequeue.top_tiles import parse_top_tiles
from tilequeue.utils import grouper
from tilequeue.utils import parse_log_file
from tilequeue.utils import mimic_prune_tiles_of_interest_sql_structure
from tilequeue.utils import postgres_add_compat_date_utils
from tilequeue.worker import DataFetch
from tilequeue.worker import ProcessAndFormatData
from tilequeue.worker import QueuePrint
from tilequeue.worker import S3Storage
from tilequeue.worker import SqsQueueReader
from tilequeue.worker import SqsQueueWriter
from tilequeue.postgresql import DBAffinityConnectionsNoLimit
from urllib2 import urlopen
from zope.dottedname.resolve import resolve
import argparse
@@ -949,6 +953,43 @@ def tilequeue_enqueue_tiles_of_interest(cfg, peripherals):
logger.info('%d tiles of interest processed' % n_toi)


def tilequeue_consume_tile_traffic(cfg, peripherals):
logger = make_logger(cfg, 'consume_tile_traffic')
logger.info('Consuming tile traffic logs ...')

tile_log_records = None
with open(cfg.tile_traffic_log_path, 'r') as log_file:
tile_log_records = parse_log_file(log_file)

if not tile_log_records:
logger.info("Couldn't parse log file")
sys.exit(1)

conn_info = dict(cfg.postgresql_conn_info)
dbnames = conn_info.pop('dbnames')
sql_conn_pool = DBAffinityConnectionsNoLimit(dbnames, conn_info, False)
sql_conn = sql_conn_pool.get_conns(1)[0]
with sql_conn.cursor() as cursor:
mimic_prune_tiles_of_interest_sql_structure(cursor)
postgres_add_compat_date_utils(cursor)

# insert the log records after the latest_date
cursor.execute('SELECT max(date) from tile_traffic_v4')
max_timestamp = cursor.fetchone()[0]

n_coords_inserted = 0
for host, timestamp, coord_int in tile_log_records:
if not max_timestamp or timestamp > max_timestamp:
coord = coord_unmarshall_int(coord_int)
cursor.execute("INSERT into tile_traffic_v4 (date, z, x, y, tilesize, service, host) VALUES ('%s', %d, %d, %d, %d, '%s', '%s')"
% (timestamp, coord.zoom, coord.column, coord.row, 512, 'vector-tiles', host))
n_coords_inserted += 1

logger.info('Inserted %d records' % n_coords_inserted)

sql_conn_pool.put_conns([sql_conn])


def emit_toi_stats(toi_set, peripherals):
"""
Calculates new TOI stats and emits them via statsd.
@@ -968,7 +1009,6 @@ def emit_toi_stats(toi_set, peripherals):
count
)


def tilequeue_prune_tiles_of_interest(cfg, peripherals):
logger = make_logger(cfg, 'prune_tiles_of_interest')
logger.info('Pruning tiles of interest ...')
@@ -982,28 +1022,36 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):

prune_cfg = cfg.yml.get('toi-prune', {})

redshift_cfg = prune_cfg.get('redshift', {})
redshift_uri = redshift_cfg.get('database-uri')
assert redshift_uri, ("A redshift connection URI must "
tile_history_cfg = prune_cfg.get('tile-history', {})
db_conn_info = tile_history_cfg.get('database-uri')
assert db_conn_info, ("A postgres-compatible connection URI must "
"be present in the config yaml")

redshift_days_to_query = redshift_cfg.get('days')
redshift_days_to_query = tile_history_cfg.get('days')
assert redshift_days_to_query, ("Number of days to query "
"redshift is not specified")

redshift_zoom_cutoff = int(redshift_cfg.get('max-zoom', '16'))
redshift_zoom_cutoff = int(tile_history_cfg.get('max-zoom', '16'))

s3_parts = prune_cfg.get('s3')
assert s3_parts, ("The name of an S3 bucket containing tiles "
"to delete must be specified")
# flag indicating that s3 entry in toi-prune is used for s3 store
legacy_fallback = 's3' in prune_cfg
store_parts = prune_cfg.get('s3') or prune_cfg.get('store')
assert store_parts, ("The configuration of a store containing tiles "
"to delete must be specified under toi-prune:store or toi-prune:s3")
# explicitly override the store configuration with values provided in toi-prune:s3
if legacy_fallback:
cfg.store_type = 's3'
cfg.s3_bucket = store_parts['bucket']
cfg.s3_date_prefix = store_parts['date-prefix']
cfg.s3_path = store_parts['path']
Member

This will definitely work, but related to the configuration discussion, I'd rather we normalized that where possible to avoid needing this kind of code.

Contributor Author

Sure, I'm waiting to see which approach is decided on regarding the configuration.
This was done just to adapt the existing behavior to the store instances generated by make_store.

Member

@rmarianski Do these changes work for you now?

Member

Yea, LGTM 👍


redshift_results = defaultdict(int)
with psycopg2.connect(redshift_uri) as conn:
with psycopg2.connect(db_conn_info) as conn:
with conn.cursor() as cur:
cur.execute("""
select x, y, z, tilesize, count(*)
from tile_traffic_v4
where (date >= dateadd(day, -{days}, getdate()))
where (date >= dateadd('day', -{days}, getdate()))
and (z between 0 and {max_zoom})
and (x between 0 and pow(2,z)-1)
and (y between 0 and pow(2,z)-1)
@@ -1012,7 +1060,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
order by z, x, y, tilesize
""".format(
days=redshift_days_to_query,
max_zoom=redshift_zoom_cutoff,
max_zoom=redshift_zoom_cutoff
))
for (x, y, z, tile_size, count) in cur:
coord = create_coord(x, y, z)
@@ -1097,25 +1145,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
len(toi_to_remove))
peripherals.stats.gauge('gardener.removed', len(toi_to_remove))

def delete_from_s3(s3_parts, coord_ints):
# Remove from S3
s3 = boto.connect_s3(cfg.aws_access_key_id, cfg.aws_secret_access_key)
buk = s3.get_bucket(s3_parts['bucket'], validate=False)
keys = [
s3_tile_key(
s3_parts['date-prefix'],
s3_parts['path'],
s3_parts['layer'],
coord_unmarshall_int(coord_int),
s3_parts['format']
)
for coord_int in coord_ints
]
del_result = buk.delete_keys(keys)
removed = len(del_result.deleted)

logger.info('Removed %s tiles from S3', removed)

store = make_store(cfg.store_type, cfg.s3_bucket, cfg)
if not toi_to_remove:
logger.info('Skipping TOI remove step because there are '
'no tiles to remove')
@@ -1124,7 +1154,9 @@ def delete_from_s3(s3_parts, coord_ints):
len(toi_to_remove))

for coord_ints in grouper(toi_to_remove, 1000):
delete_from_s3(s3_parts, coord_ints)
removed = store.delete_tiles(map(coord_unmarshall_int, coord_ints),
lookup_format_by_extension(store_parts['format']), store_parts['layer'])
logger.info('Removed %s tiles from S3', removed)

logger.info('Removing %s tiles from TOI and S3 ... done',
len(toi_to_remove))
@@ -1564,6 +1596,8 @@ def tilequeue_main(argv_args=None):
tilequeue_process_wof_neighbourhoods)),
('wof-load-initial-neighbourhoods', create_command_parser(
tilequeue_initial_load_wof_neighbourhoods)),
('consume-tile-traffic', create_command_parser(
tilequeue_consume_tile_traffic))
)
for parser_name, parser_func in parser_config:
subparser = subparsers.add_parser(parser_name)
2 changes: 2 additions & 0 deletions tilequeue/config.py
@@ -128,6 +128,8 @@ def __init__(self, yml):
self.proc_queue_buffer_size = self._cfg('queue_buffer_size proc')
self.s3_queue_buffer_size = self._cfg('queue_buffer_size s3')

self.tile_traffic_log_path = self._cfg('toi-prune tile-traffic-log-path')

def _cfg(self, yamlkeys_str):
yamlkeys = yamlkeys_str.split()
yamlval = self.yml
2 changes: 2 additions & 0 deletions tilequeue/format/__init__.py
@@ -108,6 +108,7 @@ def format_vtm(fp, feature_layers, zoom, bounds_merc, bounds_lnglat):
vtm=vtm_format,
mvt=mvt_format,
mvtb=mvtb_format,
zip=zip_format
)

name_to_format = {
@@ -116,6 +117,7 @@ def format_vtm(fp, feature_layers, zoom, bounds_merc, bounds_lnglat):
'TopoJSON': topojson_format,
'MVT': mvt_format,
'MVT Buffered': mvtb_format,
'ZIP Metatile': zip_format
}


5 changes: 3 additions & 2 deletions tilequeue/postgresql.py
@@ -11,15 +11,16 @@ class DBAffinityConnectionsNoLimit(object):
# the connections. It's the caller's responsibility to call us
# back with the connection objects so that we can close them.

def __init__(self, dbnames, conn_info):
def __init__(self, dbnames, conn_info, readonly=True):
self.dbnames = cycle(dbnames)
self.conn_info = conn_info
self.conn_mapping = {}
self.lock = threading.Lock()
self.readonly = readonly

def _make_conn(self, conn_info):
conn = psycopg2.connect(**conn_info)
conn.set_session(readonly=True, autocommit=True)
conn.set_session(readonly=self.readonly, autocommit=True)
register_hstore(conn)
register_json(conn, loads=ujson.loads)
return conn
18 changes: 18 additions & 0 deletions tilequeue/store.py
@@ -67,6 +67,14 @@ def read_tile(self, coord, format, layer):
tile_data = key.get_contents_as_string()
return tile_data

def delete_tiles(self, coords, format, layer):
key_names = [
s3_tile_key(self.date_prefix, self.path, layer, coord, format.extension)
for coord in coords
]
del_result = self.bucket.delete_keys(key_names)
return len(del_result.deleted)


def make_dir_path(base_path, coord, layer):
path = os.path.join(
@@ -200,6 +208,16 @@ def read_tile(self, coord, format, layer):
except IOError:
return None

def delete_tiles(self, coords, format, layer):
delete_count = 0
for coord in coords:
file_path = make_file_path(self.base_path, coord, layer, format.extension)
if os.path.isfile(file_path):
os.remove(file_path)
delete_count += 1

return delete_count


def make_tile_file_store(base_path=None):
if base_path is None:
53 changes: 52 additions & 1 deletion tilequeue/utils.py
@@ -1,7 +1,10 @@
import sys
import traceback
import re
from itertools import islice

from datetime import datetime
from tilequeue.tile import coord_marshall_int
from tilequeue.tile import create_coord

def format_stacktrace_one_line(exc_info=None):
# exc_info is expected to be an exception tuple from sys.exc_info()
@@ -23,3 +26,51 @@ def grouper(iterable, n):
if not chunk:
return
yield chunk

def parse_log_file(log_file):
ip_pattern = '(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
# didn't match against an explicit date pattern, in case it changes
date_pattern = '\[([\d\w\s\/:]+)\]'
tile_id_pattern = '\/([\w]+)\/([\d]+)\/([\d]+)\/([\d]+)\.([\d\w]*)'

log_pattern = '%s - - %s "([\w]+) %s.*' % (ip_pattern, date_pattern, tile_id_pattern)

tile_log_records = []
for log_string in log_file:
match = re.search(log_pattern, log_string)
if match and len(match.groups()) == 8:
tile_log_records.append((match.group(1),
datetime.strptime(match.group(2), '%d/%B/%Y %H:%M:%S'),
coord_marshall_int(create_coord(match.group(6), match.group(7), match.group(5)))))

return tile_log_records

def mimic_prune_tiles_of_interest_sql_structure(cursor):
cursor.execute('''CREATE TABLE IF NOT EXISTS tile_traffic_v4 (
id bigserial primary key,
date timestamp(6) not null,
z integer not null,
x integer not null,
y integer not null,
tilesize integer not null,
service varchar(32),
host inet not null
)''')

def postgres_add_compat_date_utils(cursor):
Member

Why not use the built-in postgres-compatible date functions?

Contributor Author

I didn't want to alter your Redshift query or have two distinct queries in prune_tiles_of_interest. Since standard Postgres doesn't have dateadd() and getdate(), I just added implementations of them, so there is no need to alter the query or differentiate between Postgres and Redshift.

Member

I'd prefer to use date functions that are shared between the two so that wrapping functions like this aren't required.

Contributor Author

Alright, I haven't used Redshift before, though. Can you suggest a preferable way?

Member

I thought we had found something that worked on both platforms in a previous discussion, but looking back I see that's not the case.

It looks like both RedShift and PostgreSQL support interval literals:

On PostgreSQL

iandees=# select current_timestamp - interval '30 days' as dateplus;
           dateplus
-------------------------------
 2017-05-06 11:12:11.184408-04
(1 row)

On RedShift

analytics=# select current_timestamp - interval '30 days' as dateplus;
           dateplus
-------------------------------
 2017-05-06 15:12:20.733685+00
(1 row)

Can you change both cases to use this interval literal notation and remove the code that adds the wrapping functions?

Contributor Author

Amazing, sure. On it.

cursor.execute('''
CREATE OR REPLACE FUNCTION DATEADD(interval_kind VARCHAR(20), interval_offset INTEGER, dt DATE)
RETURNS TIMESTAMP AS $$
BEGIN
RETURN (SELECT dt + (interval_offset || ' ' || interval_kind)::INTERVAL);
END;
$$ language plpgsql
''')
cursor.execute('''
CREATE OR REPLACE FUNCTION GETDATE()
RETURNS DATE AS $$
BEGIN
RETURN (SELECT current_date);
END;
$$ language plpgsql
''')