Commit

wip :: move dependencies

QuentinMadura committed Mar 17, 2020
1 parent 7880f35 commit 7d4984a
Showing 6 changed files with 115 additions and 73 deletions.
4 changes: 2 additions & 2 deletions peakina/io/s3/s3_fetcher.py
@@ -1,7 +1,7 @@
from typing import BinaryIO, List

from ..fetcher import Fetcher, register
from .s3_utils import S3_SCHEMES, s3_list_dir, s3_mtime, s3_open, s3_read
from .s3_utils import S3_SCHEMES, s3_list_dir, s3_mtime, s3_read


@register(schemes=S3_SCHEMES)
@@ -16,4 +16,4 @@ def listdir(s3_url, **kwargs) -> List[str]:

@staticmethod
def mtime(s3_url, **kwargs) -> int:
return s3_mtime(s3_url)
return s3_mtime(s3_url, **kwargs)
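The substantive fix in this file: `S3Fetcher.mtime` now forwards its `**kwargs` to `s3_mtime`, so client options such as a custom `endpoint_url` reach the underlying boto3 client. A minimal sketch of what this enables, assuming the zenko/cloudserver container from the compose file below is listening on localhost:8000:

```python
from peakina.io.s3.s3_fetcher import S3Fetcher

# Before this change the kwargs were dropped, so mtime() could never
# target an S3-compatible server other than AWS itself.
mtime = S3Fetcher.mtime(
    's3://newAccessKey:newSecretKey@mybucket/0_0.csv',
    endpoint_url='http://localhost:8000',  # assumed local test server
)
print(mtime)  # the object's last-modified time as an int timestamp
```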
78 changes: 47 additions & 31 deletions peakina/io/s3/s3_utils.py
@@ -1,13 +1,11 @@
"""This module gathers misc convenience functions to handle s3 objects"""
import tempfile
from datetime import datetime
from typing import BinaryIO, List, Optional, Tuple
from urllib.parse import unquote, urlparse

import boto3
import s3fs
from botocore.exceptions import (
BotoCoreError,
ClientError,
DataNotFoundError,
NoCredentialsError,
@@ -53,12 +51,13 @@ def parse_s3_url(url: str) -> Tuple[Optional[str], Optional[str], Optional[str],
return access_key, secret, urlchunks.hostname, objectname


def s3_open(self, **fetcher_kwargs) -> BinaryIO:
def s3_open(url: str, **fetcher_kwargs) -> BinaryIO:
"""opens a s3 url and returns a file-like object"""
access_key, secret, bucketname, objectname = parse_s3_url(url)
fs = s3fs.S3FileSystem(
key=access_key, secret=secret, client_kwargs=fetcher_kwargs.get('client_kwargs')
anon=False, key=access_key, secret=secret, client_kwargs=fetcher_kwargs.get('client_kwargs')
)
ls = fs.ls(bucketname)
ret = tempfile.NamedTemporaryFile(suffix='.s3tmp')
file = fs.open(f'{bucketname}/{objectname}')
ret.write(file.read())
@@ -67,10 +66,15 @@ def s3_open(self, **fetcher_kwargs) -> BinaryIO:


def create_s3_client(**kwargs):
# Create s3_client:
# aws_access_key_id :: The access key for your AWS account :: str
# aws_secret_access_key :: The secret key for your AWS account :: str,
# More informations: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
"""
Create s3_client
Return:
S3 client :: a low-level client representing Amazon Simple Storage Service (S3)
More information on how to configure the client:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
"""
try:
session = boto3.session.Session()
s3_client = session.client(service_name='s3', **kwargs,)
@@ -80,22 +84,23 @@ def create_s3_client(**kwargs):


def s3_read(url: str, **kwargs) -> BinaryIO:
## Read a file from a s3 path
# Parameters:
# url :: s3 path
# kwargs:
"""
Read a file from an s3 path
Arguments:
url :: s3 path
Return:
file :: object data :: BinaryIO
"""

aws_access_key, aws_secret, bucket_name, key = parse_s3_url(url)
assert key, "Cannot retrieve a file with an empty path/key"
s3 = create_s3_client(
aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret, **kwargs,
aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret, **kwargs
)

## Get object from a bucket
# Bucket :: Amazon bucket name :: str
# Key :: Retrieve the key/key_path or object / object_name:: str
### More informations:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object
try:
response = s3.get_object(Bucket=bucket_name, Key=key)
response_body = response["Body"]
@@ -105,31 +110,42 @@ def s3_list_dir(url: str, **kwargs) -> List:


def s3_list_dir(url: str, **kwargs) -> List:
"""
List the contents of an s3 bucket
Arguments:
url :: s3 path
Return:
list_dir :: list of objects :: List
"""
aws_access_key, aws_secret, bucket_name, object_name = parse_s3_url(url)
s3 = create_s3_client(**kwargs)
try:
response = s3.list_objects(Bucket=bucket_name)
except DataNotFoundError as e:
print(e)
return response['Contents']
return [file['Key'] for file in response['Contents']]


def s3_mtime(url: str, **kwargs) -> int:
## Get the last modification of a S3 file
# url :: A S3 URL looks like s3://aws_key:aws_secret@bucketname/objectname where credentials are optional
# key :: Retrieve the key/key_path or object / object_name:: str
### More informations
"""
Get the last modification date of an S3 file
Arguments:
url :: s3 path :: str
key :: the 'keypath' or 'object' to retrieve :: str
Return:
the object's last modification date, as a Unix timestamp :: int
More information:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object
"""
aws_access_key, aws_secret, bucket_name, key = parse_s3_url(url)
assert (
key
), f"Cannot retrieve file without an empty key in the url. E.G: 's3://aws_key:aws_secret@bucketname/key' "
assert key, "Cannot retrieve a file with an empty key in the url."
s3 = create_s3_client(**kwargs)

## Get object and return his last date of modification
# Bucket :: Amazon bucket name :: str
# Key :: Retrieve the key/key_path or object / object_name in the bucket :: str
### More informations:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object
try:
response = s3.get_object(Bucket=bucket_name, Key=key)
except DataNotFoundError as e:
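Taken together, the helpers in this module can now be pointed at any S3-compatible endpoint. A hedged sketch of their use, assuming the same local test server and fixtures as the tests below:

```python
from peakina.io.s3.s3_utils import s3_list_dir, s3_mtime, s3_read

URL = 's3://newAccessKey:newSecretKey@mybucket'
ENDPOINT = 'http://localhost:8000'  # assumed zenko/cloudserver address

# wraps list_objects: returns the keys of the bucket's objects
keys = s3_list_dir(url=URL, endpoint_url=ENDPOINT)

# wraps get_object: per the tests, returns the object's raw bytes
data = s3_read(url=f'{URL}/0_0.csv', endpoint_url=ENDPOINT)

# wraps get_object: returns the LastModified date as an int timestamp
ts = s3_mtime(url=f'{URL}/0_0.csv', endpoint_url=ENDPOINT)
```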
3 changes: 1 addition & 2 deletions setup.cfg
@@ -19,14 +19,12 @@ packages = find:
install_requires =
certifi
chardet
docker
fastparquet
jq
pandas
paramiko
pydantic
python-slugify
pyyaml
s3fs
tables
urllib3
@@ -46,6 +44,7 @@ test =
isort
mypy
pytest
pyyaml
pytest-cases
pytest-cov
pytest-isort
8 changes: 8 additions & 0 deletions tests/docker-compose.yml
@@ -9,3 +9,11 @@ s3:
volumes:
## Mounting volume with /fixtures files
- ./fixtures/:/usr/src/app/localData/

s3-latest:
image: zenko/cloudserver
ports:
- 8000:8000
environment:
- AWS_ACCESS_KEY_ID=newAccessKey
- AWS_SECRET_ACCESS_KEY=newSecretKey
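The new `s3-latest` service runs zenko/cloudserver, an S3-compatible server, on port 8000 with the credentials the tests expect. A quick smoke test against it might look like this (host and bucket name are assumptions matching the fixtures):

```python
import boto3

client = boto3.session.Session().client(
    service_name='s3',
    aws_access_key_id='newAccessKey',
    aws_secret_access_key='newSecretKey',
    endpoint_url='http://localhost:8000',
)
client.create_bucket(Bucket='mybucket')
print([b['Name'] for b in client.list_buckets()['Buckets']])  # ['mybucket']
```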
82 changes: 50 additions & 32 deletions tests/io/s3/test_s3_fetcher.py
@@ -1,43 +1,46 @@
from collections import namedtuple
from contextlib import suppress
from typing import BinaryIO

import boto3
from pytest import raises

import peakina as pk
from peakina.io.s3.s3_fetcher import S3Fetcher
from peakina.io.s3.s3_utils import (
create_s3_client,
parse_s3_url,
s3_bucket,
s3_container,
s3_list_dir,
s3_mtime,
s3_open,
s3_read,
)
from tests.io.s3.test_s3_utils import s3_bucket, s3_container
from peakina.io.s3.s3_utils import s3_list_dir, s3_mtime, s3_read, s3_open


def test_s3_read(mocker, s3_bucket):
# it should read the content of a file
file = s3_read(
url='s3://newAccessKey:newSecretKey@mybucket/0_0.csv', endpoint_url=s3_bucket._endpoint.host
)
file == b'a,b\n0,0\n0,1'
assert file == b'a,b\n0,0\n0,1'


def test_s3_list_dir(mocker, s3_bucket):
## it should return the list of elements presents on the s3_bucket
directory = s3_list_dir(
url='s3://newAccessKey:newSecretKey@mybucket', endpoint_url=s3_bucket._endpoint.host
)
assert len(directory) >= 13
assert [
'0_0.csv',
'0_0_sep.csv',
'0_1.csv',
'empty.csv',
'fixture-1.csv',
'fixture-1.staging.csv',
'fixture-2.csv',
'fixture.csv',
'fixture_empty.csv',
'fixturesep.csv',
'latin_1.csv',
'latin_1_sep.csv',
'sep_parse_error.csv',
] == directory


def test_s3_mtime(mocker, s3_bucket):
file_timestamp = s3_mtime(
# it should return the last modification date of a file in timestamp format
file_mtime = s3_mtime(
url='s3://newAccessKey:newSecretKey@mybucket/0_0.csv', endpoint_url=s3_bucket._endpoint.host
)
assert isinstance(file_timestamp, int)
assert isinstance(file_mtime, int)


def test_s3_fetcher(mocker, s3_bucket):
@@ -46,24 +49,39 @@ def test_s3_fetcher(mocker, s3_bucket):
file = s3_read(
url='s3://newAccessKey:newSecretKey@mybucket/0_0.csv', endpoint_url=s3_bucket._endpoint.host
)
assert file == b'a,b\n0,0\n0,1'
directory = S3Fetcher.listdir(dirpath, endpoint_url=s3_bucket._endpoint.host)
timestamp = S3Fetcher.mtime(filepath, endpoint_url=s3_bucket._endpoint.host)
assert [
'0_0.csv',
'0_0_sep.csv',
'0_1.csv',
'empty.csv',
'fixture-1.csv',
'fixture-1.staging.csv',
'fixture-2.csv',
'fixture.csv',
'fixture_empty.csv',
'fixturesep.csv',
'latin_1.csv',
'latin_1_sep.csv',
'sep_parse_error.csv',
] == directory

file_mtime = S3Fetcher.mtime(filepath, endpoint_url=s3_bucket._endpoint.host)
assert isinstance(file_mtime, int)

def test_s3_fetcher_custom_endpoint(mocker):

m = mocker.patch('peakina.io.s3.s3_utils.s3fs.S3FileSystem')
def test_s3_fetcher_custom_endpoint(mocker, s3_bucket):
m = mocker.patch('peakina.io.s3.s3_utils.s3fs.S3FileSystem')
dirpath = 's3://newAccessKey:newSecretKey@mybucket'
filepath = f'{dirpath}/0_0.csv'
with suppress(TypeError):
s3_open(
url='s3://newAccessKey:newSecretKey@mybucket/0_0.csv',
client_kwargs={'endpoint_url': s3_bucket._endpoint.host},
)

dirpath = 's3://aws_key:aws_secret@bucketname'
filepath = f'{dirpath}/objectname.csv'
with suppress(TypeError):
pk.read_pandas(
filepath,
fetcher_kwargs={"client_kwargs": {"endpoint_url": "https://internal.domain:8080/truc"}},
)
with suppress(TypeError):
pk.read_pandas(uri=filepath, fetcher_kwargs={'endpoint_url': s3_bucket._endpoint.host})

kwargs = m.call_args[1]
assert 'key' in kwargs
assert 'secret' in kwargs
assert 'client_kwargs' in kwargs
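The custom-endpoint test works by patching `s3fs.S3FileSystem` where `s3_utils` looks it up and inspecting the keyword arguments it was constructed with; the `TypeError` raised when the mocked filesystem's contents hit the temp file is deliberately swallowed. A condensed sketch of the pattern with plain unittest.mock (the endpoint URL is illustrative):

```python
from unittest import mock

import peakina.io.s3.s3_utils as s3_utils

with mock.patch.object(s3_utils.s3fs, 'S3FileSystem') as m:
    try:
        s3_utils.s3_open(
            's3://aws_key:aws_secret@bucketname/objectname.csv',
            client_kwargs={'endpoint_url': 'http://localhost:8000'},
        )
    except TypeError:
        # the mocked filesystem hands back Mock objects, so writing the
        # "file" contents to the tempfile fails; only the kwargs matter
        pass

kwargs = m.call_args[1]
assert kwargs['key'] == 'aws_key'
assert kwargs['secret'] == 'aws_secret'
assert kwargs['client_kwargs'] == {'endpoint_url': 'http://localhost:8000'}
```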
13 changes: 7 additions & 6 deletions tests/io/s3/test_s3_utils.py
@@ -1,12 +1,8 @@
import io
import os
from collections import namedtuple

import boto3
import s3fs
from pytest import fixture, raises

from peakina.io.s3.s3_utils import create_s3_client, parse_s3_url as pu, s3_open
from peakina.io.s3.s3_utils import create_s3_client, parse_s3_url as pu

_BUCKET_NAME = 'mybucket'
_PREFIX = 'localData/'
@@ -52,7 +48,7 @@ def test_empty_object_name_raise_exception():
pu('s3://a/')


def test_s3_container(mocker, s3_bucket):
def test_s3_bucket(mocker, s3_bucket):
s3_list_objects = s3_bucket.list_objects(Bucket='mybucket')
assert s3_list_objects['ResponseMetadata']['HTTPStatusCode'] == 200
s3_object = s3_bucket.get_object(Bucket='mybucket', Key='0_0.csv')
@@ -68,6 +64,11 @@ def s3_container(service_container):

@fixture(scope='module')
def s3_bucket(s3_container):
"""
Create an Amazon S3 bucket with an S3 client
Arguments:
s3_container -- docker service container
"""
s3_bucket = create_s3_client(
aws_access_key_id='newAccessKey',
aws_secret_access_key='newSecretKey',
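For reference, `parse_s3_url` is what lets every helper above accept credentials embedded in the URL. A small sketch of its behavior as exercised by these tests:

```python
from peakina.io.s3.s3_utils import parse_s3_url

# credentials come back as the first two items; both may be None
access_key, secret, bucket, key = parse_s3_url(
    's3://newAccessKey:newSecretKey@mybucket/0_0.csv'
)
assert (access_key, secret) == ('newAccessKey', 'newSecretKey')
assert (bucket, key) == ('mybucket', '0_0.csv')
```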
