Improve logging of dataset purges for #697.
Add a sample script to generate large datasets, and fix a related bug.
donkirkby committed Jul 30, 2018
1 parent b2aada7 commit 1f4d12d
Showing 4 changed files with 139 additions and 5 deletions.
10 changes: 6 additions & 4 deletions kive/file_access_utils.py
@@ -146,7 +146,7 @@ def set_up_directory(directory_to_use, tolerate=False):
             raise ValueError("Directory \"{}\" nonempty; contains file {}".format(directory_to_use, path))
 
 
-def compute_md5(file_to_checksum):
+def compute_md5(file_to_checksum, chunk_size=1024*64):
     """Computes MD5 checksum of specified file.
 
     file_to_checksum should be an open, readable, file handle, with
@@ -156,9 +156,11 @@ def compute_md5(file_to_checksum):
     so that bytes (not strings) are returned when iterating over the file.
     """
     md5gen = hashlib.md5()
-    for line in file_to_checksum:
-        md5gen.update(line)
-    return md5gen.hexdigest()
+    while True:
+        chunk = file_to_checksum.read(chunk_size)
+        if not chunk:
+            return md5gen.hexdigest()
+        md5gen.update(chunk)
 
 
 def file_exists(path):
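The switch to fixed-size reads matters for newline-free files like the zero-filled datasets added below: iterating line by line would buffer the whole file as a single "line" before hashing it, which is presumably the related bug the commit message mentions. A minimal usage sketch, assuming the function is imported from the kive.file_access_utils module above and that zeros.dat is a hypothetical stand-in for a real file:

    from kive.file_access_utils import compute_md5

    # Open in binary mode so read() returns bytes for md5gen.update().
    with open('zeros.dat', 'rb') as data_file:
        checksum = compute_md5(data_file)  # 64 KiB chunks by default
    print(checksum)

Passing a larger chunk_size, e.g. compute_md5(data_file, chunk_size=1024 * 1024), trades a little memory for fewer read() calls.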
6 changes: 5 additions & 1 deletion kive/librarian/models.py
@@ -1432,7 +1432,11 @@ def purge(cls,
             relpath = os.path.relpath(filepath, settings.MEDIA_ROOT)
             total_size += filesize
             heapq.heappush(files, (filedate, relpath, filesize))
-        if total_size >= max_storage:
+        if total_size < max_storage:
+            cls.logger.debug('Dataset purge not needed at %s over %d files.',
+                             filesizeformat(total_size),
+                             len(files))
+        else:
             cls.logger.info('Dataset purge triggered at %s over %d files.',
                             filesizeformat(total_size),
                             len(files))
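The rewritten check now logs on both branches, so an operator can tell from a debug line that the purge check ran and was simply under the max_storage threshold, rather than seeing nothing at all. The human-readable sizes come from Django's filesizeformat template filter; a standalone sketch of the message formatting, assuming Django is installed (settings.configure() is only needed outside a configured project):

    from django.conf import settings

    settings.configure()  # minimal setup so template filters work standalone
    from django.template.defaultfilters import filesizeformat

    total_size = 3 * 1024 ** 3  # pretend the datasets occupy 3 GiB
    print('Dataset purge triggered at %s over %d files.'
          % (filesizeformat(total_size), 4200))
    # prints something like: Dataset purge triggered at 3.0 GB over 4200 files.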
40 changes: 40 additions & 0 deletions samplecode/dataset_generator.py
@@ -0,0 +1,40 @@
#!/usr/bin/env python3

""" Generates a 1GB dataset.

Configure this as a method that is not reusable, then use
dataset_generator_client.py to submit a bunch of runs that will generate lots
of big datasets and trigger dataset purges.
"""

import shutil
from argparse import ArgumentParser, FileType
from datetime import datetime
from subprocess import check_call


def parse_args():
    parser = ArgumentParser(
        description='Generates a large dataset to test purging.')

    parser.add_argument(
        'header_txt',
        type=FileType('rb'),
        help='Text to write at the top of the output.')
    parser.add_argument(
        'zeros',
        type=FileType('wb'),
        help='A data file that will get a bunch of zero bytes written to it.')
    return parser.parse_args()


def main():
    print('Starting at {}.'.format(datetime.now()))
    args = parse_args()
    shutil.copyfileobj(args.header_txt, args.zeros)
    args.zeros.flush()  # make sure the header lands before head appends
    check_call(['head', '-c', '1000000000', '/dev/zero'], stdout=args.zeros)
    print('Finished at {}.'.format(datetime.now()))


main()
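Shelling out to head -c with /dev/zero keeps the gigabyte write fast, but it ties the script to Unix. A rough pure-Python alternative for that one check_call line, with write_zeros being a hypothetical helper rather than part of this commit:

    def write_zeros(out_file, total_bytes=1000000000, chunk_size=1024 * 64):
        """Write total_bytes zero bytes in fixed-size chunks to bound memory."""
        chunk = b'\0' * chunk_size
        while total_bytes > 0:
            n = min(total_bytes, chunk_size)
            out_file.write(chunk if n == chunk_size else b'\0' * n)
            total_bytes -= n

    write_zeros(args.zeros)  # would replace the check_call(['head', ...]) line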
88 changes: 88 additions & 0 deletions samplecode/dataset_generator_client.py
@@ -0,0 +1,88 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter, SUPPRESS
import logging
import os
from time import sleep

from kiveapi import KiveAPI
from requests.adapters import HTTPAdapter

logger = logging.getLogger(__name__)


def parse_args():
    parser = ArgumentParser(
        description="Launch runs of the dataset_generator pipeline.",
        formatter_class=ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--kive_server',
        default=os.environ.get('KIVE_SERVER', 'http://localhost:8000'),
        help='server to send runs to')
    parser.add_argument(
        '--kive_user',
        default=os.environ.get('KIVE_USER', 'kive'),
        help='user name for Kive server')
    parser.add_argument(
        '--kive_password',
        default=SUPPRESS,
        help='password for Kive server (default not shown)')
    # noinspection PyTypeChecker
    parser.add_argument(
        '--max_active',
        default=os.environ.get('KIVE_MAX_ACTIVE', '4'),
        type=int,
        help='number of runs active at once')

    args = parser.parse_args()
    if not hasattr(args, 'kive_password'):
        args.kive_password = os.environ.get('KIVE_PASSWORD', 'kive')
    return args


def launch_if_needed(session, args, pipeline, input_dataset):
    # Count this pipeline's unfinished runs, then top up to max_active.
    runs = session.find_runs(active=True)
    active_count = 0
    for run in runs:
        if run.pipeline_id != pipeline.pipeline_id:
            continue
        if run.raw['end_time'] is not None:
            continue
        active_count += 1
    while active_count < args.max_active:
        run = session.run_pipeline(pipeline,
                                   [input_dataset],
                                   'dataset_generator test')
        logger.info('Started run %d.', run.run_id)
        active_count += 1


def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s[%(levelname)s]%(name)s:%(message)s")
    logging.getLogger(
        "requests.packages.urllib3.connectionpool").setLevel(logging.WARNING)
    logging.info('Starting.')

    args = parse_args()
    session = KiveAPI(args.kive_server)
    session.mount('https://', HTTPAdapter(max_retries=20))
    session.login(args.kive_user, args.kive_password)

    # Bootstrap: discover the pipeline and its input from a run that is
    # already active, so start one dataset_generator run by hand first.
    runs = session.find_runs(active=True)
    pipeline_id = input_id = None
    for run in runs:
        if 'dataset_generator' in run.raw['display_name']:
            pipeline_id = run.pipeline_id
            input_id = run.raw['inputs'][0]['dataset']
            break
    if pipeline_id is None:
        raise RuntimeError(
            'No active runs found with "dataset_generator" in the name.')
    pipeline = session.get_pipeline(pipeline_id)
    input_dataset = session.get_dataset(input_id)

    while True:
        launch_if_needed(session, args, pipeline, input_dataset)
        sleep(1)


main()
