Use boto3 for tile store #334

Merged: 4 commits, May 4, 2018
4 changes: 2 additions & 2 deletions tests/test_store.py
@@ -69,7 +69,7 @@ def test_example_coord(self):
layer = 'all'
tile_key = s3_tile_key(date_str, path, layer, coord,
json_format.extension)
self.assertEqual(tile_key, '/20160121/b707d/osm/all/8/72/105.json')
self.assertEqual(tile_key, '20160121/b707d/osm/all/8/72/105.json')

def test_no_path(self):
from tilequeue.store import s3_tile_key
@@ -81,7 +81,7 @@ def test_no_path(self):
layer = 'all'
tile_key = s3_tile_key(date_str, path, layer, coord,
json_format.extension)
self.assertEqual(tile_key, '/20160121/cfc61/all/8/72/105.json')
self.assertEqual(tile_key, '20160121/cfc61/all/8/72/105.json')


class WriteTileIfChangedTest(unittest.TestCase):
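For reference, the key shape these tests expect can be sketched as follows. This is illustrative only: it assumes calc_hash keeps the first five hex characters of the md5 digest of the hashed portion, which is what the fixtures above suggest. The substantive change is that the key no longer starts with a slash; boto3 uses the Key string exactly as given, so stored objects now begin directly with the date prefix.

import hashlib

date = '20160121'
path_to_hash = '/osm/all/8/72/105.json'  # path, layer, z/x/y and extension
md5_hash = hashlib.md5(path_to_hash.encode('utf-8')).hexdigest()[:5]
tile_key = '%s/%s%s' % (date, md5_hash, path_to_hash)
# matches the first assertion above: '20160121/b707d/osm/all/8/72/105.json'
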
3 changes: 2 additions & 1 deletion tilequeue/command.py
@@ -1979,7 +1979,8 @@ def tilequeue_batch_enqueue(cfg, args):
logger = make_logger(cfg, 'batch_enqueue')

import boto3
client = boto3.client('batch', region_name='us-east-1')
region_name = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
client = boto3.client('batch', region_name=region_name)
Member

I would assume that boto3 would automatically check the environment variable for us, but this certainly doesn't hurt.

Member Author

I think so too. This was my attempt to keep the default the same as before, as I think that not setting the environment variable would otherwise be an error?

Member

Ah yea, good point. It does error out if there is no default region set.

👍


logger.info('Batch enqueue ...')

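The fallback discussed in the comments above can be expressed on its own; this is a sketch, with make_batch_client and default_region as illustrative names rather than part of the change. boto3 does read AWS_DEFAULT_REGION (and the shared config) by itself, but it raises botocore.exceptions.NoRegionError when building a client if no region is configured anywhere, which is why the hard-coded default is kept as a last resort.

import os

import boto3

def make_batch_client(default_region='us-east-1'):
    # boto3 falls back to AWS_DEFAULT_REGION and the shared config on its
    # own, but errors out with NoRegionError if nothing is configured;
    # keeping the old hard-coded region preserves the previous behaviour.
    region_name = os.environ.get('AWS_DEFAULT_REGION', default_region)
    return boto3.client('batch', region_name=region_name)
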
137 changes: 85 additions & 52 deletions tilequeue/store.py
@@ -1,7 +1,7 @@
# define locations to store the rendered data

from boto import connect_s3
from boto.s3.bucket import Bucket
import boto3
from botocore.exceptions import ClientError
from builtins import range
from future.utils import raise_from
import md5
@@ -12,6 +12,7 @@
import random
import threading
import time
from cStringIO import StringIO


def calc_hash(s):
@@ -32,7 +33,7 @@ def s3_tile_key(date, path, layer, coord, extension):
ext=extension,
)
md5_hash = calc_hash(path_to_hash)
s3_path = '/%(date)s/%(md5)s%(path_to_hash)s' % dict(
s3_path = '%(date)s/%(md5)s%(path_to_hash)s' % dict(
date=date,
md5=md5_hash,
path_to_hash=path_to_hash,
@@ -96,39 +97,60 @@ def func(*args, **kwargs):
class S3(object):

def __init__(
self, bucket, date_prefix, path, reduced_redundancy,
delete_retry_interval, logger):
self.bucket = bucket
self, s3_client, bucket_name, date_prefix, path,
reduced_redundancy, delete_retry_interval, logger,
object_acl):
self.s3_client = s3_client
self.bucket_name = bucket_name
self.date_prefix = date_prefix
self.path = path
self.reduced_redundancy = reduced_redundancy
self.delete_retry_interval = delete_retry_interval
self.logger = logger
self.object_acl = object_acl

def write_tile(self, tile_data, coord, format, layer):
key_name = s3_tile_key(
self.date_prefix, self.path, layer, coord, format.extension)
key = self.bucket.new_key(key_name)

storage_class = 'STANDARD'
if self.reduced_redundancy:
storage_class = 'REDUCED_REDUNDANCY'

@_backoff_and_retry(Exception, logger=self.logger)
def write_to_s3():
key.set_contents_from_string(
tile_data,
headers={'Content-Type': format.mimetype},
policy='public-read',
reduced_redundancy=self.reduced_redundancy,
)
try:
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=key_name,
Body=tile_data,
ContentType=format.mimetype,
ACL=self.object_acl,
StorageClass=storage_class,
)
except ClientError as e:
# it's really useful for debugging if we know exactly what
# request is failing.
raise RuntimeError(
"Error while trying to write %r to bucket %r: %s"
% (key_name, self.bucket_name, str(e)))

write_to_s3()

def read_tile(self, coord, format, layer):
key_name = s3_tile_key(
self.date_prefix, self.path, layer, coord, format.extension)
key = self.bucket.get_key(key_name)
if key is None:
return None
tile_data = key.get_contents_as_string()
return tile_data

try:
io = StringIO()
self.s3_client.download_fileobj(self.bucket_name, key_name, io)
return io.getvalue()

except ClientError as e:
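# a missing tile surfaces here as a ClientError whose error code is
# '404' (download_fileobj checks the object with a HEAD request first);
# anything else is unexpected and re-raised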
if e.response['Error']['Code'] != '404':
raise

return None

def delete_tiles(self, coords, format, layer):
key_names = [
@@ -138,22 +160,31 @@ def delete_tiles(self, coords, format, layer):
]

num_deleted = 0
while key_names:
del_result = self.bucket.delete_keys(key_names)
num_deleted += len(del_result.deleted)

key_names = []
for error in del_result.errors:
# retry on internal error. documentation implies that the only
# possible two errors are AccessDenied and InternalError.
# retrying when access denied seems unlikely to work, but an
# internal error might be transient.
if error.code == 'InternalError':
key_names.append(error.key)

# pause a bit to give transient errors a chance to clear.
if key_names:
time.sleep(self.delete_retry_interval)
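# the S3 DeleteObjects API accepts at most 1000 keys per request, so
# delete in chunks of that size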
chunk_size = 1000
for idx in xrange(0, len(key_names), chunk_size):
chunk = key_names[idx:idx+chunk_size]

while chunk:
objects = [dict(Key=k) for k in chunk]
del_result = self.s3_client.delete_objects(
Bucket=self.bucket_name,
Delete=dict(Objects=objects),
)
# 'Deleted' and 'Errors' are omitted from the response when empty
num_deleted += len(del_result.get('Deleted', []))

chunk = []
for error in del_result.get('Errors', []):
# retry on internal error. documentation implies that the
# only possible two errors are AccessDenied and
# InternalError. retrying when access denied seems
# unlikely to work, but an internal error might be
# transient.
if error['Code'] == 'InternalError':
chunk.append(error['Key'])

# pause a bit to give transient errors a chance to clear.
if chunk:
time.sleep(self.delete_retry_interval)

# make sure that we deleted all the tiles - this seems like the
# expected behaviour from the calling code.
@@ -164,11 +195,17 @@ def delete_tiles(self, coords, format, layer):

def list_tiles(self, format, layer):
ext = '.' + format.extension
for key_obj in self.bucket.list(prefix=self.date_prefix):
key = key_obj.key
coord = parse_coordinate_from_path(key, ext, layer)
if coord:
yield coord
paginator = self.s3_client.get_paginator('list_objects_v2')
page_iter = paginator.paginate(
Bucket=self.bucket_name,
Prefix=self.date_prefix
)
for page in page_iter:
# a page can lack 'Contents' when the prefix matches nothing
for key_obj in page.get('Contents', []):
key = key_obj['Key']
coord = parse_coordinate_from_path(key, ext, layer)
if coord:
yield coord


def make_dir_path(base_path, coord, layer):
@@ -352,13 +389,12 @@ def list_tiles(self, format, layer):


def make_s3_store(bucket_name,
aws_access_key_id=None, aws_secret_access_key=None,
path='osm', reduced_redundancy=False, date_prefix='',
delete_retry_interval=60, logger=None):
conn = connect_s3(aws_access_key_id, aws_secret_access_key)
bucket = Bucket(conn, bucket_name)
s3_store = S3(bucket, date_prefix, path, reduced_redundancy,
delete_retry_interval, logger)
delete_retry_interval=60, logger=None,
object_acl='public-read'):
s3 = boto3.client('s3')
s3_store = S3(s3, bucket_name, date_prefix, path, reduced_redundancy,
delete_retry_interval, logger, object_acl)
return s3_store


@@ -408,16 +444,13 @@ def make_store(yml, credentials={}, logger=None):
reduced_redundancy = yml.get('reduced-redundancy')
date_prefix = yml.get('date-prefix')
delete_retry_interval = yml.get('delete-retry-interval')

assert credentials, 'S3 store configured, but no AWS credentials ' \
'provided. AWS credentials are required to use S3.'
aws_access_key_id = credentials.get('aws_access_key_id')
aws_secret_access_key = credentials.get('aws_secret_access_key')
object_acl = yml.get('object-acl', 'public-read')

return make_s3_store(
bucket, aws_access_key_id, aws_secret_access_key, path=path,
bucket, path=path,
reduced_redundancy=reduced_redundancy, date_prefix=date_prefix,
delete_retry_interval=delete_retry_interval, logger=logger)
delete_retry_interval=delete_retry_interval, logger=logger,
object_acl=object_acl)

else:
raise ValueError('Unrecognized store type: `{}`'.format(store_type))
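
Taken together, the store no longer takes AWS credentials through the tilequeue config; boto3 resolves them from its standard chain (environment variables, shared config file, or an instance/role profile). A rough usage sketch follows, with a placeholder bucket name and prefix, and the usual tilequeue coordinate and format imports assumed:

from ModestMaps.Core import Coordinate
from tilequeue.format import json_format
from tilequeue.store import make_s3_store

# 'my-tile-bucket' and the prefix values here are placeholders
store = make_s3_store(
    'my-tile-bucket',
    path='osm',
    date_prefix='20160121',
    reduced_redundancy=False,
    delete_retry_interval=60,
    object_acl='public-read',
)

coord = Coordinate(row=105, column=72, zoom=8)
store.write_tile('{"k": "v"}', coord, json_format, 'all')
tile_data = store.read_tile(coord, json_format, 'all')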