Skip to content

Commit

Permalink
Summarize inputs and outputs in ContainerRun.md5, as part of #751.
Browse files Browse the repository at this point in the history
  • Loading branch information
donkirkby committed Mar 1, 2019
1 parent 61ce29f commit 39ebdf4
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 14 deletions.
15 changes: 8 additions & 7 deletions kive/container/management/commands/convert_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from django.core.management.base import BaseCommand
from django.conf import settings
from django.db import transaction
from six.moves import input

from archive.models import Run
from constants import runstates
Expand All @@ -25,10 +26,6 @@
runstates.QUARANTINED_PK: ContainerRun.CANCELLED
}

if hasattr(__builtins__, 'raw_input'):
# noinspection PyShadowingBuiltins
input = raw_input


def get_converting_pipeline_marker(container_id):
return 'Converting to container id {}.'.format(container_id)
Expand Down Expand Up @@ -170,11 +167,12 @@ def convert_runs(self, pipeline, container, batch_size):
container_run.datasets.create(argument=argument,
dataset=dataset)
self.convert_logs(run, container_run)
container_run.set_md5()
container_run.save()
if run.description:
run.description += '\n'
run.description += get_converted_run_marker(container_run.id)
run.save()
pass
print('Converted all {} runs to container id {}.'.format(
pipeline_run_count,
container.id))
Expand Down Expand Up @@ -295,7 +293,7 @@ def create_container(self, pipeline, default_parent_container):
container.copy_permissions(pipeline)
container.full_clean()
container.refresh_from_db()
container.write_content(dict(pipeline=pipeline_config))
container.write_archive_content(dict(pipeline=pipeline_config))
container.created = pipeline.revision_DateTime
container.save()

Expand Down Expand Up @@ -366,7 +364,8 @@ def choose_pipeline(self):
get_converting_pipeline_marker)
if container_id is not None:
print(pipeline)
if input('In progress, continue? [Y]/N').upper() != 'Y':
# noinspection PyCompatibility
if input('In progress, continue? [Y]/N').upper() not in ('Y', ''):
return
return pipeline

Expand All @@ -389,12 +388,14 @@ def choose_pipeline(self):
pipeline_family.name,
converted_pipelines,
total_pipelines))
# noinspection PyCompatibility
choice = int(input('Pick a pipeline family: '))
pipeline_family = pipeline_families[choice - 1]
unconverted_pipelines = family_map[pipeline_family.id]
for pipeline in unconverted_pipelines:
print('{}: {}'.format(pipeline.revision_number,
pipeline.revision_name))
# noinspection PyCompatibility
choice = int(input('Pick a pipeline revision: '))
pipeline = pipeline_family.members.get(revision_number=choice)
print(pipeline.revision_name)
Expand Down
7 changes: 5 additions & 2 deletions kive/container/management/commands/runcontainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def save_outputs(self, run):
('stderr.txt', ContainerLog.STDERR)):
run.load_log(os.path.join(logs_path, file_name), log_type)

run.set_md5()
run.state = (ContainerRun.COMPLETE
if run.return_code == 0
else ContainerRun.FAILED)
Expand Down Expand Up @@ -295,7 +296,6 @@ def run_pipeline(self,
execution_args = [
"singularity",
"exec",
'--cleanenv',
"--contain",
"-B",
extracted_archive_dir + ':' + internal_binary_dir,
Expand All @@ -310,6 +310,8 @@ def run_pipeline(self,
]
all_args = [str(arg)
for arg in execution_args + input_paths + output_paths]
child_environment = {'LANG': 'en_CA.UTF-8',
'PATH': os.environ['PATH']}
command_path = os.path.join(log_path, 'step_{}_command.txt'.format(idx))
with open(command_path, 'w') as f:
f.write(' '.join(all_args))
Expand All @@ -319,7 +321,8 @@ def run_pipeline(self,
open(step_stderr_path, 'w') as step_stderr:
step_return_code = call(all_args,
stdout=step_stdout,
stderr=step_stderr)
stderr=step_stderr,
env=child_environment)
for step_path, main_file in ((step_stdout_path, standard_out),
(step_stderr_path, standard_err)):
log_size = os.stat(step_path).st_size
Expand Down
22 changes: 22 additions & 0 deletions kive/container/migrations/0019_containerrun_md5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.20 on 2019-03-01 18:58
from __future__ import unicode_literals

import django.core.validators
from django.db import migrations, models
import re


class Migration(migrations.Migration):

dependencies = [
('container', '0018_containerrun_original_run'),
]

operations = [
migrations.AddField(
model_name='containerrun',
name='md5',
field=models.CharField(blank=True, help_text="Summary of MD5's for inputs, outputs, and containers.", max_length=64, validators=[django.core.validators.RegexValidator(message='MD5 checksum is not either 32 hex characters or blank', regex=re.compile('(^[0-9A-Fa-f]{32}$)|(^$)'))]),
),
]
35 changes: 35 additions & 0 deletions kive/container/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import errno
import hashlib
import json
import logging
import os
Expand Down Expand Up @@ -916,6 +917,13 @@ class ContainerRun(Stopwatch, AccessControl):
null=True,
blank=True,
related_name="reruns")
md5 = models.CharField(
max_length=64,
validators=[RegexValidator(
regex=re.compile("(^[0-9A-Fa-f]{32}$)|(^$)"),
message="MD5 checksum is not either 32 hex characters or blank")],
blank=True,
help_text="Summary of MD5's for inputs, outputs, and containers.")

class Meta(object):
ordering = ('-submit_time',)
Expand Down Expand Up @@ -943,6 +951,12 @@ def get_rerun_name(self):
name += rerun_suffix
return name

@property
def has_changed(self):
if self.state != self.COMPLETE or self.original_run is None:
return
return self.md5 != self.original_run.md5

def get_access_limits(self, access_limits=None):
if access_limits is None:
access_limits = []
Expand Down Expand Up @@ -1190,6 +1204,27 @@ def check_slurm_state(cls, pk=None):
run.end_time = Now()
run.save()

def set_md5(self):
""" Set this run's md5. Note that this does not save the run. """
encoding = 'utf8'
md5gen = hashlib.md5()
container = self.app.container
container_md5 = container.md5.encode(encoding)
md5gen.update(container_md5)
parent_container = container.parent
if parent_container is not None:
parent_md5 = parent_container.md5.encode(encoding)
md5gen.update(parent_md5)

# Use explict sort order, so changes to default don't invalidate MD5's.
for container_dataset in self.datasets.order_by('argument__type',
'argument__position',
'argument__name'):
dataset = container_dataset.dataset
dataset_md5 = dataset.MD5_checksum.encode(encoding)
md5gen.update(dataset_md5)
self.md5 = md5gen.hexdigest()


class ContainerDataset(models.Model):
run = models.ForeignKey(ContainerRun, related_name="datasets")
Expand Down
2 changes: 2 additions & 0 deletions kive/container/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ class Meta:
'batch_name',
'batch_absolute_url',
'original_run',
'has_changed',
'app',
'app_name',
'state',
Expand Down Expand Up @@ -341,6 +342,7 @@ def create_rerun(self, original_run, user):
description=original_run.description,
priority=original_run.priority,
original_run=original_run)
rerun.copy_permissions(original_run)

reruns_needed = rerun.create_inputs_from_original_run()
dependencies = {} # {source_rerun_id: source_dependencies}
Expand Down
4 changes: 4 additions & 0 deletions kive/container/static/container/ContainerRunTable.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
}, clickRerun)
.addClass('button')
.appendTo($td);
if (run.has_changed !== null) {
$("<span>").text(run.has_changed ? " CHANGED" : " unchanged")
.appendTo($td);
}

if (run.stopped_by !== null) {
$td.append(" (Stopped by user ", $('<span>').text(run.stopped_by), ")");
Expand Down
70 changes: 69 additions & 1 deletion kive/container/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2576,6 +2576,8 @@ def test_find_rerun_input(self):
content_file = ContentFile('x,y\n1,2')
output1 = Dataset.objects.create(user=run1.user, name='output1')
output1.dataset_file.save('example.csv', content_file)
output1.set_MD5()
output1.save()
run1.datasets.create(argument=output_argument, dataset=output1)

# run2 consumes an output from run1
Expand All @@ -2584,6 +2586,15 @@ def test_find_rerun_input(self):
state=ContainerRun.FAILED)
run2.datasets.create(argument=input_argument, dataset=output1)

# run2 produces an output
output2 = Dataset.objects.create(user=run2.user, name='output2')
content_file2 = ContentFile('greeting\n')
output2.dataset_file.save('out.csv', content_file2)
output2.set_MD5()
output2.save()
run2.datasets.create(argument=output_argument, dataset=output2)
run2.set_md5()

# Purge the input to run 2.
output1.dataset_file.delete()

Expand All @@ -2595,7 +2606,63 @@ def test_find_rerun_input(self):
state=ContainerRun.COMPLETE)
output1b = Dataset.objects.create(user=run1.user, name='output1b')
output1b.dataset_file.save('example_b.csv', content_file)
output1b.set_MD5(output1b.dataset_file.path)
output1b.set_MD5()
output1b.save()
run3.datasets.create(argument=output_argument, dataset=output1b)

# run4 is a rerun of run 2, and the one we are going to execute.
run4 = ContainerRun.objects.create(user=run2.user,
app=run2.app,
original_run=run2)

call_command('runcontainer', str(run4.id))

run4.refresh_from_db()
run_dataset = run4.datasets.get(argument__type=ContainerArgument.INPUT)
self.assertEqual(output1b.id, run_dataset.dataset_id)
self.assertEqual(run2.md5, run4.md5)

def test_find_rerun_input_changes(self):
run1 = ContainerRun.objects.get(name='fixture run')
app = run1.app
input_argument = app.arguments.get(type=ContainerArgument.INPUT)
output_argument = app.arguments.get(type=ContainerArgument.OUTPUT)

content_file1 = ContentFile('x,y\n1,2')
content_file1b = ContentFile('x,y\n10,20')
output1 = Dataset.objects.create(user=run1.user, name='output1')
output1.dataset_file.save('example.csv', content_file1)
output1.set_MD5()
output1.save()
run1.datasets.create(argument=output_argument, dataset=output1)

# run2 consumes an output from run1
run2 = ContainerRun.objects.create(user=run1.user,
app=run1.app,
state=ContainerRun.FAILED)
run2.datasets.create(argument=input_argument, dataset=output1)

# run2 produces an output
output2 = Dataset.objects.create(user=run2.user, name='output2')
content_file2 = ContentFile('greeting\n')
output2.dataset_file.save('out.csv', content_file2)
output2.set_MD5()
output2.save()
run2.datasets.create(argument=output_argument, dataset=output2)
run2.set_md5()

# Purge the input to run 2.
output1.dataset_file.delete()

# run3 is a rerun of run1 to reproduce the input for run 2.
run3 = ContainerRun.objects.create(user=run1.user,
name='source rerun',
app=run1.app,
original_run=run1,
state=ContainerRun.COMPLETE)
output1b = Dataset.objects.create(user=run1.user, name='output1b')
output1b.dataset_file.save('example_b.csv', content_file1b)
output1b.set_MD5()
output1b.save()
run3.datasets.create(argument=output_argument, dataset=output1b)

Expand All @@ -2609,6 +2676,7 @@ def test_find_rerun_input(self):
run4.refresh_from_db()
run_dataset = run4.datasets.get(argument__type=ContainerArgument.INPUT)
self.assertEqual(output1b.id, run_dataset.dataset_id)
self.assertNotEqual(run2.md5, run4.md5)

def test_rerun_input_exists(self):
run1 = ContainerRun.objects.get(name='fixture run')
Expand Down
31 changes: 31 additions & 0 deletions kive/container/tests_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,37 @@ def test_remove_running(self):
r'ContainerRun id 42 is still active.'):
run.build_removal_plan()

def test_change_on_rerun(self):
run1 = ContainerRun(md5='11111111111111111111111111111111')
run2 = ContainerRun(md5='22222222222222222222222222222222',
original_run=run1,
state=ContainerRun.COMPLETE)

self.assertEqual(True, run2.has_changed)

def test_change_not_on_rerun(self):
run1 = ContainerRun(md5='11111111111111111111111111111111')
run2 = ContainerRun(md5='11111111111111111111111111111111',
original_run=run1,
state=ContainerRun.COMPLETE)

self.assertEqual(False, run2.has_changed)

def test_change_on_fail(self):
""" Only report changes on successfully completed runs. """
run1 = ContainerRun(md5='11111111111111111111111111111111')
run2 = ContainerRun(md5='11111111111111111111111111111111',
original_run=run1,
state=ContainerRun.FAILED)

self.assertIsNone(run2.has_changed)

def test_change_on_new_run(self):
run1 = ContainerRun(md5='11111111111111111111111111111111',
state=ContainerRun.COMPLETE)

self.assertIsNone(run1.has_changed)


@mocked_relations(ContainerRun, ContainerApp, ContainerArgument)
class RunContainerMockTests(TestCase):
Expand Down
12 changes: 8 additions & 4 deletions kive/librarian/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,16 +629,20 @@ def create_structure(self, compounddatatype, num_rows=-1):
structure.save()
return structure

def set_MD5(self, file_path, file_handle=None):
def set_MD5(self, file_path=None, file_handle=None):
"""Set the MD5 hash from a file.
Closes the file after the MD5 is computed.
:param str file_path: Path to file to calculate MD5 for. file_path not used if file_handle supplied.
:param file file_handle: file handle of file to calculate MD5. File must be seeked to the beginning.
If file_handle empty, then uses file_path.
:param str file_path: Path to file to calculate MD5 for.
Defaults to dataset_file.path, and not used if file_handle supplied.
:param file file_handle: file handle of file to calculate MD5. File
must be seeked to the beginning.
If file_handle empty, then uses file_path.
"""
opened_file_ourselves = False
if file_handle is None:
if file_path is None:
file_path = self.dataset_file.path
file_handle = io.open(file_path, "rb")
opened_file_ourselves = True

Expand Down

0 comments on commit 39ebdf4

Please sign in to comment.