Fix archive test #611

Merged 3 commits on Nov 16, 2024
27 changes: 21 additions & 6 deletions in main.py
@@ -429,17 +429,21 @@ def kcidb_archive(event, context):
     # Editing window
     edit_window = datetime.timedelta(days=14)
-    # Maximum duration of the dump transferred in a single execution
-    max_duration = datetime.timedelta(days=3)
+    # Operational database cannot have gaps of this or greater duration
+    max_duration = datetime.timedelta(days=7)
     # Duration of each dump piece
     piece_duration = datetime.timedelta(hours=12)
+    # Execution (monotonic) deadline
+    deadline_monotonic = time.monotonic() + 7 * 60
 
     op_client = get_db_client(OPERATIONAL_DATABASE)
     op_io_schema = op_client.get_schema()[1]
     op_obj_list_names = set(op_io_schema.id_fields)
     op_now = op_client.get_current_time()
     op_first_modified = op_client.get_first_modified()
     if not op_first_modified:
-        LOGGER.info("Operational database is empty, nothing to archive")
+        LOGGER.info("Operational database is empty, nothing to archive, "
+                    "aborting")
         return
 
     # Maximum timestamp of data to archive
@@ -458,7 +462,7 @@
     }
     min_after = min(after.values())
     if min_after >= max_until:
-        LOGGER.info("No data old enough to archive")
+        LOGGER.info("No data old enough to archive, aborting")
         return
 
     # Find the maximum timestamp of the data we need to fetch
@@ -468,7 +472,12 @@
     # Transfer data in pieces which can hopefully fit in memory
     # Split by time, down to microseconds, as it's our transfer atom
     min_after_str = min_after.isoformat(timespec='microseconds')
+    first_min_after_str = min_after_str
+    total_count = 0
     while all(t < until for t in after.values()):
+        if time.monotonic() >= deadline_monotonic:
+            LOGGER.info("Ran out of time, stopping")
+            break
         next_after = {
             n: min(max(t, min_after + piece_duration), until)
             for n, t in after.items()
@@ -487,21 +496,27 @@
         )
         dump = op_client.dump(with_metadata=True,
                               after=after, until=next_after)
+        count = kcidb.io.SCHEMA.count(dump)
         LOGGER.info("LOADING a dump of %u objects into archive database",
-                    kcidb.io.SCHEMA.count(dump))
+                    count)
         ar_client.load(dump, with_metadata=True)
         LOGGER.info("ARCHIVED %u objects in (%s, %s] range",
-                    kcidb.io.SCHEMA.count(dump),
-                    min_after_str, next_min_after_str)
+                    count, min_after_str, next_min_after_str)
         for obj_list_name in after:
             LOGGER.debug("ARCHIVED %u %s",
                          len(dump.get(obj_list_name, [])), obj_list_name)
+        total_count += count
         after = next_after
         min_after = next_min_after
         min_after_str = next_min_after_str
         # Make sure we have enough memory for the next piece
         dump = None
         gc.collect()
+    else:
+        LOGGER.info("Completed, stopping")
+
+    LOGGER.info("ARCHIVED %u objects TOTAL in (%s, %s] range",
+                total_count, first_min_after_str, min_after_str)
 
 
 def kcidb_purge_db(event, context):
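
A note on the control flow above: the `else` on the `while` loop runs only when the loop condition itself becomes false (all timestamps reached `until`), and the deadline `break` skips it, so exactly one of "Ran out of time" and "Completed" is logged per run. Below is a minimal, self-contained sketch of this deadline-bounded loop pattern; the names are illustrative only (a `pieces` list stands in for kcidb's per-slice dump-and-load), not kcidb's API:

    import logging
    import time

    logging.basicConfig(level=logging.INFO)
    LOGGER = logging.getLogger(__name__)

    def transfer(pieces, budget_seconds=7 * 60):
        """Process "pieces" until done or until the time budget is spent."""
        deadline_monotonic = time.monotonic() + budget_seconds
        total_count = 0
        while pieces:
            if time.monotonic() >= deadline_monotonic:
                LOGGER.info("Ran out of time, stopping")
                break
            # Stand-in for dumping one time slice from the operational
            # database and loading it into the archive database
            total_count += len(pieces.pop(0))
        else:
            LOGGER.info("Completed, stopping")
        LOGGER.info("Transferred %u objects TOTAL", total_count)

    transfer([[1, 2], [3], [4, 5, 6]])

Using time.monotonic() for the deadline, as the diff does, keeps the budget immune to wall-clock adjustments during execution.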