Tag copy tests #600

Open · wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions pytest.ini
@@ -9,6 +9,7 @@ markers =
bucket_encryption
checksum
cloud_transition
copy
encryption
fails_on_aws
fails_on_dbstore
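
With the `copy` marker registered in pytest.ini, copy-related tests can be selected or deselected with a `-m` marker expression. A minimal sketch of how that might look when driving pytest from Python (the invocation itself is illustrative and not part of this change; the test path is the file touched below):

import pytest

# Run only the tests tagged with the new marker.
pytest.main(["-m", "copy", "s3tests_boto3/functional/test_s3.py"])

# Or deselect them entirely.
pytest.main(["-m", "not copy", "s3tests_boto3/functional/test_s3.py"])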
33 changes: 33 additions & 0 deletions s3tests_boto3/functional/test_s3.py
@@ -5389,6 +5389,7 @@ def test_bucket_list_special_prefix():
objs_list = get_objects_list(bucket_name, prefix='_bla/')
assert len(objs_list) == 4

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_zero_size():
key = 'foo123bar'
@@ -5403,6 +5404,7 @@ def test_object_copy_zero_size():
response = client.get_object(Bucket=bucket_name, Key='bar321foo')
assert response['ContentLength'] == 0

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_16m():
bucket_name = get_new_bucket()
@@ -5416,6 +5418,7 @@ def test_object_copy_16m():
response = client.get_object(Bucket=bucket_name, Key=key2)
assert response['ContentLength'] == 16*1024*1024

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_same_bucket():
bucket_name = get_new_bucket()
@@ -5430,6 +5433,7 @@ def test_object_copy_same_bucket():
body = _get_body(response)
assert 'foo' == body

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_verify_contenttype():
bucket_name = get_new_bucket()
@@ -5448,6 +5452,7 @@ def test_object_copy_verify_contenttype():
response_content_type = response['ContentType']
assert response_content_type == content_type

@pytest.mark.copy
def test_object_copy_to_itself():
bucket_name = get_new_bucket()
client = get_client()
@@ -5460,6 +5465,7 @@ def test_object_copy_to_itself():
assert status == 400
assert error_code == 'InvalidRequest'

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_to_itself_with_metadata():
bucket_name = get_new_bucket()
@@ -5472,6 +5478,7 @@ def test_object_copy_to_itself_with_metadata():
response = client.get_object(Bucket=bucket_name, Key='foo123bar')
assert response['Metadata'] == metadata

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_diff_bucket():
bucket_name1 = get_new_bucket()
@@ -5488,6 +5495,7 @@ def test_object_copy_diff_bucket():
body = _get_body(response)
assert 'foo' == body

@pytest.mark.copy
def test_object_copy_not_owned_bucket():
client = get_client()
alt_client = get_alt_client()
@@ -5504,6 +5512,7 @@ def test_object_copy_not_owned_bucket():
status, error_code = _get_status_and_error_code(e.response)
assert status == 403

@pytest.mark.copy
def test_object_copy_not_owned_object_bucket():
client = get_client()
alt_client = get_alt_client()
@@ -5525,6 +5534,7 @@ def test_object_copy_not_owned_object_bucket():
copy_source = {'Bucket': bucket_name, 'Key': 'foo123bar'}
alt_client.copy(copy_source, bucket_name, 'bar321foo')

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_canned_acl():
bucket_name = get_new_bucket()
@@ -5545,6 +5555,7 @@ def test_object_copy_canned_acl():
# check ACL is applied by doing GET from another user
alt_client.get_object(Bucket=bucket_name, Key='foo123bar')

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_retaining_metadata():
for size in [3, 1024 * 1024]:
@@ -5564,6 +5575,7 @@ def test_object_copy_retaining_metadata():
body = _get_body(response)
assert size == response['ContentLength']

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_replacing_metadata():
for size in [3, 1024 * 1024]:
@@ -5585,6 +5597,7 @@ def test_object_copy_replacing_metadata():
assert metadata == response['Metadata']
assert size == response['ContentLength']

@pytest.mark.copy
def test_object_copy_bucket_not_found():
bucket_name = get_new_bucket()
client = get_client()
@@ -5594,6 +5607,7 @@ def test_object_copy_bucket_not_found():
status = _get_status(e.response)
assert status == 404

@pytest.mark.copy
def test_object_copy_key_not_found():
bucket_name = get_new_bucket()
client = get_client()
@@ -5603,6 +5617,7 @@ def test_object_copy_key_not_found():
status = _get_status(e.response)
assert status == 404

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_versioned_bucket():
bucket_name = get_new_bucket()
@@ -5667,6 +5682,7 @@ def test_object_copy_versioned_bucket():
assert data_str == body
assert size == response['ContentLength']

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_versioned_url_encoding():
bucket = get_new_bucket_resource()
@@ -5730,6 +5746,7 @@ def _multipart_upload(bucket_name, key, size, part_size=5*1024*1024, client=None

return (upload_id, s, parts)

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_object_copy_versioning_multipart_upload():
bucket_name = get_new_bucket()
@@ -5894,6 +5911,7 @@ def _check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name, ver
src_data = _get_body(response)
assert src_data == dest_data

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_multipart_copy_small():
src_key = 'foo'
@@ -5911,6 +5929,7 @@ def test_multipart_copy_small():
assert size == response['ContentLength']
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)

@pytest.mark.copy
def test_multipart_copy_invalid_range():
client = get_client()
src_key = 'source'
@@ -5930,6 +5949,7 @@ def test_multipart_copy_invalid_range():
assert error_code == 'InvalidRange'


@pytest.mark.copy
# TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40795 is resolved
@pytest.mark.fails_on_rgw
def test_multipart_copy_improper_range():
@@ -5961,6 +5981,7 @@ def test_multipart_copy_improper_range():
assert error_code == 'InvalidArgument'


@pytest.mark.copy
def test_multipart_copy_without_range():
client = get_client()
src_key = 'source'
@@ -5986,6 +6007,7 @@ def test_multipart_copy_without_range():
assert response['ContentLength'] == 10
_check_key_content(src_key, src_bucket_name, dest_key, dest_bucket_name)

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_multipart_copy_special_names():
src_bucket_name = get_new_bucket()
@@ -6078,6 +6100,7 @@ def check_configure_versioning_retry(bucket_name, status, expected_string):

assert expected_string == read_status

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_multipart_copy_versioned():
src_bucket_name = get_new_bucket()
@@ -6165,6 +6188,7 @@ def test_multipart_upload_multiple_sizes():
(upload_id, data, parts) = _multipart_upload(bucket_name=bucket_name, key=key, size=objlen)
client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_multipart_copy_multiple_sizes():
src_key = 'foo'
@@ -7709,6 +7733,7 @@ def test_versioning_obj_suspend_versions():
assert len(version_ids) == 0
assert len(version_ids) == len(contents)

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_versioning_obj_suspended_copy():
bucket_name = get_new_bucket()
@@ -7861,6 +7886,7 @@ def test_versioning_obj_list_marker():
check_obj_content(client, bucket_name, key, version['VersionId'], contents[j])
i += 1

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_versioning_copy_obj_version():
bucket_name = get_new_bucket()
@@ -11533,6 +11559,7 @@ def test_bucket_policy_put_obj_tagging_existing_tag():
assert status == 403


@pytest.mark.copy
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
def test_bucket_policy_upload_part_copy():
@@ -11590,6 +11617,7 @@ def test_bucket_policy_upload_part_copy():
alt_client.abort_multipart_upload(Bucket=bucket_name2, Key='new_foo2', UploadId=upload_id)


@pytest.mark.copy
@pytest.mark.tagging
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
@@ -11637,6 +11665,7 @@ def test_bucket_policy_put_obj_copy_source():
copy_source = {'Bucket': bucket_name, 'Key': 'private/foo'}
check_access_denied(alt_client.copy_object, Bucket=bucket_name2, CopySource=copy_source, Key='new_foo2')

@pytest.mark.copy
@pytest.mark.tagging
@pytest.mark.bucket_policy
@pytest.mark.fails_on_dbstore
@@ -12889,6 +12918,7 @@ def test_object_lock_changing_mode_from_compliance():
assert status == 403
assert error_code == 'AccessDenied'

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_copy_object_ifmatch_good():
bucket_name = get_new_bucket()
@@ -12900,6 +12930,7 @@ def test_copy_object_ifmatch_good():
body = _get_body(response)
assert body == 'bar'

@pytest.mark.copy
# TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40808 is resolved
@pytest.mark.fails_on_rgw
def test_copy_object_ifmatch_failed():
@@ -12912,6 +12943,7 @@ def test_copy_object_ifmatch_failed():
assert status == 412
assert error_code == 'PreconditionFailed'

@pytest.mark.copy
# TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40808 is resolved
@pytest.mark.fails_on_rgw
def test_copy_object_ifnonematch_good():
@@ -12924,6 +12956,7 @@ def test_copy_object_ifnonematch_good():
assert status == 412
assert error_code == 'PreconditionFailed'

@pytest.mark.copy
@pytest.mark.fails_on_dbstore
def test_copy_object_ifnonematch_failed():
bucket_name = get_new_bucket()
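
Many of the newly tagged tests also carry other markers (fails_on_dbstore, fails_on_rgw, tagging, bucket_policy), so the copy marker can be combined with them in a single `-m` expression. A sketch under the same illustrative invocation as above:

import pytest

# Copy tests that are not expected to fail on dbstore backends.
pytest.main(["-m", "copy and not fails_on_dbstore", "s3tests_boto3/functional/test_s3.py"])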