From 1f4d12d30f169929eb256cd652a67b1e8ec934d6 Mon Sep 17 00:00:00 2001
From: don
Date: Mon, 30 Jul 2018 15:48:59 -0700
Subject: [PATCH] Improve logging of dataset purges for #697.

Add a sample script to generate large datasets, and fix a related bug.
---
 kive/file_access_utils.py              | 10 +--
 kive/librarian/models.py               |  6 +-
 samplecode/dataset_generator.py        | 40 ++++++++++++
 samplecode/dataset_generator_client.py | 88 ++++++++++++++++++++++++++
 4 files changed, 139 insertions(+), 5 deletions(-)
 create mode 100644 samplecode/dataset_generator.py
 create mode 100644 samplecode/dataset_generator_client.py

diff --git a/kive/file_access_utils.py b/kive/file_access_utils.py
index be7c59fae..53f9f4572 100755
--- a/kive/file_access_utils.py
+++ b/kive/file_access_utils.py
@@ -146,7 +146,7 @@ def set_up_directory(directory_to_use, tolerate=False):
         raise ValueError("Directory \"{}\" nonempty; contains file {}".format(directory_to_use, path))
 
 
-def compute_md5(file_to_checksum):
+def compute_md5(file_to_checksum, chunk_size=1024*64):
     """Computes MD5 checksum of specified file.
 
     file_to_checksum should be an open, readable, file handle, with
@@ -156,9 +156,11 @@ def compute_md5(file_to_checksum):
     so that bytes (not strings) are returned when iterating over the file.
     """
     md5gen = hashlib.md5()
-    for line in file_to_checksum:
-        md5gen.update(line)
-    return md5gen.hexdigest()
+    while True:
+        chunk = file_to_checksum.read(chunk_size)
+        if not chunk:
+            return md5gen.hexdigest()
+        md5gen.update(chunk)
 
 
 def file_exists(path):
diff --git a/kive/librarian/models.py b/kive/librarian/models.py
index 0f0b283fd..843077645 100644
--- a/kive/librarian/models.py
+++ b/kive/librarian/models.py
@@ -1432,7 +1432,11 @@ def purge(cls,
             relpath = os.path.relpath(filepath, settings.MEDIA_ROOT)
             total_size += filesize
             heapq.heappush(files, (filedate, relpath, filesize))
-        if total_size >= max_storage:
+        if total_size < max_storage:
+            cls.logger.debug('Dataset purge not needed at %s over %d files.',
+                             filesizeformat(total_size),
+                             len(files))
+        else:
             cls.logger.info('Dataset purge triggered at %s over %d files.',
                             filesizeformat(total_size),
                             len(files))
diff --git a/samplecode/dataset_generator.py b/samplecode/dataset_generator.py
new file mode 100644
index 000000000..a26b71b9c
--- /dev/null
+++ b/samplecode/dataset_generator.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+
+""" Generates a 1GB dataset.
+
+Configure this as a method that is not reusable, then use
+dataset_generator_client.py to submit a series of runs that generate
+many large datasets and trigger dataset purges.
+""" + +import shutil +from argparse import ArgumentParser, FileType +from datetime import datetime +from subprocess import check_call + + +def parse_args(): + parser = ArgumentParser( + description='Generates a large dataset to test purging.') + + parser.add_argument( + 'header_txt', + type=FileType('rb'), + help='Text to write at the top of the output.') + parser.add_argument( + 'zeros', + type=FileType('wb'), + help='A data file that will get a bunch of zeros bytes to it.') + return parser.parse_args() + + +def main(): + print('Starting at {}.'.format(datetime.now())) + args = parse_args() + shutil.copyfileobj(args.header_txt, args.zeros) + args.zeros.flush() + check_call(['head', '-c', '1000000000', '/dev/zero'], stdout=args.zeros) + print('Finished at {}.'.format(datetime.now())) + + +main() diff --git a/samplecode/dataset_generator_client.py b/samplecode/dataset_generator_client.py new file mode 100644 index 000000000..5451980db --- /dev/null +++ b/samplecode/dataset_generator_client.py @@ -0,0 +1,88 @@ +from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter, SUPPRESS +import logging +import os +from time import sleep + +from kiveapi import KiveAPI +from requests.adapters import HTTPAdapter + +logger = logging.getLogger(__name__) + + +def parse_args(): + parser = ArgumentParser( + description="Launch runs of the dataset_generator pipeline.", + formatter_class=ArgumentDefaultsHelpFormatter) + parser.add_argument( + '--kive_server', + default=os.environ.get('KIVE_SERVER', 'http://localhost:8000'), + help='server to send runs to') + parser.add_argument( + '--kive_user', + default=os.environ.get('KIVE_USER', 'kive'), + help='user name for Kive server') + parser.add_argument( + '--kive_password', + default=SUPPRESS, + help='password for Kive server (default not shown)') + # noinspection PyTypeChecker + parser.add_argument( + '--max_active', + default=os.environ.get('KIVE_MAX_ACTIVE', '4'), + type=int, + help='number of runs active at once') + + args = parser.parse_args() + if not hasattr(args, 'kive_password'): + args.kive_password = os.environ.get('KIVE_PASSWORD', 'kive') + return args + + +def launch_if_needed(session, args, pipeline, input_dataset): + runs = session.find_runs(active=True) + active_count = 0 + for run in runs: + if run.pipeline_id != pipeline.pipeline_id: + continue + if run.raw['end_time'] is not None: + continue + active_count += 1 + while active_count < args.max_active: + run = session.run_pipeline(pipeline, + [input_dataset], + 'dataset_generator test') + logger.info('Started run %d.', run.run_id) + active_count += 1 + + +def main(): + logging.basicConfig(level=logging.INFO, + format="%(asctime)s[%(levelname)s]%(name)s:%(message)s") + logging.getLogger( + "requests.packages.urllib3.connectionpool").setLevel(logging.WARNING) + logging.info('Starting.') + + args = parse_args() + session = KiveAPI(args.kive_server) + session.mount('https://', HTTPAdapter(max_retries=20)) + session.login(args.kive_user, args.kive_password) + + runs = session.find_runs(active=True) + pipeline_id = input_id = None + for run in runs: + if 'dataset_generator' in run.raw['display_name']: + pipeline_id = run.pipeline_id + input_id = run.raw['inputs'][0]['dataset'] + break + if pipeline_id is None: + raise RuntimeError( + 'No active runs found with "dataset_generator" in the name.') + pipeline = session.get_pipeline(pipeline_id) + input_dataset = session.get_dataset(input_id) + + while True: + launch_if_needed(session, args, pipeline, input_dataset) + sleep(1) + + +main()