Skip to content

Commit

Permalink
feat(Add ability to save new delta from calibrating labware): Add abi…
Browse files Browse the repository at this point in the history
…lity to save new delta from cal

2269, 2270
  • Loading branch information
Laura-Danielle committed Oct 17, 2018
1 parent fc809e2 commit b636179
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 17 deletions.
116 changes: 100 additions & 16 deletions api/src/opentrons/protocol_api/labware.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
"""This module will replace Placeable"""
import re
import os
import json
import time
from typing import List, Dict
from enum import Enum, auto
from opentrons.types import Point
from opentrons.util.environment import PI_DATA_PATH
from opentrons.util import environment
from collections import defaultdict


Expand All @@ -16,6 +21,12 @@ class WellShape(Enum):
'circular': WellShape.CIRCULAR
}

if os.environ.get('RUNNING_ON_PI'):
presistentPath = os.path.join(PI_DATA_PATH, 'offsets')
else:
app_dir = environment.get_path('APP_DATA_DIR')
persistentPath = os.path.join(app_dir, 'offsets')


class Well:
def __init__(self, well_props: dict, parent: Point) -> None:
Expand Down Expand Up @@ -118,16 +129,48 @@ class Labware:
provides methods for accessing wells within the labware.
"""
def __init__(self, definition: dict, parent: Point) -> None:
self._calibrated_offset: Point = Point(0, 0, 0)
self._wells: List[Well] = []
# Directly from definition
self._well_definition = definition['wells']
self._id = definition['otId']
self._parameters = definition['parameters']
offset = definition['cornerOffsetFromSlot']
# Inferred from definition
self._ordering = [well
for col in definition['ordering']
for well in col]
self._wells = definition['wells']
offset = definition['cornerOffsetFromSlot']
self._offset = Point(x=offset['x'] + parent.x,
y=offset['y'] + parent.y,
z=offset['z'] + parent.z)
# Applied properties
self.set_calibration(Point(0, 0, 0))
self._pattern = re.compile(r'^([A-Z]+)([1-9][0-9]*)$', re.X)

def _build_wells(self) -> List[Well]:
"""
This function is used to create one instance of wells to be used by all
accessor functions. It is only called again if a new offset needs
to be applied.
"""
return [Well(self._well_definition[well], self._calibrated_offset)
for well in self._ordering]

def _create_indexed_dictionary(self, group=0):
dictList = defaultdict(list)
for index, wellObj in zip(self._ordering, self._wells):
dictList[self._pattern.match(index).group(group)].append(wellObj)
return dictList

def set_calibration(self, delta: Point):
"""
Called by save calibration in order to update the offset on the object.
"""
self._calibrated_offset = Point(x=self._offset.x + delta.x,
y=self._offset.y + delta.y,
z=self._offset.z + delta.z)
self._wells = self._build_wells()

def wells(self) -> List[Well]:
"""
Accessor function used to generate a list of wells in top -> down,
Expand All @@ -139,8 +182,7 @@ def wells(self) -> List[Well]:
:return: Ordered list of all wells in a labware
"""
return [Well(self._wells[well], self._offset)
for well in self._ordering]
return self._wells

def wells_by_index(self) -> Dict[str, Well]:
"""
Expand All @@ -152,8 +194,8 @@ def wells_by_index(self) -> Dict[str, Well]:
:return: Dictionary of well objects keyed by well name
"""
return {well: Well(self._wells[well], self._offset)
for well in self._ordering}
return {well: wellObj
for well, wellObj in zip(self._ordering, self._wells)}

def rows(self) -> List[List[Well]]:
"""
Expand Down Expand Up @@ -211,12 +253,52 @@ def columns_by_index(self) -> Dict[str, List[Well]]:
colDict = self._create_indexed_dictionary(group=2)
return colDict

def _create_indexed_dictionary(self, group=0):
dictList = defaultdict(list)
for well in self._ordering:
wellObj = Well(self._wells[well], self._offset)
dictList[self._pattern.match(well).group(group)].append(wellObj)
return dictList

def save_calibration(labware: Labware, delta: Point):
"""
Function to be used whenever an updated delta is found for the first well
of a given labware. If an offset file does not exist, create the file
using labware id as the filename. If the file does exist, load it and
modify the delta and the lastModified field.
"""
if not os.path.exists(persistentPath):
os.mkdir(persistentPath)
labwareOffsetPath = os.path.join(
persistentPath, "{}.json".format(labware._id))
if not os.path.exists(labwareOffsetPath):
schema = {
"default": {
"offset": [delta.x, delta.y, delta.z],
"lastModified": time.time()
}
}

with open(labwareOffsetPath, 'w') as f:
json.dump(schema, f)
else:
with open(labwareOffsetPath, 'r') as f:
schema = json.load(f)
schema['default']['offset'] = [delta.x, delta.y, delta.z]
schema['default']['lastModified'] = time.time()
with open(labwareOffsetPath, 'w') as f:
json.dump(schema, f)

labware.set_calibration(delta)


def load_calibration(labware: Labware):
"""
Look up a calibration if it exists and apply it to the given labware.
"""
offset = Point(0, 0, 0)
labwareOffsetPath = os.path.join(
persistentPath, "{}.json".format(labware._id))
if os.path.exists(labwareOffsetPath):
with open(labwareOffsetPath) as f:
schema = json.load(f)
offsetArray = schema['default']['offset']
offset = Point(x=offsetArray[0], y=offsetArray[1], z=offsetArray[2])
labware.set_calibration(offset)


def _load_definition_by_name(name: str) -> dict:
Expand All @@ -228,18 +310,20 @@ def _load_definition_by_name(name: str) -> dict:
raise NotImplementedError


def load(name: str, ll_at: Point) -> Labware:
def load(name: str, slot: Point) -> Labware:
"""
Return a labware object constructed from a labware definition dict looked
up by name (definition must have been previously stored locally on the
robot)
"""
definition = _load_definition_by_name(name)
return load_from_definition(definition, ll_at)
labware = load_from_definition(definition, slot)
load_calibration(labware)
return labware


def load_from_definition(definition: dict, ll_at: Point) -> Labware:
def load_from_definition(definition: dict, slot: Point) -> Labware:
"""
Return a labware object constructed from a provided labware definition dict
"""
return Labware(definition, ll_at)
return Labware(definition, slot)
10 changes: 9 additions & 1 deletion api/tests/opentrons/protocol_api/test_accessor_fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
"y": 10,
"z": 5
},
"otId": "minimalLabwareDef",
"parameters": {
"isTiprack": False,
},
"ordering": [["A1"], ["A2"]],
"wells": {
"A1": {
Expand Down Expand Up @@ -36,6 +40,10 @@
"y": 10,
"z": 5
},
"otId": "minimalLabwareDef2",
"parameters": {
"isTiprack": False,
},
"ordering": [["A1", "B1", "C1"], ["A2", "B2", "C2"]],
"wells": {
"A1": {
Expand Down Expand Up @@ -101,7 +109,7 @@ def test_labware_init():
fakeLabware = labware.Labware(minimalLabwareDef, deck)
ordering = [well for col in minimalLabwareDef['ordering'] for well in col]
assert fakeLabware._ordering == ordering
assert fakeLabware._wells == minimalLabwareDef['wells']
assert fakeLabware._well_definition == minimalLabwareDef['wells']
assert fakeLabware._offset == Point(x=10, y=10, z=5)


Expand Down
93 changes: 93 additions & 0 deletions api/tests/opentrons/protocol_api/test_offsets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import tempfile
import pytest
import os
import json
import time
from opentrons.protocol_api import labware
from opentrons.types import Point

minimalLabwareDef = {
"cornerOffsetFromSlot": {
"x": 10,
"y": 10,
"z": 5
},
"otId": "minimalLabwareDef",
"parameters": {
"isTiprack": False,
},
"ordering": [["A1"], ["A2"]],
"wells": {
"A1": {
"depth": 40,
"totalLiquidVolume": 100,
"diameter": 30,
"x": 0,
"y": 0,
"z": 0,
"shape": "circular"
},
"A2": {
"depth": 40,
"totalLiquidVolume": 100,
"diameter": 30,
"x": 10,
"y": 0,
"z": 0,
"shape": "circular"
}
}
}

tmpdir = tempfile.mkdtemp("offsets")
labware.persistentPath = tmpdir
testLabware = labware.Labware(minimalLabwareDef, Point(0, 0, 0))
path = os.path.join(labware.persistentPath, "{}.json".format(testLabware._id))
global testPoint


@pytest.fixture
def patch_calibration(monkeypatch):
def fake_set_calibration(delta: Point):
global testPoint
testPoint = delta
monkeypatch.setattr(testLabware, 'set_calibration', fake_set_calibration)


def test_save_calibration(patch_calibration):
# Test the save calibration file
assert not os.path.exists(path)
labware.save_calibration(testLabware, Point(1, 1, 1))
assert os.path.exists(path)
global testPoint
testPoint == Point(1, 1, 1)


def test_schema_shape(patch_calibration, monkeypatch):
assert os.path.exists(path)

def fake_time():
return 1
monkeypatch.setattr(time, 'time', fake_time)
labware.save_calibration(testLabware, Point(1, 1, 1))
expected = {"default": {"offset": [1, 1, 1], "lastModified": 1}}
with open(path) as f:
result = json.load(f)
assert result == expected


def test_load_calibration(patch_calibration):
labware.load_calibration(testLabware)
global testPoint
testPoint == Point(1, 1, 1)


def test_wells_rebuilt_with_offset():
old_wells = testLabware._wells
assert testLabware._offset == Point(10, 10, 5)
assert testLabware._calibrated_offset == Point(10, 10, 5)
labware.save_calibration(testLabware, Point(2, 2, 2))
new_wells = testLabware._wells
assert old_wells[0] != new_wells[0]
assert testLabware._offset == Point(10, 10, 5)
assert testLabware._calibrated_offset == Point(12, 12, 7)

0 comments on commit b636179

Please sign in to comment.