Skip to content

Commit

Permalink
Storage POC async
Browse files Browse the repository at this point in the history
  • Loading branch information
lmazuel committed Sep 19, 2019
1 parent c3e170b commit 34568b0
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ async def __aexit__(self, *args):

def _create_pipeline(self, credential, **kwargs):
# type: (Any, **Any) -> Tuple[Configuration, Pipeline]
credential_policy = None
self._credential_policy = None
if hasattr(credential, 'get_token'):
credential_policy = AsyncBearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
self._credential_policy = AsyncBearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
elif isinstance(credential, SharedKeyCredentialPolicy):
credential_policy = credential
self._credential_policy = credential
elif credential is not None:
raise TypeError("Unsupported credential: {}".format(credential))
config = kwargs.get('_configuration') or create_configuration(**kwargs)
Expand All @@ -76,7 +76,7 @@ def _create_pipeline(self, credential, **kwargs):
config.user_agent_policy,
StorageContentValidation(),
StorageRequestHook(**kwargs),
credential_policy,
self._credential_policy,
ContentDecodePolicy(),
AsyncRedirectPolicy(**kwargs),
StorageHosts(hosts=self._hosts, **kwargs), # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,166 @@ async def delete_blob(
timeout=timeout,
**kwargs)

@distributed_trace_async
async def delete_blobs(
        self, *blobs,  # type: Union[str, BlobProperties]
        delete_snapshots=None,  # type: Optional[str]
        lease=None,  # type: Optional[Union[str, LeaseClient]]
        timeout=None,  # type: Optional[int]
        **kwargs
    ):
    # type: (...) -> None
    """Marks the specified blobs or snapshots for deletion in one batch request.

    One ``DELETE`` sub-request is built per blob and all of them are sent
    together as a single ``multipart/mixed`` POST to ``/?comp=batch``.

    The blobs are later deleted during garbage collection.
    Note that in order to delete a blob, you must delete all of its
    snapshots. You can delete both at the same time with the Delete
    Blob operation.

    If a delete retention policy is enabled for the service, then this operation soft deletes the blob or snapshot
    and retains the blob or snapshot for specified number of days.
    After specified number of days, blob's data is removed from the service during garbage collection.
    Soft deleted blob or snapshot is accessible through List Blobs API specifying `include="deleted"` option.
    Soft-deleted blob or snapshot can be restored using Undelete API.

    .. note::
        ``delete_snapshots``, ``lease``, ``timeout`` and the conditional
        keyword arguments documented below are accepted but are not yet
        forwarded to the sub-requests by this implementation.

    :param blobs: The blobs to delete, given either by name or as
        properties objects (in which case their ``name`` is used).
    :type blobs: str or ~azure.storage.blob.models.BlobProperties
    :param str delete_snapshots:
        Required if the blob has associated snapshots. Values include:
         - "only": Deletes only the blobs snapshots.
         - "include": Deletes the blob along with all snapshots.
    :param lease:
        Required if the blob has an active lease. Value can be a Lease object
        or the lease ID as a string.
    :type lease: ~azure.storage.blob.lease.LeaseClient or str
    :param datetime if_modified_since:
        A DateTime value. Azure expects the date value passed in to be UTC.
        If timezone is included, any non-UTC datetimes will be converted to UTC.
        If a date is passed in without timezone info, it is assumed to be UTC.
        Specify this header to perform the operation only
        if the resource has been modified since the specified time.
    :param datetime if_unmodified_since:
        A DateTime value. Azure expects the date value passed in to be UTC.
        If timezone is included, any non-UTC datetimes will be converted to UTC.
        If a date is passed in without timezone info, it is assumed to be UTC.
        Specify this header to perform the operation only if
        the resource has not been modified since the specified date/time.
    :param str if_match:
        An ETag value, or the wildcard character (*). Specify this header to perform
        the operation only if the resource's ETag matches the value specified.
    :param str if_none_match:
        An ETag value, or the wildcard character (*). Specify this header
        to perform the operation only if the resource's ETag does not match
        the value specified. Specify the wildcard character (*) to perform
        the operation only if the resource does not exist, and fail the
        operation if it does exist.
    :param int timeout:
        The timeout parameter is expressed in seconds.
    :raises ~azure.core.exceptions.HttpResponseError:
        If the batch response status is not 202, or if the response does not
        contain one part per submitted sub-request.
    :rtype: None
    """
    from email import message_from_bytes
    from io import BytesIO
    from time import time
    from wsgiref.handlers import format_date_time

    import aiohttp
    from azure.core.exceptions import HttpResponseError
    from azure.core.pipeline.policies import HeadersPolicy
    from azure.core.pipeline.transport import HttpRequest

    # One DELETE sub-request per blob.
    reqs = []
    for blob in blobs:
        # A properties object carries the blob name in ``.name``; a plain
        # string is already the name. Formatting the object itself would
        # put its repr into the URL.
        blob_name = getattr(blob, 'name', blob)
        reqs.append(HttpRequest(
            "DELETE",
            "/{}/{}".format(self.container_name, blob_name),
            headers={
                'Content-Length': '0'
            }
        ))

    # The enclosing batch request.
    request = self._client._client.post(
        url='https://{}/?comp=batch'.format(self.primary_hostname),
        headers={
            'x-ms-version': "2019-02-02"
        }
    )

    class LocalStorageHeadersPolicy(HeadersPolicy):
        """Headers policy that also stamps each request with ``x-ms-date``."""

        def on_request(self, request):
            # type: (PipelineRequest) -> None
            super(LocalStorageHeadersPolicy, self).on_request(request)
            current_time = format_date_time(time())
            request.http_request.headers['x-ms-date'] = current_time

    request.set_multipart_mixed(
        *reqs,
        policies=[
            LocalStorageHeadersPolicy(),
            self._credential_policy
        ]
    )

    if request.multipart_mixed_info:
        from azure.core.pipeline.transport.base import MultiPartHelper
        # NOTE(review): presumably this runs the policies registered above
        # against every sub-request before serialization -- confirm.
        MultiPartHelper(request).prepare_request()

    class AsyncBytesIO:
        """Minimal awaitable-``write`` wrapper so the writer can target memory."""

        def __init__(self):
            self.buffer = BytesIO()

        async def write(self, *args, **kwargs):
            self.buffer.write(*args, **kwargs)

    # Serialize every sub-request as an ``application/http`` part.
    with aiohttp.MultipartWriter('mixed') as mpwriter:
        for req in reqs:
            mpwriter.append(
                req.serialize(),
                {'CONTENT-TYPE': 'application/http', 'Content-Transfer-Encoding': 'binary'}
            )
        async_buffer = AsyncBytesIO()
        await mpwriter.write(async_buffer)

    request.set_bytes_body(async_buffer.buffer.getvalue())
    request.headers["Content-Type"] = "multipart/mixed; boundary=" + mpwriter.boundary

    pipeline_response = await self._pipeline.run(
        request,
        stream=True
    )
    response = pipeline_response.http_response

    # NOTE(review): the call is made with stream=True; some async transports
    # may need the body loaded explicitly before ``body()`` works -- confirm.
    body = response.body()

    try:
        if response.status_code not in [202]:
            raise HttpResponseError(response=response)

        # Count the parts of the multipart response. The email parser needs a
        # header block, so the response Content-Type is prepended to the body.
        content_type = response.headers.get('Content-Type') or ''
        message = message_from_bytes(
            b"Content-Type: " + content_type.encode('utf-8') + b"\r\n\r\n" + body
        )
        parts = message.get_payload() if message.is_multipart() else []

        # Every sub-request should be answered by exactly one part.
        if len(parts) != len(reqs):
            raise HttpResponseError(
                message="Didn't receive the same number of parts",
                response=response
            )
    # NOTE(review): the body above raises HttpResponseError; confirm that
    # this handler is able to match it.
    except StorageErrorException as error:
        process_storage_error(error)

def get_blob_client(
self, blob, # type: Union[str, BlobProperties]
snapshot=None # type: str
Expand Down
26 changes: 25 additions & 1 deletion sdk/storage/azure-storage-blob/tests/test_container_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def _create_container(self, prefix=TEST_CONTAINER_PREFIX):
return container

#--Test cases for containers -----------------------------------------

async def _test_create_container(self):
# Arrange
container_name = self._get_container_reference()
Expand Down Expand Up @@ -1193,6 +1193,30 @@ async def _test_list_blobs_with_delimiter(self):
self.assertNamedItemInContainer(resp, 'b/')
self.assertNamedItemInContainer(resp, 'blob4')

@record
def test_delete_blobs_simple(self):
    """Drive the async delete-blobs scenario to completion on the event loop."""
    event_loop = asyncio.get_event_loop()
    scenario = self._test_delete_blobs_simple()
    event_loop.run_until_complete(scenario)

async def _test_delete_blobs_simple(self):
    """Upload a few blobs on a best-effort basis, then issue a batch delete.

    The scenario makes no assertions; it only checks that the batch delete
    call completes without raising.
    """
    # Arrange
    container = await self._create_container()
    data = b'hello world'

    for blob_name in ('blob1', 'blob2', 'blob3'):
        try:
            await container.get_blob_client(blob_name).upload_blob(data)
        except Exception:
            # Best-effort setup: a failed upload must not stop the others.
            # Unlike a bare ``except``, this lets KeyboardInterrupt,
            # SystemExit and task cancellation propagate.
            pass

    # Act
    # NOTE(review): only 'blob12' is deleted, which is not one of the blobs
    # uploaded above; 'blob2' and 'blob3' were left commented out in the
    # original call -- confirm this is intended.
    await container.delete_blobs(
        'blob12',
    )

@record
def test_list_blobs_with_delimiter(self):
loop = asyncio.get_event_loop()
Expand Down

0 comments on commit 34568b0

Please sign in to comment.