From ccb565b3b0615c0294239499219508ae1be6f32b Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Mon, 8 May 2017 13:49:09 +0800
Subject: [PATCH 01/13] consume-tile-traffic base implementation

---
 tilequeue/command.py    | 40 +++++++++++++++++++++++++++++++
 tilequeue/config.py     |  2 ++
 tilequeue/postgresql.py |  5 ++--
 tilequeue/utils.py      | 53 ++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 97 insertions(+), 3 deletions(-)

diff --git a/tilequeue/command.py b/tilequeue/command.py
index c594c039..261e82d5 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -34,12 +34,16 @@ from tilequeue.toi import save_set_to_fp
 from tilequeue.top_tiles import parse_top_tiles
 from tilequeue.utils import grouper
+from tilequeue.utils import parse_log_file
+from tilequeue.utils import mimic_prune_tiles_of_interest_sql_structure
+from tilequeue.utils import postgres_add_compat_date_utils
 from tilequeue.worker import DataFetch
 from tilequeue.worker import ProcessAndFormatData
 from tilequeue.worker import QueuePrint
 from tilequeue.worker import S3Storage
 from tilequeue.worker import SqsQueueReader
 from tilequeue.worker import SqsQueueWriter
+from tilequeue.postgresql import DBAffinityConnectionsNoLimit
 from urllib2 import urlopen
 from zope.dottedname.resolve import resolve
 import argparse
@@ -948,6 +952,40 @@ def tilequeue_enqueue_tiles_of_interest(cfg, peripherals):
     logger.info('%d tiles of interest processed' % n_toi)
 
 
+def tilequeue_consume_tile_traffic(cfg, peripherals):
+    logger = make_logger(cfg, 'consume_tile_traffic')
+    logger.info('Consuming tile traffic logs ...')
+    logger.info(cfg.tile_traffic_log_path)
+
+    iped_dated_coords = None
+    with open(cfg.tile_traffic_log_path, 'r') as log_file:
+        iped_dated_coords = parse_log_file(log_file)
+
+    if not iped_dated_coords:
+        logger.info("Couldn't parse log file")
+        sys.exit(1)
+
+    conn_info = dict(cfg.postgresql_conn_info)
+    dbnames = conn_info.pop('dbnames')
+    sql_conn_pool = DBAffinityConnectionsNoLimit(dbnames, conn_info, False)
+    sql_conn = sql_conn_pool.get_conns(1)[0]
+    with sql_conn.cursor() as cursor:
+        mimic_prune_tiles_of_interest_sql_structure(cursor)
+        postgres_add_compat_date_utils(cursor)
+
+        # insert the log records after the latest_date
+        cursor.execute('SELECT max(date) from tile_traffic_v4')
+        max_timestamp = cursor.fetchone()[0]
+        iped_dated_coords_to_insert = filter(lambda iped_dated_coord: iped_dated_coord[1] > max_timestamp, iped_dated_coords) if max_timestamp else iped_dated_coords
+        for (host, timestamp, marchalled_coord) in iped_dated_coords_to_insert:
+            coord = coord_unmarshall_int(marchalled_coord)
+            cursor.execute("INSERT into tile_traffic_v4 (date, z, x, y, tilesize, service, host) VALUES ('%s', %d, %d, %d, %d, '%s', '%s')"
+                           % (timestamp, coord.zoom, coord.column, coord.row, 512, 'vector-tiles', host))
+
+        logger.info('Inserted %d records' % len(iped_dated_coords_to_insert))
+
+    sql_conn_pool.put_conns([sql_conn])
+
+
 def tilequeue_prune_tiles_of_interest(cfg, peripherals):
     logger = make_logger(cfg, 'prune_tiles_of_interest')
     logger.info('Pruning tiles of interest ...')
@@ -1541,6 +1579,8 @@ def tilequeue_main(argv_args=None):
             tilequeue_process_wof_neighbourhoods)),
         ('wof-load-initial-neighbourhoods', create_command_parser(
             tilequeue_initial_load_wof_neighbourhoods)),
+        ('consume-tile-traffic', create_command_parser(
+            tilequeue_consume_tile_traffic))
     )
     for parser_name, parser_func in parser_config:
         subparser = subparsers.add_parser(parser_name)
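
Note on the INSERT above: it builds the SQL by %-formatting values straight into the statement. A hedged alternative sketch using psycopg2's own parameter binding (same table and values as in the patch; `cursor`, `timestamp`, `coord` and `host` are the names already in scope there):

```python
# Sketch only: let psycopg2 bind the values instead of %-formatting them
# into the SQL string, which also handles quoting/escaping correctly.
cursor.execute(
    "INSERT INTO tile_traffic_v4 (date, z, x, y, tilesize, service, host) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s)",
    (timestamp, coord.zoom, coord.column, coord.row, 512, 'vector-tiles', host))
```
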
diff --git a/tilequeue/config.py b/tilequeue/config.py
index 601e324d..2b7ad055 100644
--- a/tilequeue/config.py
+++ b/tilequeue/config.py
@@ -128,6 +128,8 @@ def __init__(self, yml):
         self.proc_queue_buffer_size = self._cfg('queue_buffer_size proc')
         self.s3_queue_buffer_size = self._cfg('queue_buffer_size s3')
 
+        self.tile_traffic_log_path = self._cfg('toi-prune tile-traffic-log-path')
+
     def _cfg(self, yamlkeys_str):
         yamlkeys = yamlkeys_str.split()
         yamlval = self.yml
diff --git a/tilequeue/postgresql.py b/tilequeue/postgresql.py
index 9ccdaf4b..9cb73541 100644
--- a/tilequeue/postgresql.py
+++ b/tilequeue/postgresql.py
@@ -11,15 +11,16 @@ class DBAffinityConnectionsNoLimit(object):
     # the connections. It's the caller's responsibility to call us
     # back with the connection objects so that we can close them.
 
-    def __init__(self, dbnames, conn_info):
+    def __init__(self, dbnames, conn_info, readonly=True):
         self.dbnames = cycle(dbnames)
         self.conn_info = conn_info
         self.conn_mapping = {}
         self.lock = threading.Lock()
+        self.readonly = readonly
 
     def _make_conn(self, conn_info):
         conn = psycopg2.connect(**conn_info)
-        conn.set_session(readonly=True, autocommit=True)
+        conn.set_session(readonly=self.readonly, autocommit=True)
         register_hstore(conn)
         register_json(conn, loads=ujson.loads)
         return conn
diff --git a/tilequeue/utils.py b/tilequeue/utils.py
index 7fceacb2..a8a629ec 100644
--- a/tilequeue/utils.py
+++ b/tilequeue/utils.py
@@ -1,7 +1,10 @@
 import sys
 import traceback
+import re
 from itertools import islice
-
+from datetime import datetime
+from tilequeue.tile import coord_marshall_int
+from tilequeue.tile import create_coord
 
 def format_stacktrace_one_line(exc_info=None):
     # exc_info is expected to be an exception tuple from sys.exc_info()
@@ -23,3 +26,51 @@ def grouper(iterable, n):
         if not chunk:
             return
         yield chunk
+
+def parse_log_file(log_file):
+    ip_pattern = '(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
+    # didn't match against an explicit date pattern, in case it changes
+    date_pattern = '\[([\d\w\s\/:]+)\]'
+    tile_id_pattern = '\/([\w]+)\/([\d]+)\/([\d]+)\/([\d]+)\.([\d\w]*)'
+
+    log_pattern = '%s - - %s "([\w]+) %s.*' % (ip_pattern, date_pattern, tile_id_pattern)
+
+    matches = filter(
+        lambda match: match and len(match.groups()) == 8,
+        map(lambda log_string: re.search(log_pattern, log_string), log_file)
+    )
+
+    iped_dated_coords = map(lambda match: (match.group(1),
+                            datetime.strptime(match.group(2), '%d/%B/%Y %H:%M:%S'),
+                            coord_marshall_int(create_coord(match.group(6), match.group(7), match.group(5)))), matches)
+    return iped_dated_coords
+
+def mimic_prune_tiles_of_interest_sql_structure(cursor):
+    cursor.execute('''CREATE TABLE IF NOT EXISTS tile_traffic_v4 (
+        id bigserial primary key,
+        date timestamp(6) not null,
+        z integer not null,
+        x integer not null,
+        y integer not null,
+        tilesize integer not null,
+        service varchar(32),
+        host inet not null
+        )''')
+
+def postgres_add_compat_date_utils(cursor):
+    cursor.execute('''
+        CREATE OR REPLACE FUNCTION DATEADD(interval_kind VARCHAR(20), interval_offset INTEGER, dt DATE)
+        RETURNS TIMESTAMP AS $$
+        BEGIN
+            RETURN (SELECT dt + (interval_offset || ' ' || interval_kind)::INTERVAL);
+        END;
+        $$ language plpgsql
+        ''')
+    cursor.execute('''
+        CREATE OR REPLACE FUNCTION GETDATE()
+        RETURNS DATE AS $$
+        BEGIN
+            RETURN (SELECT current_date);
+        END;
+        $$ language plpgsql
+        ''')
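
For reference, parse_log_file above expects werkzeug-style access-log lines of the kind a dev tileserver writes to nohup.out. A hedged sketch with a made-up log line (the IP, date and tile path are illustrative, not taken from real logs):

```python
from tilequeue.utils import parse_log_file

# Hypothetical tileserver log line: IP, "- -", a bracketed date, then
# "<METHOD> /<layer>/<z>/<x>/<y>.<ext> ..." -- the shape the regex matches.
line = '127.0.0.1 - - [08/May/2017 13:49:09] "GET /all/15/26175/12954.json HTTP/1.1" 200 -'
records = parse_log_file([line])
# expected shape: [('127.0.0.1', datetime(2017, 5, 8, 13, 49, 9), <marshalled coord int>)]
```
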
From e931c6bfccfa6a151ca4866a93527c0ad7d09b92 Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Mon, 8 May 2017 14:07:44 +0800
Subject: [PATCH 02/13] added new config entry

---
 config.yaml.sample | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/config.yaml.sample b/config.yaml.sample
index 53bb0dc1..f174f60d 100644
--- a/config.yaml.sample
+++ b/config.yaml.sample
@@ -188,6 +188,9 @@ toi-store:
 
 # Configuration for the tiles of interest prune/garden command.
 toi-prune:
+  # location of tileserver logs
+  tile-traffic-log-path: ../tileserver/nohup.out
+
   # Connection and query configuration for a RedShift database containing
   # request information for tiles.
   redshift:

From 96fdb737aab9e9f9c8b2f2f5e6868fc0bfa64ffd Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Mon, 8 May 2017 15:34:30 +0800
Subject: [PATCH 03/13] fallback to postgres settings when redshift
 database-uri is not specified

---
 config.yaml.sample   |  6 +++++-
 tilequeue/command.py | 17 +++++++++++++----
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/config.yaml.sample b/config.yaml.sample
index f174f60d..937c2122 100644
--- a/config.yaml.sample
+++ b/config.yaml.sample
@@ -190,10 +190,14 @@ toi-prune:
   # location of tileserver logs
   tile-traffic-log-path: ../tileserver/nohup.out
-
+
   # Connection and query configuration for a RedShift database containing
   # request information for tiles.
   redshift:
+    # if database-uri is not specified, the postgresql configuration above is used,
+    # which relies on the following precondition: tilequeue consume-tile-traffic
+    # needs to be used to pre-populate the designated table with tile traffic logs
+    # and add date util functions for postgres/redshift compatibility
     database-uri: postgresql://user:password@localhost:5439/database
     # The number of days of history to query for.
     days: 30

diff --git a/tilequeue/command.py b/tilequeue/command.py
index 261e82d5..045069de 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -1000,9 +1000,15 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
     prune_cfg = cfg.yml.get('toi-prune', {})
 
     redshift_cfg = prune_cfg.get('redshift', {})
-    redshift_uri = redshift_cfg.get('database-uri')
-    assert redshift_uri, ("A redshift connection URI must "
+    db_conn_info = redshift_cfg.get('database-uri') or cfg.postgresql_conn_info
+    assert db_conn_info, ("A redshift connection or postgres configuration URI must "
                           "be present in the config yaml")
+
+    is_postgres_conn_info = isinstance(db_conn_info, dict)
+    if is_postgres_conn_info:
+        # use first database specified in postgres config for connection
+        dbname = db_conn_info.pop('dbnames')[0]
+        db_conn_info = dict(db_conn_info, dbname=dbname)
 
     redshift_days_to_query = redshift_cfg.get('days')
     assert redshift_days_to_query, ("Number of days to query "
@@ -1015,12 +1021,13 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
                                   "to delete must be specified")
 
     redshift_results = defaultdict(int)
-    with psycopg2.connect(redshift_uri) as conn:
+    # db_conn_info is a uri when using redshift, a dict otherwise
+    with psycopg2.connect(**db_conn_info if is_postgres_conn_info else db_conn_info) as conn:
         with conn.cursor() as cur:
             cur.execute("""
                 select x, y, z, tilesize, count(*)
                 from tile_traffic_v4
-                where (date >= dateadd(day, -{days}, getdate()))
+                where (date >= dateadd({opt_quote}day{opt_quote}, -{days}, getdate()))
                 and (z between 0 and {max_zoom})
                 and (x between 0 and pow(2,z)-1)
                 and (y between 0 and pow(2,z)-1)
                 group by z, x, y, tilesize
                 order by z, x, y, tilesize
             """.format(
                 days=redshift_days_to_query,
                 max_zoom=redshift_zoom_cutoff,
+                # the postgres replica of dateadd (see utils.py) takes its first arg (interval kind) as a string
+                opt_quote="'" if is_postgres_conn_info else ""
             ))
             for (x, y, z, tile_size, count) in cur:
                 coord = create_coord(x, y, z)
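
The plpgsql shims from patch 01 exist so the prune query's Redshift-flavoured date calls also run on plain Postgres. A hedged sketch of what they enable (connection details are placeholders; assumes postgres_add_compat_date_utils has already run against this database):

```python
import psycopg2

conn = psycopg2.connect('postgresql://user:password@localhost:5432/tilequeue')
with conn.cursor() as cur:
    # With the compat functions in place, Redshift-style date arithmetic
    # works on Postgres. Note the quoted 'day': the plpgsql replica takes
    # the interval kind as a string, which is what opt_quote papers over.
    cur.execute("select dateadd('day', -30, getdate())")
    print(cur.fetchone()[0])  # a timestamp 30 days in the past
```
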
From 6bd1f8ed6bc81aeda829e813f972c9ccb287665b Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Mon, 8 May 2017 17:03:19 +0800
Subject: [PATCH 04/13] toi-prune to use store configuration instead of
 redundant s3 config, delete_tiles for S3/TileDirectory store, zip_metatile
 added into extension_to_format/name_to_format

---
 config.yaml.sample           |  4 ++++
 tilequeue/command.py         | 38 ++++++++++++++----------------------
 tilequeue/format/__init__.py |  2 ++
 tilequeue/store.py           | 18 +++++++++++++++++
 4 files changed, 39 insertions(+), 23 deletions(-)

diff --git a/config.yaml.sample b/config.yaml.sample
index 937c2122..263d016f 100644
--- a/config.yaml.sample
+++ b/config.yaml.sample
@@ -217,6 +217,10 @@ toi-prune:
     path: osm
     layer: all
     format: zip
+  # a reduced version of prev s3 entity
+  store:
+    layer: all
+    format: zip
   always-include:
     # Sets of tiles to always include in the tiles of interest.
     # For more information about options here, see the code:

diff --git a/tilequeue/command.py b/tilequeue/command.py
index 045069de..1d0f51b2 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -1016,9 +1016,17 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
 
     redshift_zoom_cutoff = int(redshift_cfg.get('max-zoom', '16'))
 
-    s3_parts = prune_cfg.get('s3')
-    assert s3_parts, ("The name of an S3 bucket containing tiles "
-                      "to delete must be specified")
+    # flag indicating that the s3 entry in toi-prune is used for the s3 store
+    legacy_fallback = 's3' in prune_cfg
+    store_parts = prune_cfg.get('s3') or prune_cfg.get('store')
+    assert store_parts, ("The configuration of a store containing tiles "
+                         "to delete must be specified under toi-prune:store or toi-prune:s3")
+    # explicitly override the store configuration with values provided in toi-prune:s3
+    if legacy_fallback:
+        cfg.store_type = 's3'
+        cfg.s3_bucket = store_parts['bucket']
+        cfg.s3_date_prefix = store_parts['date-prefix']
+        cfg.s3_path = store_parts['path']
 
     redshift_results = defaultdict(int)
     # db_conn_info is a uri when using redshift, a dict otherwise
@@ -1123,25 +1131,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
                 len(toi_to_remove))
     peripherals.stats.gauge('gardener.removed', len(toi_to_remove))
 
-    def delete_from_s3(s3_parts, coord_ints):
-        # Remove from S3
-        s3 = boto.connect_s3(cfg.aws_access_key_id, cfg.aws_secret_access_key)
-        buk = s3.get_bucket(s3_parts['bucket'], validate=False)
-        keys = [
-            s3_tile_key(
-                s3_parts['date-prefix'],
-                s3_parts['path'],
-                s3_parts['layer'],
-                coord_unmarshall_int(coord_int),
-                s3_parts['format']
-            )
-            for coord_int in coord_ints
-        ]
-        del_result = buk.delete_keys(keys)
-        removed = len(del_result.deleted)
-
-        logger.info('Removed %s tiles from S3', removed)
-
+    store = make_store(cfg.store_type, cfg.s3_bucket, cfg)
     if not toi_to_remove:
         logger.info('Skipping TOI remove step because there are '
                     'no tiles to remove')
@@ -1150,7 +1140,9 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
                     len(toi_to_remove))
 
         for coord_ints in grouper(toi_to_remove, 1000):
-            delete_from_s3(s3_parts, coord_ints)
+            removed = store.delete_tiles(map(lambda coord_int: coord_unmarshall_int(coord_int), coord_ints),
+                                         lookup_format_by_extension(store_parts['format']), store_parts['layer'])
+            logger.info('Removed %s tiles from S3', removed)
 
     logger.info('Removing %s tiles from TOI and S3 ... done',
                 len(toi_to_remove))

diff --git a/tilequeue/format/__init__.py b/tilequeue/format/__init__.py
index 539ed385..91297cf3 100644
--- a/tilequeue/format/__init__.py
+++ b/tilequeue/format/__init__.py
@@ -108,6 +108,7 @@ def format_vtm(fp, feature_layers, zoom, bounds_merc, bounds_lnglat):
     vtm=vtm_format,
     mvt=mvt_format,
     mvtb=mvtb_format,
+    zip=zip_format
 )
 
 name_to_format = {
@@ -116,6 +117,7 @@
     'TopoJSON': topojson_format,
     'MVT': mvt_format,
     'MVT Buffered': mvtb_format,
+    'ZIP Metatile': zip_format
 }

diff --git a/tilequeue/store.py b/tilequeue/store.py
index 328aaf44..2b47f99e 100644
--- a/tilequeue/store.py
+++ b/tilequeue/store.py
@@ -67,6 +67,14 @@ def read_tile(self, coord, format, layer):
         tile_data = key.get_contents_as_string()
         return tile_data
 
+    def delete_tiles(self, coords, format, layer):
+        key_names = [
+            s3_tile_key(self.date_prefix, self.path, layer, coord, format.extension)
+            for coord in coords
+        ]
+        del_result = self.bucket.delete_keys(key_names)
+        return len(del_result.deleted)
+
 
 def make_dir_path(base_path, coord, layer):
     path = os.path.join(
@@ -200,6 +208,16 @@ def read_tile(self, coord, format, layer):
         except IOError:
             return None
 
+    def delete_tiles(self, coords, format, layer):
+        delete_count = 0
+        for coord in coords:
+            file_path = make_file_path(self.base_path, coord, layer, format.extension)
+            if os.path.isfile(file_path):
+                os.remove(file_path)
+                delete_count += 1
+
+        return delete_count
+
 
 def make_tile_file_store(base_path=None):
     if base_path is None:
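
delete_tiles gives both store implementations the same bulk-delete surface. A hedged usage sketch against the directory store (the base path and coordinates are made up; assumes deserialize_coord and lookup_format_by_extension behave as they are used elsewhere in this series):

```python
from tilequeue.format import lookup_format_by_extension
from tilequeue.store import make_tile_file_store
from tilequeue.tile import deserialize_coord

# Illustrative only: remove two tiles from a directory-backed store.
store = make_tile_file_store('/tmp/tiles')  # hypothetical base path
coords = [deserialize_coord('15/26175/12954'),
          deserialize_coord('15/26176/12954')]
removed = store.delete_tiles(coords, lookup_format_by_extension('zip'), 'all')
print('%d tiles removed' % removed)
```
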
From 8692be2102c6bae1dec7d4723979a6df1217cf02 Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Mon, 8 May 2017 17:16:38 +0800
Subject: [PATCH 05/13] corrected assertion message

---
 tilequeue/command.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tilequeue/command.py b/tilequeue/command.py
index 402ea34d..d0d04115 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -1022,7 +1022,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
     redshift_cfg = prune_cfg.get('redshift', {})
 
     db_conn_info = redshift_cfg.get('database-uri') or cfg.postgresql_conn_info
-    assert db_conn_info, ("A redshift connection or postgres configuration URI must "
+    assert db_conn_info, ("A redshift connection URI or postgres configuration must "
                           "be present in the config yaml")
 
     is_postgres_conn_info = isinstance(db_conn_info, dict)

From 5281f80e2e8e984c71b1b45e4c5af623d8a78f6f Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Tue, 9 May 2017 15:09:55 +0800
Subject: [PATCH 06/13] logging configuration for consume_tile_traffic

---
 logging.conf.sample | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/logging.conf.sample b/logging.conf.sample
index 0c4d6c8c..ab154b2e 100644
--- a/logging.conf.sample
+++ b/logging.conf.sample
@@ -1,5 +1,5 @@
 [loggers]
-keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,dump_tiles_of_interest_from_redis,load_tiles_of_interest,wof_process_neighbourhoods,query
+keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,dump_tiles_of_interest_from_redis,load_tiles_of_interest,wof_process_neighbourhoods,query,consume_tile_traffic
 
 [handlers]
 keys=consoleHandler
@@ -77,6 +77,12 @@ handlers=consoleHandler
 qualName=query
 propagate=0
 
+[logger_consume_tile_traffic]
+level=INFO
+handlers=consoleHandler
+qualName=consume_tile_traffic
+propagate=0
+
 [handler_consoleHandler]
 class=StreamHandler
 formatter=simpleFormatter
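
For context, a hedged sketch of how the new logger section gets picked up through the standard library's config machinery (the file name is this repo's sample; in practice tilequeue wires this up through its own make_logger helper):

```python
import logging
import logging.config

# Illustrative: load the sample config, then fetch the logger that the
# new [logger_consume_tile_traffic] section configures.
logging.config.fileConfig('logging.conf.sample')
logger = logging.getLogger('consume_tile_traffic')
logger.info('Consuming tile traffic logs ...')
```
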
From e95e234d89c5531f46582ff3833743e31703061b Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Tue, 9 May 2017 15:59:01 +0800
Subject: [PATCH 07/13] converted lambdas to loops in consume_tile_traffic

---
 tilequeue/command.py | 16 +++++++++-------
 tilequeue/utils.py   | 16 ++++++++--------
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/tilequeue/command.py b/tilequeue/command.py
index d0d04115..075c6d45 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -956,8 +956,7 @@ def tilequeue_enqueue_tiles_of_interest(cfg, peripherals):
 def tilequeue_consume_tile_traffic(cfg, peripherals):
     logger = make_logger(cfg, 'consume_tile_traffic')
     logger.info('Consuming tile traffic logs ...')
-    logger.info(cfg.tile_traffic_log_path)
-
+
     iped_dated_coords = None
     with open(cfg.tile_traffic_log_path, 'r') as log_file:
         iped_dated_coords = parse_log_file(log_file)
@@ -977,13 +976,16 @@ def tilequeue_consume_tile_traffic(cfg, peripherals):
         # insert the log records after the latest_date
         cursor.execute('SELECT max(date) from tile_traffic_v4')
         max_timestamp = cursor.fetchone()[0]
-        iped_dated_coords_to_insert = filter(lambda iped_dated_coord: iped_dated_coord[1] > max_timestamp, iped_dated_coords) if max_timestamp else iped_dated_coords
-        for (host, timestamp, marchalled_coord) in iped_dated_coords_to_insert:
-            coord = coord_unmarshall_int(marchalled_coord)
-            cursor.execute("INSERT into tile_traffic_v4 (date, z, x, y, tilesize, service, host) VALUES ('%s', %d, %d, %d, %d, '%s', '%s')"
+
+        n_coords_inserted = 0
+        for host, timestamp, coord_int in iped_dated_coords:
+            if not max_timestamp or timestamp > max_timestamp:
+                coord = coord_unmarshall_int(coord_int)
+                cursor.execute("INSERT into tile_traffic_v4 (date, z, x, y, tilesize, service, host) VALUES ('%s', %d, %d, %d, %d, '%s', '%s')"
                                % (timestamp, coord.zoom, coord.column, coord.row, 512, 'vector-tiles', host))
+                n_coords_inserted += 1
 
-        logger.info('Inserted %d records' % len(iped_dated_coords_to_insert))
+        logger.info('Inserted %d records' % n_coords_inserted)
 
     sql_conn_pool.put_conns([sql_conn])

diff --git a/tilequeue/utils.py b/tilequeue/utils.py
index a8a629ec..a14dbda1 100644
--- a/tilequeue/utils.py
+++ b/tilequeue/utils.py
@@ -34,15 +34,15 @@ def parse_log_file(log_file):
     tile_id_pattern = '\/([\w]+)\/([\d]+)\/([\d]+)\/([\d]+)\.([\d\w]*)'
 
     log_pattern = '%s - - %s "([\w]+) %s.*' % (ip_pattern, date_pattern, tile_id_pattern)
-
-    matches = filter(
-        lambda match: match and len(match.groups()) == 8,
-        map(lambda log_string: re.search(log_pattern, log_string), log_file)
-    )
 
-    iped_dated_coords = map(lambda match: (match.group(1),
-                            datetime.strptime(match.group(2), '%d/%B/%Y %H:%M:%S'),
-                            coord_marshall_int(create_coord(match.group(6), match.group(7), match.group(5)))), matches)
+    iped_dated_coords = []
+    for log_string in log_file:
+        match = re.search(log_pattern, log_string)
+        if match and len(match.groups()) == 8:
+            iped_dated_coords.append((match.group(1),
+                datetime.strptime(match.group(2), '%d/%B/%Y %H:%M:%S'),
+                coord_marshall_int(create_coord(match.group(6), match.group(7), match.group(5)))))
+
     return iped_dated_coords
From bbfc0118baa6e222cec50901417de9cc0a9e84b4 Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Tue, 9 May 2017 16:15:48 +0800
Subject: [PATCH 08/13] removed conditional single quote in SQL since it works
 for both redshift/postgres, simplified map in delete_tiles

---
 tilequeue/command.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/tilequeue/command.py b/tilequeue/command.py
index 075c6d45..b64f6d0f 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -1058,7 +1058,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
             cur.execute("""
                 select x, y, z, tilesize, count(*)
                 from tile_traffic_v4
-                where (date >= dateadd({opt_quote}day{opt_quote}, -{days}, getdate()))
+                where (date >= dateadd('day', -{days}, getdate()))
                 and (z between 0 and {max_zoom})
                 and (x between 0 and pow(2,z)-1)
                 and (y between 0 and pow(2,z)-1)
@@ -1067,9 +1067,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
                 group by z, x, y, tilesize
                 order by z, x, y, tilesize
             """.format(
                 days=redshift_days_to_query,
-                max_zoom=redshift_zoom_cutoff,
-                # the postgres replica of dateadd (see utils.py) takes its first arg (interval kind) as a string
-                opt_quote="'" if is_postgres_conn_info else ""
+                max_zoom=redshift_zoom_cutoff
             ))
             for (x, y, z, tile_size, count) in cur:
                 coord = create_coord(x, y, z)
@@ -1163,7 +1161,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
                     len(toi_to_remove))
 
         for coord_ints in grouper(toi_to_remove, 1000):
-            removed = store.delete_tiles(map(lambda coord_int: coord_unmarshall_int(coord_int), coord_ints),
+            removed = store.delete_tiles(map(coord_unmarshall_int, coord_ints),
                                          lookup_format_by_extension(store_parts['format']), store_parts['layer'])
             logger.info('Removed %s tiles from S3', removed)

From 2ca05fef2a882553f2ed9c50353e19083ef0b07d Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Wed, 10 May 2017 23:21:13 +0800
Subject: [PATCH 09/13] removed unnecessary differentiation between redshift
 and postgres, ditched the fallback to the default postgres config, renamed
 the .yaml redshift entry to tile-history for clarity

---
 config.yaml.sample   |  8 ++------
 tilequeue/command.py | 19 ++++++-------------
 2 files changed, 8 insertions(+), 19 deletions(-)

diff --git a/config.yaml.sample b/config.yaml.sample
index 263d016f..4f59188b 100644
--- a/config.yaml.sample
+++ b/config.yaml.sample
@@ -190,13 +190,9 @@ toi-prune:
   # location of tileserver logs
   tile-traffic-log-path: ../tileserver/nohup.out
 
-  # Connection and query configuration for a RedShift database containing
+  # Connection and query configuration for a database containing
   # request information for tiles.
-  redshift:
-    # if database-uri is not specified, the postgresql configuration above is used,
-    # which relies on the following precondition: tilequeue consume-tile-traffic
-    # needs to be used to pre-populate the designated table with tile traffic logs
-    # and add date util functions for postgres/redshift compatibility
+  tile-history:
     database-uri: postgresql://user:password@localhost:5439/database
     # The number of days of history to query for.
     days: 30

diff --git a/tilequeue/command.py b/tilequeue/command.py
index b64f6d0f..ae1ad0f8 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -1022,22 +1022,16 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
 
     prune_cfg = cfg.yml.get('toi-prune', {})
 
-    redshift_cfg = prune_cfg.get('redshift', {})
-    db_conn_info = redshift_cfg.get('database-uri') or cfg.postgresql_conn_info
-    assert db_conn_info, ("A redshift connection URI or postgres configuration must "
+    tile_history_cfg = prune_cfg.get('tile-history', {})
+    db_conn_info = tile_history_cfg.get('database-uri')
+    assert db_conn_info, ("A postgres-compatible connection URI must "
                           "be present in the config yaml")
 
-    is_postgres_conn_info = isinstance(db_conn_info, dict)
-    if is_postgres_conn_info:
-        # use first database specified in postgres config for connection
-        dbname = db_conn_info.pop('dbnames')[0]
-        db_conn_info = dict(db_conn_info, dbname=dbname)
-
-    redshift_days_to_query = redshift_cfg.get('days')
+    redshift_days_to_query = tile_history_cfg.get('days')
     assert redshift_days_to_query, ("Number of days to query "
                                     "redshift is not specified")
 
-    redshift_zoom_cutoff = int(redshift_cfg.get('max-zoom', '16'))
+    redshift_zoom_cutoff = int(tile_history_cfg.get('max-zoom', '16'))
 
     # flag indicating that the s3 entry in toi-prune is used for the s3 store
     legacy_fallback = 's3' in prune_cfg
@@ -1046,8 +1040,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
 
     redshift_results = defaultdict(int)
-    # db_conn_info is a uri when using redshift, a dict otherwise
-    with psycopg2.connect(**db_conn_info if is_postgres_conn_info else db_conn_info) as conn:
+    with psycopg2.connect(db_conn_info) as conn:
         with conn.cursor() as cur:
             cur.execute("""
                 select x, y, z, tilesize, count(*)
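
With the postgres fallback gone, the prune command always goes through one libpq-style URI, which psycopg2 accepts directly. A hedged sketch (the credentials are the sample config's placeholders):

```python
import psycopg2

# Illustrative: the same connect call now serves Redshift (port 5439 in
# the sample URI) and plain Postgres, since both speak the libpq protocol.
conn = psycopg2.connect('postgresql://user:password@localhost:5439/database')
with conn.cursor() as cur:
    cur.execute('select count(*) from tile_traffic_v4')
    print(cur.fetchone()[0])
```
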
From 8e4e86490899d22e654b98acf505fe47c939d44e Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Wed, 10 May 2017 23:38:47 +0800
Subject: [PATCH 10/13] renamed iped_dated_coords to tile_log_records

---
 tilequeue/command.py | 8 ++++----
 tilequeue/utils.py   | 6 +++---
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/tilequeue/command.py b/tilequeue/command.py
index ae1ad0f8..0f18ba27 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -957,11 +957,11 @@ def tilequeue_consume_tile_traffic(cfg, peripherals):
     logger = make_logger(cfg, 'consume_tile_traffic')
     logger.info('Consuming tile traffic logs ...')
 
-    iped_dated_coords = None
+    tile_log_records = None
     with open(cfg.tile_traffic_log_path, 'r') as log_file:
-        iped_dated_coords = parse_log_file(log_file)
+        tile_log_records = parse_log_file(log_file)
 
-    if not iped_dated_coords:
+    if not tile_log_records:
         logger.info("Couldn't parse log file")
         sys.exit(1)
 
@@ -978,7 +978,7 @@ def tilequeue_consume_tile_traffic(cfg, peripherals):
         max_timestamp = cursor.fetchone()[0]
 
         n_coords_inserted = 0
-        for host, timestamp, coord_int in iped_dated_coords:
+        for host, timestamp, coord_int in tile_log_records:
             if not max_timestamp or timestamp > max_timestamp:
                 coord = coord_unmarshall_int(coord_int)
                 cursor.execute("INSERT into tile_traffic_v4 (date, z, x, y, tilesize, service, host) VALUES ('%s', %d, %d, %d, %d, '%s', '%s')"

diff --git a/tilequeue/utils.py b/tilequeue/utils.py
index a14dbda1..c0caf3e0 100644
--- a/tilequeue/utils.py
+++ b/tilequeue/utils.py
@@ -35,15 +35,15 @@ def parse_log_file(log_file):
 
     log_pattern = '%s - - %s "([\w]+) %s.*' % (ip_pattern, date_pattern, tile_id_pattern)
 
-    iped_dated_coords = []
+    tile_log_records = []
     for log_string in log_file:
         match = re.search(log_pattern, log_string)
         if match and len(match.groups()) == 8:
-            iped_dated_coords.append((match.group(1),
+            tile_log_records.append((match.group(1),
                 datetime.strptime(match.group(2), '%d/%B/%Y %H:%M:%S'),
                 coord_marshall_int(create_coord(match.group(6), match.group(7), match.group(5)))))
 
-    return iped_dated_coords
+    return tile_log_records
 
 def mimic_prune_tiles_of_interest_sql_structure(cursor):
From 2258b56e14ad44eead2f4a629b92da316b5c4cb2 Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Mon, 5 Jun 2017 23:48:45 +0800
Subject: [PATCH 11/13] removed postgres compatibility utilities in favour of
 built-in date utils supported in both postgres and redshift

---
 tilequeue/command.py |  6 +-----
 tilequeue/utils.py   | 30 ------------------------------
 2 files changed, 1 insertion(+), 35 deletions(-)

diff --git a/tilequeue/command.py b/tilequeue/command.py
index 0f18ba27..e92639c5 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -35,8 +35,6 @@ from tilequeue.top_tiles import parse_top_tiles
 from tilequeue.utils import grouper
 from tilequeue.utils import parse_log_file
-from tilequeue.utils import mimic_prune_tiles_of_interest_sql_structure
-from tilequeue.utils import postgres_add_compat_date_utils
 from tilequeue.worker import DataFetch
 from tilequeue.worker import ProcessAndFormatData
 from tilequeue.worker import QueuePrint
@@ -970,8 +968,6 @@ def tilequeue_consume_tile_traffic(cfg, peripherals):
     sql_conn_pool = DBAffinityConnectionsNoLimit(dbnames, conn_info, False)
     sql_conn = sql_conn_pool.get_conns(1)[0]
     with sql_conn.cursor() as cursor:
-        mimic_prune_tiles_of_interest_sql_structure(cursor)
-        postgres_add_compat_date_utils(cursor)
 
         # insert the log records after the latest_date
         cursor.execute('SELECT max(date) from tile_traffic_v4')
@@ -1051,7 +1047,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
             cur.execute("""
                 select x, y, z, tilesize, count(*)
                 from tile_traffic_v4
-                where (date >= dateadd('day', -{days}, getdate()))
+                where (date >= (current_timestamp - interval '{days} days'))
                 and (z between 0 and {max_zoom})
                 and (x between 0 and pow(2,z)-1)
                 and (y between 0 and pow(2,z)-1)

diff --git a/tilequeue/utils.py b/tilequeue/utils.py
index c0caf3e0..03bceee4 100644
--- a/tilequeue/utils.py
+++ b/tilequeue/utils.py
@@ -44,33 +44,3 @@ def parse_log_file(log_file):
             coord_marshall_int(create_coord(match.group(6), match.group(7), match.group(5)))))
 
     return tile_log_records
-
-def mimic_prune_tiles_of_interest_sql_structure(cursor):
-    cursor.execute('''CREATE TABLE IF NOT EXISTS tile_traffic_v4 (
-        id bigserial primary key,
-        date timestamp(6) not null,
-        z integer not null,
-        x integer not null,
-        y integer not null,
-        tilesize integer not null,
-        service varchar(32),
-        host inet not null
-        )''')
-
-def postgres_add_compat_date_utils(cursor):
-    cursor.execute('''
-        CREATE OR REPLACE FUNCTION DATEADD(interval_kind VARCHAR(20), interval_offset INTEGER, dt DATE)
-        RETURNS TIMESTAMP AS $$
-        BEGIN
-            RETURN (SELECT dt + (interval_offset || ' ' || interval_kind)::INTERVAL);
-        END;
-        $$ language plpgsql
-        ''')
-    cursor.execute('''
-        CREATE OR REPLACE FUNCTION GETDATE()
-        RETURNS DATE AS $$
-        BEGIN
-            RETURN (SELECT current_date);
-        END;
-        $$ language plpgsql
-        ''')
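
The rewritten predicate relies on interval arithmetic that both engines support natively, so the user-defined shims could be dropped. A hedged sketch of the cutoff it computes (30 days is the sample config's default; assumes `cur` is an open cursor as in the patch):

```python
# Illustrative: the same cutoff the prune query now computes inline,
# valid unmodified on both Postgres and Redshift.
cur.execute("select current_timestamp - interval '30 days'")
cutoff = cur.fetchone()[0]
print('pruning tile traffic older than', cutoff)
```
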
From 2d1b08f89692887dd868ddac8544a4952336f545 Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Tue, 6 Jun 2017 00:20:26 +0800
Subject: [PATCH 12/13] resolve conflict in logging.conf.sample

---
 logging.conf.sample | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/logging.conf.sample b/logging.conf.sample
index ab154b2e..9653a315 100644
--- a/logging.conf.sample
+++ b/logging.conf.sample
@@ -1,5 +1,5 @@
 [loggers]
-keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,dump_tiles_of_interest_from_redis,load_tiles_of_interest,wof_process_neighbourhoods,query,consume_tile_traffic
+keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,load_tiles_of_interest,wof_process_neighbourhoods,query,consume_tile_traffic
 
 [handlers]
 keys=consoleHandler

From ff54142d81a28594f90e0af1b6ddc69639123cd0 Mon Sep 17 00:00:00 2001
From: ambientlight
Date: Tue, 6 Jun 2017 00:45:48 +0800
Subject: [PATCH 13/13] fixed ambiguous comment

---
 config.yaml.sample | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/config.yaml.sample b/config.yaml.sample
index c7c49d4d..f6869f61 100644
--- a/config.yaml.sample
+++ b/config.yaml.sample
@@ -209,7 +209,7 @@ toi-prune:
     path: osm
     layer: all
     format: zip
-  # a reduced version of prev s3 entity
+  # layer and format of tiles to be deleted
   store:
     layer: all
     format: zip