🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] committed Mar 7, 2023
1 parent c2eff05 commit 54663c0
Showing 3 changed files with 64 additions and 30 deletions.
2 changes: 1 addition & 1 deletion google/cloud/storage/transfer_manager.py
@@ -810,7 +810,7 @@ def download_chunks_concurrently(
     with pool_class(max_workers=max_workers) as executor:
         cursor = forced_start
         # forced_end is zero-indexed here, so add 1
-        end = min(forced_end+1, blob.size) if forced_end else blob.size
+        end = min(forced_end + 1, blob.size) if forced_end else blob.size
         while cursor < end:
             start = cursor
             cursor = min(cursor + chunk_size, end)
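Note: the change in this hunk is whitespace only (black adds spaces around the +), but the line it touches encodes a real invariant: forced_end is a zero-indexed, inclusive byte offset, so the exclusive loop bound adds 1 and clamps to blob.size. A minimal sketch of that boundary arithmetic with plain integers (chunk_ranges is an illustrative stand-in, not a library function):

    def chunk_ranges(size, chunk_size, forced_start=0, forced_end=None):
        """Yield (start, end) byte ranges; forced_end is inclusive, as in the hunk."""
        cursor = forced_start
        # forced_end is zero-indexed/inclusive, so add 1 for an exclusive bound.
        end = min(forced_end + 1, size) if forced_end else size
        while cursor < end:
            start = cursor
            cursor = min(cursor + chunk_size, end)
            yield (start, cursor)

    # A 10-byte object in 4-byte chunks covers every byte exactly once:
    assert list(chunk_ranges(10, 4)) == [(0, 4), (4, 8), (8, 10)]
    # chunk_size = size - 1 leaves a final chunk of exactly 1 byte, the edge
    # case the system test in this commit exercises:
    assert list(chunk_ranges(10, 9)) == [(0, 9), (9, 10)]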
84 changes: 57 additions & 27 deletions tests/system/test_transfer_manager.py
@@ -28,7 +28,9 @@ def test_upload_many(shared_bucket, file_data, blobs_to_delete):
         (file_data["simple"]["path"], shared_bucket.blob("simple2")),
     ]
 
-    results = transfer_manager.upload_many(FILE_BLOB_PAIRS, worker_type=transfer_manager.PROCESS)
+    results = transfer_manager.upload_many(
+        FILE_BLOB_PAIRS, worker_type=transfer_manager.PROCESS
+    )
     assert results == [None, None]
 
     blobs = shared_bucket.list_blobs()
@@ -38,13 +40,17 @@ def test_upload_many(shared_bucket, file_data, blobs_to_delete):
     assert len(blobs_to_delete) == 2
 
 
-def test_upload_many_with_threads_and_file_objs(shared_bucket, file_data, blobs_to_delete):
+def test_upload_many_with_threads_and_file_objs(
+    shared_bucket, file_data, blobs_to_delete
+):
     FILE_BLOB_PAIRS = [
         (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple1")),
         (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple2")),
     ]
 
-    results = transfer_manager.upload_many(FILE_BLOB_PAIRS, worker_type=transfer_manager.THREAD)
+    results = transfer_manager.upload_many(
+        FILE_BLOB_PAIRS, worker_type=transfer_manager.THREAD
+    )
     assert results == [None, None]
 
     blobs = shared_bucket.list_blobs()
@@ -78,10 +84,15 @@ def test_upload_many_skip_if_exists(
 def test_download_many(listable_bucket):
     blobs = list(listable_bucket.list_blobs())
     with tempfile.TemporaryDirectory() as tempdir:
-        filenames = [os.path.join(tempdir, "file_a.txt"), os.path.join(tempdir, "file_b.txt")]
+        filenames = [
+            os.path.join(tempdir, "file_a.txt"),
+            os.path.join(tempdir, "file_b.txt"),
+        ]
         BLOB_FILE_PAIRS = zip(blobs[:2], filenames)
 
-        results = transfer_manager.download_many(BLOB_FILE_PAIRS, worker_type=transfer_manager.PROCESS)
+        results = transfer_manager.download_many(
+            BLOB_FILE_PAIRS, worker_type=transfer_manager.PROCESS
+        )
         assert results == [None, None]
         for count, filename in enumerate(filenames):
             with open(filename, "rb") as fp:
@@ -94,47 +105,66 @@ def test_download_many_with_threads_and_file_objs(listable_bucket):
         tempfiles = [file_a, file_b]
         BLOB_FILE_PAIRS = zip(blobs[:2], tempfiles)
 
-        results = transfer_manager.download_many(BLOB_FILE_PAIRS, worker_type=transfer_manager.THREAD)
+        results = transfer_manager.download_many(
+            BLOB_FILE_PAIRS, worker_type=transfer_manager.THREAD
+        )
         assert results == [None, None]
         for fp in tempfiles:
             assert fp.tell() != 0
 
 
 def test_download_chunks_concurrently(shared_bucket, file_data):
     # Upload a big file
-    source_file = file_data['big']
-    upload_blob = shared_bucket.blob('chunky_file')
-    upload_blob.upload_from_filename(source_file['path'])
+    source_file = file_data["big"]
+    upload_blob = shared_bucket.blob("chunky_file")
+    upload_blob.upload_from_filename(source_file["path"])
     upload_blob.reload()
     size = upload_blob.size
     chunk_size = size // 32
     midpoint = size // 2
 
     # Get a fresh blob obj w/o metadata for testing purposes
-    download_blob = shared_bucket.blob('chunky_file')
+    download_blob = shared_bucket.blob("chunky_file")
 
     with tempfile.TemporaryDirectory() as tempdir:
-        full_filename = os.path.join(tempdir, 'chunky_file')
-        transfer_manager.download_chunks_concurrently(download_blob, full_filename, chunk_size=chunk_size)
-        with open(full_filename, 'rb') as file_obj:
-            assert _base64_md5hash(file_obj) == source_file['hash']
+        full_filename = os.path.join(tempdir, "chunky_file")
+        transfer_manager.download_chunks_concurrently(
+            download_blob, full_filename, chunk_size=chunk_size
+        )
+        with open(full_filename, "rb") as file_obj:
+            assert _base64_md5hash(file_obj) == source_file["hash"]
 
         # Now test for case where last chunk is exactly 1 byte.
-        trailing_chunk_filename = os.path.join(tempdir, 'chunky_file')
-        transfer_manager.download_chunks_concurrently(download_blob, trailing_chunk_filename, chunk_size=size-1)
-        with open(trailing_chunk_filename, 'rb') as file_obj:
-            assert _base64_md5hash(file_obj) == source_file['hash']
+        trailing_chunk_filename = os.path.join(tempdir, "chunky_file")
+        transfer_manager.download_chunks_concurrently(
+            download_blob, trailing_chunk_filename, chunk_size=size - 1
+        )
+        with open(trailing_chunk_filename, "rb") as file_obj:
+            assert _base64_md5hash(file_obj) == source_file["hash"]
 
         # Also test the start and end handling, and threaded mode.
-        first_half_filename = os.path.join(tempdir, 'chunky_file_half_a')
-        transfer_manager.download_chunks_concurrently(download_blob, first_half_filename, chunk_size=chunk_size, download_kwargs={'end': midpoint-1})
-        second_half_filename = os.path.join(tempdir, 'chunky_file_half_b')
-        transfer_manager.download_chunks_concurrently(download_blob, second_half_filename, chunk_size=chunk_size, download_kwargs={'start': midpoint}, worker_type=transfer_manager.THREAD)
-
-        joined_filename = os.path.join(tempdir, 'chunky_file_joined')
-        with open(joined_filename, 'wb') as joined, open(first_half_filename, 'rb') as half_a, open(second_half_filename, 'rb') as half_b:
+        first_half_filename = os.path.join(tempdir, "chunky_file_half_a")
+        transfer_manager.download_chunks_concurrently(
+            download_blob,
+            first_half_filename,
+            chunk_size=chunk_size,
+            download_kwargs={"end": midpoint - 1},
+        )
+        second_half_filename = os.path.join(tempdir, "chunky_file_half_b")
+        transfer_manager.download_chunks_concurrently(
+            download_blob,
+            second_half_filename,
+            chunk_size=chunk_size,
+            download_kwargs={"start": midpoint},
+            worker_type=transfer_manager.THREAD,
+        )
+
+        joined_filename = os.path.join(tempdir, "chunky_file_joined")
+        with open(joined_filename, "wb") as joined, open(
+            first_half_filename, "rb"
+        ) as half_a, open(second_half_filename, "rb") as half_b:
             joined.write(half_a.read())
             joined.write(half_b.read())
 
-        with open(joined_filename, 'rb') as file_obj:
-            assert _base64_md5hash(file_obj) == source_file['hash']
+        with open(joined_filename, "rb") as file_obj:
+            assert _base64_md5hash(file_obj) == source_file["hash"]
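A note on the half-file test above: start and end in download_kwargs are inclusive byte offsets on the underlying blob download, so {"end": midpoint - 1} and {"start": midpoint} split the object with no gap and no overlap, which is why concatenating the halves reproduces the source hash. A quick sanity check of that arithmetic with plain integers (values here are illustrative):

    size = 1000
    midpoint = size // 2
    half_a = range(0, (midpoint - 1) + 1)  # an inclusive end needs +1 for range()
    half_b = range(midpoint, size)
    assert len(half_a) + len(half_b) == size  # no gap
    assert half_a[-1] + 1 == half_b[0]  # no overlap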
8 changes: 6 additions & 2 deletions tests/unit/test_transfer_manager.py
@@ -695,9 +695,13 @@ def test__reduce_client():
 def test__call_method_on_maybe_pickled_blob():
     blob = mock.Mock(spec=Blob)
     blob.download_to_file.return_value = "SUCCESS"
-    result = transfer_manager._call_method_on_maybe_pickled_blob(blob, "download_to_file")
+    result = transfer_manager._call_method_on_maybe_pickled_blob(
+        blob, "download_to_file"
+    )
     assert result == "SUCCESS"
 
     pickled_blob = pickle.dumps(_PickleableMockBlob())
-    result = transfer_manager._call_method_on_maybe_pickled_blob(pickled_blob, "download_to_file")
+    result = transfer_manager._call_method_on_maybe_pickled_blob(
+        pickled_blob, "download_to_file"
+    )
     assert result == "SUCCESS"
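For context on what this unit test exercises: _call_method_on_maybe_pickled_blob accepts either a live Blob or pickled bytes, because process-pool workers receive blobs pickled across the process boundary while thread-pool workers share them directly. A hedged sketch of that dispatch, reconstructed from the test rather than copied from the library:

    import pickle

    def call_method_on_maybe_pickled_blob(maybe_pickled_blob, method_name, *args):
        # Bytes means the blob crossed a process boundary; unpickle it first.
        if isinstance(maybe_pickled_blob, bytes):
            blob = pickle.loads(maybe_pickled_blob)
        else:
            blob = maybe_pickled_blob
        # Dispatch to the named method, e.g. "download_to_file".
        return getattr(blob, method_name)(*args)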
