Skip to content

Commit

Permalink
Writing an updated pipeline to a Container with existing runs now cre…
Browse files Browse the repository at this point in the history
…ates a new Container.
  • Loading branch information
Richard Liang committed Feb 20, 2019
1 parent 435416b commit e4623f9
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 27 deletions.
84 changes: 62 additions & 22 deletions kive/container/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,61 @@ def get_content(self, add_default=True):
pipeline=pipeline)
return content

def create_new_pipeline_revision(self, tag=None, description=None):
"""
Create a new Container that is a copy of this one.
:return:
"""
new_revision = Container(
user=self.user,
family=self.family,
file_type=self.file_type,
parent=self.parent,
tag=tag,
description=description if description is not None else self.description
)
with use_field_file(self.file):
new_revision.file.save(self.file.name, self.file, save=True)
new_revision.copy_permissions(self)
return new_revision

def update_content_or_create_new_container(self, content=None):
"""
Updates the content of this archive container if possible; otherwise creates a new one.
:return: the resulting container (either this one or the new one)
"""
if content is None:
content = self.get_content()
pipeline = content['pipeline']
if not self.pipeline_valid(pipeline):
return

# Protect this with a transaction so that it fails if someone uses this archive container.
resulting_container = self
with transaction.atomic():
if self.apps.exists() and self.apps.first().has_been_used():
new_tag = content["new_tag"]
new_description = content["new_description"]
resulting_container = self.create_new_pipeline_revision(new_tag, new_description)

resulting_container.apps.all().delete()
default_config = pipeline.get('default_config',
self.DEFAULT_APP_CONFIG)
app = resulting_container.apps.create(memory=default_config['memory'],
threads=default_config['threads'])
input_names = ' '.join(entry['dataset_name']
for entry in pipeline['inputs'])
output_names = ' '.join(entry['dataset_name']
for entry in pipeline['outputs'])
app.write_inputs(input_names)
app.write_outputs(output_names)

# Write the content to the container.
resulting_container.write_content(content)
return resulting_container

def write_content(self, content):
pipeline = content['pipeline']
pipeline_json = json.dumps(pipeline)
Expand All @@ -415,7 +470,6 @@ def write_content(self, content):
archive.write(file_name, pipeline_json)
break
self.set_md5()
self.create_app_from_content(content)

def get_pipeline_state(self):
content = self.get_content(add_default=False)
Expand All @@ -426,27 +480,6 @@ def get_pipeline_state(self):
return self.VALID
return self.INCOMPLETE

def create_app_from_content(self, content=None):
""" Creat an app based on the content configuration.
:raises ValueError: if this is not an archive container
"""
if content is None:
content = self.get_content()
pipeline = content['pipeline']
if self.pipeline_valid(pipeline):
self.apps.all().delete()
default_config = pipeline.get('default_config',
self.DEFAULT_APP_CONFIG)
app = self.apps.create(memory=default_config['memory'],
threads=default_config['threads'])
input_names = ' '.join(entry['dataset_name']
for entry in pipeline['inputs'])
output_names = ' '.join(entry['dataset_name']
for entry in pipeline['outputs'])
app.write_inputs(input_names)
app.write_outputs(output_names)

@staticmethod
def pipeline_valid(pipeline):
"""
Expand Down Expand Up @@ -690,6 +723,13 @@ def remove(self):
removal_plan = self.build_removal_plan()
remove_helper(removal_plan)

def has_been_used(self):
"""
True if this app has ever been used; False otherwise.
:return:
"""
return self.runs.exists()


class ContainerArgument(models.Model):
INPUT = 'I'
Expand Down
2 changes: 1 addition & 1 deletion kive/container/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class Meta:
def create(self, validated_data):
container = super(ContainerSerializer, self).create(validated_data)
if container.file_type != Container.SIMG:
container.create_app_from_content()
container.update_content_or_create_new_container()
return container


Expand Down
71 changes: 68 additions & 3 deletions kive/container/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,13 @@ def test_write_zip_content(self):
inputs=[],
steps=[],
outputs=[]))
expected_apps_count = 0 # write_content does not create an app

container.write_content(expected_content)
content = container.get_content()

self.assertEqual(expected_content, content)
self.assertEqual(expected_apps_count, container.apps.count())

def test_write_tar_content(self):
user = User.objects.first()
Expand All @@ -247,7 +249,7 @@ def test_write_tar_content(self):
inputs=[],
steps=[],
outputs=[]))
expected_apps_count = 0 # Pipeline is incomplete, so no app created.
expected_apps_count = 0 # write_content does not create an app

container.write_content(expected_content)
content = container.get_content()
Expand Down Expand Up @@ -277,7 +279,8 @@ def test_rewrite_content(self):

self.assertEqual(expected_content, content)

def test_write_content_and_app(self):
def test_write_complete_content(self):
"""Writing a proper pipeline both updates the content and creates a new app."""
user = User.objects.first()
family = ContainerFamily.objects.create(user=user)
container = Container.objects.create(family=family, user=user)
Expand All @@ -302,7 +305,7 @@ def test_write_content_and_app(self):
expected_inputs = "in1"
expected_outputs = "out1"

container.write_content(expected_content)
container.update_content_or_create_new_container(expected_content)

self.assertEqual(expected_apps_count, container.apps.count())
app = container.apps.first()
Expand All @@ -311,6 +314,68 @@ def test_write_content_and_app(self):
self.assertEqual(expected_inputs, app.inputs)
self.assertEqual(expected_outputs, app.outputs)

def test_write_complete_content_to_container_with_existing_runs(self):
"""Writing a proper pipeline to a container that has existing runs creates a new container."""
user = User.objects.first()
family = ContainerFamily.objects.create(user=user)
container = Container.objects.create(family=family, user=user)
self.create_tar_content(container)
container.save()
updated_content = dict(
files=["bar.txt", "foo.txt"],
pipeline=dict(default_config=dict(memory=200,
threads=2),
inputs=[dict(dataset_name='in1')],
steps=[dict(driver='foo.txt',
inputs=[dict(dataset_name="in1",
source_step=0,
source_dataset_name="in1")],
outputs=["out1"])],
outputs=[dict(dataset_name="out1",
source_step=1,
source_dataset_name="out1")]))
container.update_content_or_create_new_container(updated_content)
app = container.apps.first()
app.runs.create(
name="foo",
state=ContainerRun.NEW,
user=user
)

# Now, update the content. This should create a new container.
updated_content = dict(
files=["bar.txt", "foo.txt"],
new_tag="updated",
new_description="foo",
pipeline=dict(default_config=dict(memory=200,
threads=2),
inputs=[dict(dataset_name='input1')],
steps=[dict(driver='foo.txt',
inputs=[dict(dataset_name="in1",
source_step=0,
source_dataset_name="input1")],
outputs=["out1"])],
outputs=[dict(dataset_name="output1",
source_step=1,
source_dataset_name="out1")]))
updated_container = container.update_content_or_create_new_container(updated_content)

expected_apps_count = 1
expected_memory = 200
expected_threads = 2
expected_inputs = "input1"
expected_outputs = "output1"

self.assertEqual(expected_apps_count, updated_container.apps.count())
self.assertNotEqual(updated_container.pk, container.pk)
self.assertEqual("updated", updated_container.tag)
self.assertEqual("foo", updated_container.description)
app = updated_container.apps.first()
self.assertEqual(expected_memory, app.memory)
self.assertEqual(expected_threads, app.threads)
self.assertEqual(expected_inputs, app.inputs)
self.assertEqual(expected_outputs, app.outputs)

def test_create_content_and_app(self):
user = User.objects.first()
family = ContainerFamily.objects.create(user=user)
Expand Down
2 changes: 1 addition & 1 deletion kive/container/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def form_valid(self, form):
self.object.grant_from_json(form.cleaned_data["permissions"])
self.object.validate_restrict_access([self.object.family])
if self.object.file_type != Container.SIMG:
self.object.create_app_from_content()
self.object.update_content_or_create_new_container()
return response

def get_success_url(self):
Expand Down

0 comments on commit e4623f9

Please sign in to comment.