Refactor max_connections to max_concurrency (#7531)
* Rename max_connections to max_concurrency

* Other non-breaking changes

* Comment changes
Rakshith Bhyravabhotla authored Oct 1, 2019
1 parent 075d8a1 commit 13c9f08
Showing 27 changed files with 207 additions and 208 deletions.
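At call sites the rename is mechanical: pass max_concurrency wherever max_connections was passed before. A minimal sketch against the renamed helper in this commit (the SAS URL below is a placeholder):

    from azure.storage.blob import upload_blob_to_url

    SAS_URL = "https://myaccount.blob.core.windows.net/mycontainer/myblob?<sas-token>"  # placeholder

    # Upload with four parallel connections (formerly max_connections=4).
    upload_blob_to_url(SAS_URL, data=b"hello world", overwrite=True, max_concurrency=4)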
14 changes: 7 additions & 7 deletions sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py
@@ -94,7 +94,7 @@ def upload_blob_to_url(
         blob_url,  # type: str
         data,  # type: Union[Iterable[AnyStr], IO[AnyStr]]
         overwrite=False,  # type: bool
-        max_connections=1,  # type: int
+        max_concurrency=1,  # type: int
         encoding='UTF-8',  # type: str
         credential=None,  # type: Any
         **kwargs):
@@ -125,22 +125,22 @@ def upload_blob_to_url(
             data=data,
             blob_type=BlobType.BlockBlob,
             overwrite=overwrite,
-            max_connections=max_connections,
+            max_concurrency=max_concurrency,
             encoding=encoding,
             **kwargs)


-def _download_to_stream(client, handle, max_connections, **kwargs):
+def _download_to_stream(client, handle, max_concurrency, **kwargs):
     """Download data to specified open file-handle."""
     stream = client.download_blob(**kwargs)
-    stream.download_to_stream(handle, max_connections=max_connections)
+    stream.download_to_stream(handle, max_concurrency=max_concurrency)


 def download_blob_from_url(
         blob_url,  # type: str
         output,  # type: str
         overwrite=False,  # type: bool
-        max_connections=1,  # type: int
+        max_concurrency=1,  # type: int
         credential=None,  # type: Any
         **kwargs):
     # type: (...) -> None
@@ -166,9 +166,9 @@ def download_blob_from_url(
     """
     with BlobClient(blob_url, credential=credential) as client:
         if hasattr(output, 'write'):
-            _download_to_stream(client, output, max_connections, **kwargs)
+            _download_to_stream(client, output, max_concurrency, **kwargs)
         else:
             if not overwrite and os.path.isfile(output):
                 raise ValueError("The file '{}' already exists.".format(output))
             with open(output, 'wb') as file_handle:
-                _download_to_stream(client, file_handle, max_connections, **kwargs)
+                _download_to_stream(client, file_handle, max_concurrency, **kwargs)
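As the branch above shows, download_blob_from_url accepts either a writable handle or a file path, and only the path branch honors the overwrite check. A short sketch (the SAS URL is a placeholder):

    import io
    from azure.storage.blob import download_blob_from_url

    SAS_URL = "https://myaccount.blob.core.windows.net/mycontainer/myblob?<sas-token>"  # placeholder

    # Writable handle: data is streamed into the buffer.
    buffer = io.BytesIO()
    download_blob_from_url(SAS_URL, output=buffer, max_concurrency=2)

    # File path: refuses to clobber an existing file unless overwrite=True.
    download_blob_from_url(SAS_URL, output="myblob.bin", overwrite=True, max_concurrency=2)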
(next changed file)
@@ -383,32 +383,32 @@ def _initial_request(self):

         return response

-    def content_as_bytes(self, max_connections=1):
+    def content_as_bytes(self, max_concurrency=1):
         """Download the contents of this file.

         This operation is blocking until all data is downloaded.

-        :param int max_connections:
+        :param int max_concurrency:
             The number of parallel connections with which to download.
         :rtype: bytes
         """
         stream = BytesIO()
-        self.download_to_stream(stream, max_connections=max_connections)
+        self.download_to_stream(stream, max_concurrency=max_concurrency)
         return stream.getvalue()

-    def content_as_text(self, max_connections=1, encoding="UTF-8"):
+    def content_as_text(self, max_concurrency=1, encoding="UTF-8"):
         """Download the contents of this file, and decode as text.

         This operation is blocking until all data is downloaded.

-        :param int max_connections:
+        :param int max_concurrency:
             The number of parallel connections with which to download.
         :rtype: str
         """
-        content = self.content_as_bytes(max_connections=max_connections)
+        content = self.content_as_bytes(max_concurrency=max_concurrency)
         return content.decode(encoding)

-    def download_to_stream(self, stream, max_connections=1):
+    def download_to_stream(self, stream, max_concurrency=1):
         """Download the contents of this file to a stream.

         :param stream:
@@ -419,7 +419,7 @@ def download_to_stream(self, stream, max_connections=1):
         :rtype: Any
         """
         # the stream must be seekable if parallel download is required
-        if max_connections > 1:
+        if max_concurrency > 1:
             error_message = "Target stream handle must be seekable."
             if sys.version_info >= (3,) and not stream.seekable():
                 raise ValueError(error_message)
@@ -447,7 +447,7 @@ def download_to_stream(self, stream, max_connections=1):
             # Use the length unless it is over the end of the file
             data_end = min(self.file_size, self.length + 1)

-        downloader_class = ParallelChunkDownloader if max_connections > 1 else SequentialChunkDownloader
+        downloader_class = ParallelChunkDownloader if max_concurrency > 1 else SequentialChunkDownloader
         downloader = downloader_class(
             service=self.service,
             total_size=self.download_size,
@@ -462,9 +462,9 @@ def download_to_stream(self, stream, max_connections=1):
             **self.request_options
         )

-        if max_connections > 1:
+        if max_concurrency > 1:
             import concurrent.futures
-            executor = concurrent.futures.ThreadPoolExecutor(max_connections)
+            executor = concurrent.futures.ThreadPoolExecutor(max_concurrency)
             list(executor.map(
                 with_current_context(downloader.process_chunk),
                 downloader.get_chunk_offsets()
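The seekable-stream guard above exists because each parallel worker seeks to its own chunk offset before writing, so a non-seekable target can only be downloaded sequentially. A standalone sketch of the same check (illustrative names, not SDK code):

    import io
    import sys

    def require_seekable_for_parallel(stream, max_concurrency):
        # Parallel chunk writers each seek to their own offset, so the
        # target must support seeking when more than one worker is used.
        if max_concurrency > 1:
            if sys.version_info >= (3,) and not stream.seekable():
                raise ValueError("Target stream handle must be seekable.")

    require_seekable_for_parallel(io.BytesIO(), max_concurrency=4)  # ok: BytesIO is seekable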
(next changed file)
@@ -328,32 +328,32 @@ async def _initial_request(self):
         self._download_complete = True
         return response

-    async def content_as_bytes(self, max_connections=1):
+    async def content_as_bytes(self, max_concurrency=1):
         """Download the contents of this file.

         This operation is blocking until all data is downloaded.

-        :param int max_connections:
+        :param int max_concurrency:
             The number of parallel connections with which to download.
         :rtype: bytes
         """
         stream = BytesIO()
-        await self.download_to_stream(stream, max_connections=max_connections)
+        await self.download_to_stream(stream, max_concurrency=max_concurrency)
         return stream.getvalue()

-    async def content_as_text(self, max_connections=1, encoding='UTF-8'):
+    async def content_as_text(self, max_concurrency=1, encoding='UTF-8'):
         """Download the contents of this file, and decode as text.

         This operation is blocking until all data is downloaded.

-        :param int max_connections:
+        :param int max_concurrency:
             The number of parallel connections with which to download.
         :rtype: str
         """
-        content = await self.content_as_bytes(max_connections=max_connections)
+        content = await self.content_as_bytes(max_concurrency=max_concurrency)
         return content.decode(encoding)

-    async def download_to_stream(self, stream, max_connections=1):
+    async def download_to_stream(self, stream, max_concurrency=1):
         """Download the contents of this file to a stream.

         :param stream:
@@ -367,7 +367,7 @@ async def download_to_stream(self, stream, max_connections=1):
             raise ValueError("Stream is currently being iterated.")

         # the stream must be seekable if parallel download is required
-        parallel = max_connections > 1
+        parallel = max_concurrency > 1
         if parallel:
             error_message = "Target stream handle must be seekable."
             if sys.version_info >= (3,) and not stream.seekable():
@@ -412,7 +412,7 @@ async def download_to_stream(self, stream, max_connections=1):
             dl_tasks = downloader.get_chunk_offsets()
             running_futures = [
                 asyncio.ensure_future(downloader.process_chunk(d))
-                for d in islice(dl_tasks, 0, max_connections)
+                for d in islice(dl_tasks, 0, max_concurrency)
             ]
             while running_futures:
                 # Wait for some download to finish before adding a new one
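The islice pattern above keeps at most max_concurrency chunk downloads in flight, topping the window back up as tasks complete. A self-contained sketch of that scheduling idea (illustrative helper, not the SDK's internals):

    import asyncio
    from itertools import islice

    async def bounded_map(coro_fn, items, max_concurrency):
        # Start an initial window of tasks, then replace each finished
        # task with a new one until the iterator is exhausted.
        items = iter(items)
        running = [asyncio.ensure_future(coro_fn(x)) for x in islice(items, 0, max_concurrency)]
        results = []
        while running:
            done, pending = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
            results.extend(task.result() for task in done)
            running = list(pending) + [
                asyncio.ensure_future(coro_fn(x)) for x in islice(items, 0, len(done))
            ]
        return results

    async def demo():
        async def square(n):
            await asyncio.sleep(0.01)
            return n * n
        print(await bounded_map(square, range(10), max_concurrency=3))  # order may vary

    asyncio.run(demo())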
(next changed file)
@@ -49,7 +49,7 @@ def upload_data_chunks(
         uploader_class=None,
         total_size=None,
         chunk_size=None,
-        max_connections=None,
+        max_concurrency=None,
         stream=None,
         validate_content=None,
         encryption_options=None,
@@ -63,7 +63,7 @@ def upload_data_chunks(
         kwargs['encryptor'] = encryptor
         kwargs['padder'] = padder

-    parallel = max_connections > 1
+    parallel = max_concurrency > 1
     if parallel and 'modified_access_conditions' in kwargs:
         # Access conditions do not work with parallelism
         kwargs['modified_access_conditions'] = None
@@ -77,11 +77,11 @@ def upload_data_chunks(
         validate_content=validate_content,
         **kwargs)
     if parallel:
-        executor = futures.ThreadPoolExecutor(max_connections)
+        executor = futures.ThreadPoolExecutor(max_concurrency)
         upload_tasks = uploader.get_chunk_streams()
         running_futures = [
             executor.submit(with_current_context(uploader.process_chunk), u)
-            for u in islice(upload_tasks, 0, max_connections)
+            for u in islice(upload_tasks, 0, max_concurrency)
         ]
         range_ids = _parallel_uploads(executor, uploader.process_chunk, upload_tasks, running_futures)
     else:
@@ -96,10 +96,10 @@ def upload_substream_blocks(
         uploader_class=None,
         total_size=None,
         chunk_size=None,
-        max_connections=None,
+        max_concurrency=None,
         stream=None,
         **kwargs):
-    parallel = max_connections > 1
+    parallel = max_concurrency > 1
     if parallel and 'modified_access_conditions' in kwargs:
         # Access conditions do not work with parallelism
         kwargs['modified_access_conditions'] = None
@@ -112,11 +112,11 @@ def upload_substream_blocks(
         **kwargs)

     if parallel:
-        executor = futures.ThreadPoolExecutor(max_connections)
+        executor = futures.ThreadPoolExecutor(max_concurrency)
         upload_tasks = uploader.get_substream_blocks()
         running_futures = [
             executor.submit(with_current_context(uploader.process_substream_block), u)
-            for u in islice(upload_tasks, 0, max_connections)
+            for u in islice(upload_tasks, 0, max_concurrency)
         ]
         range_ids = _parallel_uploads(executor, uploader.process_substream_block, upload_tasks, running_futures)
     else:
@@ -420,7 +420,7 @@ def read(self, n):
             # or read in just enough data for the current block/sub stream
             current_max_buffer_size = min(self._max_buffer_size, self._length - self._position)

-            # lock is only defined if max_connections > 1 (parallel uploads)
+            # lock is only defined if max_concurrency > 1 (parallel uploads)
             if self._lock:
                 with self._lock:
                     # reposition the underlying stream to match the start of the data to read
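The synchronous uploaders use the same sliding-window idea as the asyncio sketch above, but with a thread pool: seed max_concurrency futures, then submit one replacement per completed future. A compact sketch of the pattern (illustrative, not the SDK's _parallel_uploads):

    from concurrent import futures
    from itertools import islice

    def bounded_thread_map(fn, items, max_concurrency):
        items = iter(items)
        results = []
        with futures.ThreadPoolExecutor(max_concurrency) as executor:
            # Seed the window, then keep it full until items run out.
            running = {executor.submit(fn, x) for x in islice(items, 0, max_concurrency)}
            while running:
                done, running = futures.wait(running, return_when=futures.FIRST_COMPLETED)
                results.extend(f.result() for f in done)
                running |= {executor.submit(fn, x) for x in islice(items, 0, len(done))}
        return results

    print(bounded_thread_map(lambda n: n * n, range(8), max_concurrency=4))  # order may vary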
(next changed file)
@@ -50,7 +50,7 @@ async def upload_data_chunks(
         uploader_class=None,
         total_size=None,
         chunk_size=None,
-        max_connections=None,
+        max_concurrency=None,
         stream=None,
         encryption_options=None,
         **kwargs):
@@ -63,7 +63,7 @@ async def upload_data_chunks(
         kwargs['encryptor'] = encryptor
         kwargs['padder'] = padder

-    parallel = max_connections > 1
+    parallel = max_concurrency > 1
     if parallel and 'modified_access_conditions' in kwargs:
         # Access conditions do not work with parallelism
         kwargs['modified_access_conditions'] = None
@@ -80,7 +80,7 @@ async def upload_data_chunks(
         upload_tasks = uploader.get_chunk_streams()
         running_futures = [
             asyncio.ensure_future(uploader.process_chunk(u))
-            for u in islice(upload_tasks, 0, max_connections)
+            for u in islice(upload_tasks, 0, max_concurrency)
         ]
         range_ids = await _parallel_uploads(uploader.process_chunk, upload_tasks, running_futures)
     else:
@@ -98,10 +98,10 @@ async def upload_substream_blocks(
         uploader_class=None,
         total_size=None,
         chunk_size=None,
-        max_connections=None,
+        max_concurrency=None,
         stream=None,
         **kwargs):
-    parallel = max_connections > 1
+    parallel = max_concurrency > 1
     if parallel and 'modified_access_conditions' in kwargs:
         # Access conditions do not work with parallelism
         kwargs['modified_access_conditions'] = None
@@ -117,7 +117,7 @@ async def upload_substream_blocks(
         upload_tasks = uploader.get_substream_blocks()
         running_futures = [
             asyncio.ensure_future(uploader.process_substream_block(u))
-            for u in islice(upload_tasks, 0, max_connections)
+            for u in islice(upload_tasks, 0, max_concurrency)
         ]
         range_ids = await _parallel_uploads(uploader.process_substream_block, upload_tasks, running_futures)
     else:
(next changed file)
@@ -68,7 +68,7 @@ def upload_block_blob( # pylint: disable=too-many-locals
         overwrite=None,
         headers=None,
         validate_content=None,
-        max_connections=None,
+        max_concurrency=None,
         blob_settings=None,
         encryption_options=None,
         **kwargs):
@@ -121,7 +121,7 @@ def upload_block_blob( # pylint: disable=too-many-locals
                 uploader_class=BlockBlobChunkUploader,
                 total_size=length,
                 chunk_size=blob_settings.max_block_size,
-                max_connections=max_connections,
+                max_concurrency=max_concurrency,
                 stream=stream,
                 validate_content=validate_content,
                 encryption_options=encryption_options,
@@ -133,7 +133,7 @@ def upload_block_blob( # pylint: disable=too-many-locals
                 uploader_class=BlockBlobChunkUploader,
                 total_size=length,
                 chunk_size=blob_settings.max_block_size,
-                max_connections=max_connections,
+                max_concurrency=max_concurrency,
                 stream=stream,
                 validate_content=validate_content,
                 **kwargs
@@ -165,7 +165,7 @@ def upload_page_blob(
         overwrite=None,
         headers=None,
         validate_content=None,
-        max_connections=None,
+        max_concurrency=None,
         blob_settings=None,
         encryption_options=None,
         **kwargs):
@@ -203,7 +203,7 @@ def upload_page_blob(
             total_size=length,
             chunk_size=blob_settings.max_page_size,
             stream=stream,
-            max_connections=max_connections,
+            max_concurrency=max_concurrency,
             validate_content=validate_content,
             encryption_options=encryption_options,
             **kwargs)
@@ -224,7 +224,7 @@ def upload_append_blob( # pylint: disable=unused-argument
         overwrite=None,
         headers=None,
         validate_content=None,
-        max_connections=None,
+        max_concurrency=None,
         blob_settings=None,
         encryption_options=None,
         **kwargs):
@@ -248,7 +248,7 @@ def upload_append_blob( # pylint: disable=unused-argument
             total_size=length,
             chunk_size=blob_settings.max_block_size,
             stream=stream,
-            max_connections=max_connections,
+            max_concurrency=max_concurrency,
             validate_content=validate_content,
             append_position_access_conditions=append_conditions,
             **kwargs)
@@ -274,7 +274,7 @@ def upload_append_blob( # pylint: disable=unused-argument
             total_size=length,
             chunk_size=blob_settings.max_block_size,
             stream=stream,
-            max_connections=max_connections,
+            max_concurrency=max_concurrency,
             validate_content=validate_content,
             append_position_access_conditions=append_conditions,
             **kwargs)
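These helpers sit beneath the public client methods, so the renamed keyword ultimately surfaces there as well. A hedged end-to-end sketch (assumes BlobClient.upload_blob forwards max_concurrency the way upload_blob_to_url does above; the URL and credential are placeholders):

    from azure.storage.blob import BlobClient

    client = BlobClient(
        "https://myaccount.blob.core.windows.net/mycontainer/bigfile",  # placeholder
        credential="<account-key-or-sas>",  # placeholder
    )
    with open("bigfile.dat", "rb") as data:
        client.upload_blob(data, overwrite=True, max_concurrency=8)  # chunked upload on 8 workers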
(remaining changed files not shown)
