Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

do not overwrite unchanged objects or re-delete #30

Merged
merged 2 commits into from
Apr 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 100 additions & 2 deletions s3-pit-restore
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,68 @@ class TestS3PitRestore(unittest.TestCase):
print("Restoring and checking for dmarker_restore test")
self.assertTrue(self.check_tree(path, content))

class TestS3PitRestoreSameBucket(unittest.TestCase):
    """Tests for restoring into the *same* bucket and key prefix.

    Verifies that a restore is a no-op when nothing changed: an unchanged
    object must not be overwritten with a new version, and an already-deleted
    object must not receive a second delete marker.
    """

    def check_versioning(self, s3):
        """Assert that the target bucket has versioning enabled.

        Point-in-time restore relies on object versions, so versioning must
        be active on ``args.bucket``.
        """
        bucket_versioning = s3.BucketVersioning(args.bucket)
        bucket_versioning.load()

        print("Checking bucket versioning ... ", end='', flush=True)
        self.assertNotEqual(bucket_versioning.status, None)
        print("enabled!")

    def _prepare(self, prefix):
        """Create a uniquely-keyed test object and point the restore at it.

        Puts a random payload under ``prefix/<uuid>``, configures the global
        restore arguments for that single key, and returns
        ``(s3_resource, test_key, s3_object)`` for use by the test.
        """
        test_content = str(uuid.uuid4())
        test_key = f'{prefix}/{uuid.uuid4()}'

        s3 = boto3.resource('s3', endpoint_url=args.endpoint_url)
        self.check_versioning(s3)

        print("Preparing ...")
        # Named 'obj' rather than 'object' to avoid shadowing the builtin.
        obj = s3.Object(args.bucket, test_key)
        obj.put(Body=test_content)
        # Sleep so subsequent operations get a distinct LastModified second.
        time.sleep(1)

        args.prefix = test_key
        args.timestamp = None
        args.from_timestamp = None
        return s3, test_key, obj

    def test_no_op(self):
        """Restoring an unchanged live object must not create a new version."""
        print('Running test_no_op ...')
        s3, test_key, _ = self._prepare('test_no_op')

        print("Restoring ...")
        do_restore()

        print("Checking ...")
        result = s3.meta.client.list_object_versions(Bucket=args.bucket, Prefix=test_key)
        # Still exactly one version and no delete markers: restore was a no-op.
        self.assertEqual(1, len(result['Versions']))
        self.assertEqual(0, len(result.get("DeleteMarkers", [])))

    def test_no_op_delete(self):
        """Restoring an already-deleted object must not re-delete it."""
        print('Running test_no_op_delete ...')
        s3, test_key, obj = self._prepare('test_no_op_delete')
        obj.delete()
        time.sleep(1)

        print("Restoring ...")
        do_restore()

        print("Checking ...")
        result = s3.meta.client.list_object_versions(Bucket=args.bucket, Prefix=test_key)
        # One object version and exactly one (the original) delete marker.
        self.assertEqual(1, len(result['Versions']))
        self.assertEqual(1, len(result.get("DeleteMarkers", [])))


def signal_handler(signal, frame):
executor.shutdown(wait=False)
for future in list(futures.keys()):
Expand Down Expand Up @@ -293,6 +355,10 @@ def do_restore():
dest = args.dest
last_obj = {}
last_obj["Key"] = ""
# The key that was given for the latest version that was flagged with IsLatest=true.
is_latest_key = None
# The etag that was given for the latest version that was flagged with IsLatest=true.
is_latest_etag = None

if args.debug: boto3.set_stream_logger('botocore')

Expand Down Expand Up @@ -321,12 +387,16 @@ def do_restore():
deletemarkers = previous_deletemarkers + page.get("DeleteMarkers", [])
# And since they have been added, we remove them from the overflow list
previous_deletemarkers = []
dmarker = {"Key":""}
dmarker = {"Key": "", "IsLatest": False}
for obj in versions:
if last_obj["Key"] == obj["Key"]:
# We've had a newer version or a delete of this key
continue

if obj["IsLatest"]:
is_latest_key = obj["Key"]
is_latest_etag = obj["ETag"]

version_date = obj["LastModified"]

if version_date > pit_end_date or version_date < pit_start_date:
Expand All @@ -338,6 +408,9 @@ def do_restore():
# (both versions and deletemarkers list are sorted in alphabetical order of the key, and then in reverse time order for each key)
while deletemarkers and (dmarker["Key"] < obj["Key"] or (dmarker["Key"] == obj["Key"] and dmarker["LastModified"] > pit_end_date)):
dmarker = deletemarkers.pop(0)
if dmarker['IsLatest']:
# The given object is already deleted and does not have to be deleted again.
obj_needs_be_deleted.pop(dmarker["Key"], None)

#skip dmarker if it's later than pit_end_date
if dmarker["Key"] == obj["Key"] and dmarker["LastModified"] > obj["LastModified"] and dmarker["LastModified"] <= pit_end_date:
Expand All @@ -353,7 +426,16 @@ def do_restore():

if args.dest_bucket is not None:
obj_needs_be_deleted.pop(obj["Key"], None)
handled_by_copy(obj)

# We can skip the restore if the current version is equivalent to the newest version
# of the object and if we want to restore it to the same bucket and path.
# is_latest_key == obj["Key"] also ensures that the object is not currently deleted,
# because a version with IsLatest=true was observed.
if is_latest_key != obj["Key"] or \
is_latest_etag != obj["ETag"] or \
args.bucket != args.dest_bucket or \
args.dest_prefix:
handled_by_copy(obj)
continue

if not handled_by_standard(obj):
Expand All @@ -373,6 +455,14 @@ def do_restore():
except Exception as ex:
print('"%s" %s %s %s %s "ERROR: %s"' % (obj["LastModified"], obj["VersionId"], obj["Size"], obj["StorageClass"], obj["Key"], ex), file=sys.stderr)
del(futures[future])

# Process leftover delete markers.
while previous_deletemarkers:
dmarker = previous_deletemarkers.pop(0)
if dmarker['IsLatest']:
# The given object is already deleted and does not have to be deleted again.
obj_needs_be_deleted.pop(dmarker["Key"], None)

# delete objects which came in existence after pit_end_date only if the destination bucket is same as source bucket and restoring to same object key
if args.dest_bucket == args.bucket and not args.dest_prefix:
for key in obj_needs_be_deleted:
Expand Down Expand Up @@ -425,6 +515,14 @@ if __name__=='__main__':
itersuite = unittest.TestLoader().loadTestsFromTestCase(TestS3PitRestore)
runner.run(itersuite)

# Restore in same bucket, ignoring original dest_bucket
args.dest_bucket = args.bucket
itersuite = unittest.TestLoader().loadTestsFromTestCase(TestS3PitRestore)
runner.run(itersuite)

itersuite = unittest.TestLoader().loadTestsFromTestCase(TestS3PitRestoreSameBucket)
runner.run(itersuite)

# Restore back dest_bucket state
args.dest_bucket = dest_bucket
if args.dest_bucket is not None:
Expand Down