Skip to content

Commit

Permalink
API: consolidate motion functions and fix some small design issues (#…
Browse files Browse the repository at this point in the history
…2606)

* refactor(api): aspirate and dispense can be given wells as position

You can now do aspirate(2.0, lw.wells()[0]) instead of aspirate(2.0, lw.wells()[0].top()).

* refactor(api): hardware_control handles retracting the last mount

The protocol api now always refreshes its location cache with the gantry
position for the specified mount to catch moves made either automatically by the
hardware control (like a retract) or a user.
  • Loading branch information
sfoster1 authored Nov 5, 2018
1 parent 7c13891 commit c33e1a8
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 76 deletions.
22 changes: 22 additions & 0 deletions api/src/opentrons/hardware_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __init__(self,
top_types.Mount.RIGHT: None
}
self._attached_modules: Dict[str, Any] = {}
self._last_moved_mount: Optional[top_types.Mount] = None

@classmethod
def build_hardware_controller(
Expand Down Expand Up @@ -292,6 +293,8 @@ def current_position(self, mount: top_types.Mount) -> Dict[Axis, float]:
This returns cached position to avoid hitting the smoothie driver
unless ``refresh`` is ``True``.
"""
if not self._current_position:
raise MustHomeError
if mount == mount.RIGHT:
offset = top_types.Point(0, 0, 0)
else:
Expand Down Expand Up @@ -340,6 +343,9 @@ async def move_to(
"""
if not self._current_position:
raise MustHomeError

await self._cache_and_maybe_retract_mount(mount)

z_axis = Axis.by_mount(mount)
if mount == top_types.Mount.LEFT:
offset = top_types.Point(*self.config.mount_offset)
Expand All @@ -363,6 +369,9 @@ async def move_rel(self, mount: top_types.Mount, delta: top_types.Point,
"""
if not self._current_position:
raise MustHomeError

await self._cache_and_maybe_retract_mount(mount)

z_axis = Axis.by_mount(mount)
try:
target_position = OrderedDict(
Expand All @@ -377,8 +386,21 @@ async def move_rel(self, mount: top_types.Mount, delta: top_types.Point,
raise MustHomeError
await self._move(target_position, speed=speed)

async def _cache_and_maybe_retract_mount(self, mount: top_types.Mount):
""" Retract the 'other' mount if necessary
If `mount` does not match the value in :py:attr:`_last_moved_mount`
(and :py:attr:`_last_moved_mount` exists) then retract the mount
in :py:attr:`_last_moved_mount`. Also unconditionally update
:py:attr:`_last_moved_mount` to contain `mount`.
"""
if mount != self._last_moved_mount and self._last_moved_mount:
await self.retract(self._last_moved_mount, 10)
self._last_moved_mount = mount

async def _move_plunger(self, mount: top_types.Mount, dist: float,
speed: float = None):

z_axis = Axis.by_mount(mount)
pl_axis = Axis.of_plunger(mount)
all_axes_pos = OrderedDict(
Expand Down
158 changes: 95 additions & 63 deletions api/src/opentrons/protocol_api/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,49 +197,28 @@ def update_config(self, **kwargs):
"""
self._hardware.update_config(**kwargs)

def move_to(self, mount: types.Mount,
location: types.Location):
""" Implement motions of the robot.
This should not need to be called by the user; it is called by
:py:meth:`InstrumentContext.move_to` (and thus all other
:py:class:`InstrumentContext` methods that involve moving, such as
:py:meth:`InstrumentContext.aspirate`) to move the pipettes around.
It encapsulates location caching and ensures that all moves are safe.
It does this by taking a :py:class:`.types.Location` that can have
a position attached to it, and its behavior depends on the state of
that location cache and the passed location.
"""
switching_instr = self._last_moved_instrument\
and self._last_moved_instrument != mount
if switching_instr:
# TODO: Is 10 the right number here? This is what’s used in
# robot since it’s a default to an argument that is never
# changed
self._log.debug("retract {}".format(self._last_moved_instrument))
self._hardware.retract(self._last_moved_instrument, 10)

if self._location_cache and not switching_instr:
from_loc = self._location_cache
else:
from_loc = types.Location(
point=self._hardware.gantry_position(mount),
labware=None)
moves = geometry.plan_moves(from_loc, location, self._deck_layout)
self._log.debug("planned moves for {}->{}: {}"
.format(from_loc, location, moves))
self._location_cache = location
self._last_moved_instrument = mount
for move in moves:
self._hardware.move_to(mount, move)

def home(self):
""" Homes the robot.
"""
self._log.debug("home")
self._hardware.home()

@property
def location_cache(self) -> Optional[types.Location]:
""" The cache used by the robot to determine where it last was.
"""
return self._location_cache

@location_cache.setter
def location_cache(self, loc: Optional[types.Location]):
self._location_cache = loc

@property
def deck(self) -> geometry.Deck:
""" The object holding the deck layout of the robot.
"""
return self._deck_layout

@staticmethod
def _build_hardware_adapter(
loop: asyncio.AbstractEventLoop,
Expand Down Expand Up @@ -276,10 +255,11 @@ def __init__(self,
self._last_location: Union[Labware, Well, None] = None
self._log = log_parent.getChild(repr(self))
self._log.info("attached")
self._well_bottom_clearance = 0.5

def aspirate(self,
volume: float = None,
location: types.Location = None,
location: Union[types.Location, Well] = None,
rate: float = 1.0):
"""
Aspirate a volume of liquid (in microliters/uL) using this pipette
Expand All @@ -289,16 +269,17 @@ def aspirate(self,
from its current position. If only a location is passed,
:py:meth:`aspirate` will default to its :py:attr:`max_volume`.
If the :py:class:`.types.Location` passed in `location` has an
associated labware, that labware will be saved until another motion
is commanded. This is used to optimize motions - for instance, moving
between two wells requires much less Z-distance to avoid collisions
than moving between two pieces of labware.
:param volume: The volume to aspirate, in microliters. If not
specified, :py:attr:`max_volume`.
:type volume: int or float
:param location: Where to aspirate from. If unspecified, the
:param location: Where to aspirate from. If `location` is a
:py:class:`.Well`, the robot will aspirate from
:py:attr:`well_bottom_clearance` mm
above the bottom of the well. If `location` is a
:py:class:`.Location` (i.e. the result of
:py:meth:`.Well.top` or :py:meth:`.Well.bottom`), the
robot will aspirate from the exact specified location.
If unspecified, the robot will aspirate from the
current position.
:param rate: The relative plunger speed for this aspirate. During
this aspirate, the speed of the plunger will be
Expand All @@ -311,14 +292,24 @@ def aspirate(self,
.format(volume,
location if location else 'current position',
rate))
if location:
if isinstance(location, Well):
point, well = location.bottom()
self.move_to(
types.Location(point + types.Point(0, 0,
self.well_bottom_clearance),
well))
elif isinstance(location, types.Location):
self.move_to(location)
elif location is not None:
raise TypeError(
'location should be a Well or Location, but it is {}'
.format(location))
self._hardware.aspirate(self._mount, volume, rate)
return self

def dispense(self,
volume: float = None,
location: types.Location = None,
location: Union[types.Location, Well] = None,
rate: float = 1.0):
"""
Dispense a volume of liquid (in microliters/uL) using this pipette
Expand All @@ -329,20 +320,21 @@ def dispense(self,
into the pipette will be dispensed (this volume is accessible through
:py:attr:`current_volume`).
The location may be a :py:class:`.Well`, or a specific position in
relation to a :py:class:`.Well`, such as :py:meth:`.Well.top`. If a
:py:class:`.Well` is specified without calling a position method
(such as :py:meth:`.Well.top` or :py:meth:`.Well.bottom`), the liquid
will be dispensed at the bottom of the well.
:param volume: The volume of liquid to dispense, in microliters. If not
specified, defaults to :py:attr:`current_volume`.
:type volume: int or float
:param location: Where to dispense into. If unspecified, the
:param location: Where to dispense into. If `location` is a
:py:class:`.Well`, the robot will dispense into
:py:attr:`well_bottom_clearance` mm
above the bottom of the well. If `location` is a
:py:class:`.Location` (i.e. the result of
:py:meth:`.Well.top` or :py:meth:`.Well.bottom`), the
robot will dispense into the exact specified location.
If unspecified, the robot will dispense into the
current position.
:param rate: The relative plunger speed for this aspirate. During
this aspirate, the speed of the plunger will be
`rate` * :py:attr:`aspirate_speed`. If not specified,
:param rate: The relative plunger speed for this dispense. During
this dispense, the speed of the plunger will be
`rate` * :py:attr:`dispense_speed`. If not specified,
defaults to 1.0 (speed will not be modified).
:type rate: float
:returns: This instance.
Expand All @@ -351,8 +343,18 @@ def dispense(self,
.format(volume,
location if location else 'current position',
rate))
if location:
if isinstance(location, Well):
point, well = location.bottom()
self.move_to(
types.Location(point + types.Point(0, 0,
self.well_bottom_clearance),
well))
elif isinstance(location, types.Location):
self.move_to(location)
elif location is not None:
raise TypeError(
'location should be a Well or Location, but it is {}'
.format(location))
self._hardware.dispense(self._mount, volume, rate)
return self

Expand Down Expand Up @@ -420,13 +422,27 @@ def transfer(self,
raise NotImplementedError

def move_to(self, location: types.Location):
""" Move this pipette to a specific location on the deck.
""" Move the instrument.
:param location: Where to move to.
:raises ValueError: if an argument is incorrect.
:param location: The location to move to.
"""
self._log.debug("move to {}".format(location))
self._ctx.move_to(self._mount, location)
if self._ctx.location_cache:
from_lw = self._ctx.location_cache.labware
else:
from_lw = None
from_loc = types.Location(self._hardware.gantry_position(self._mount),
from_lw)
moves = geometry.plan_moves(from_loc, location, self._ctx.deck)
self._log.debug("move {}->{}: {}"
.format(from_loc, location, moves))
try:
for move in moves:
self._hardware.move_to(self._mount, move)
except Exception:
self._ctx.location_cache = None
raise
else:
self._ctx.location_cache = location
return self

@property
Expand Down Expand Up @@ -545,6 +561,22 @@ def hw_pipette(self) -> Optional[Dict[str, Any]]:
"""
return self._hardware.attached_instruments[self._mount]

@property
def well_bottom_clearance(self) -> float:
""" The distance above the bottom of a well to aspirate or dispense.
When :py:meth:`aspirate` or :py:meth:`dispense` is given a
:py:class:`.Well` rather than a full :py:class:`.Location`, the robot
will move this distance above the bottom of the well to aspirate or
dispense.
"""
return self._well_bottom_clearance

@well_bottom_clearance.setter
def well_bottom_clearance(self, clearance: float):
assert clearance >= 0
self._well_bottom_clearance = clearance

def __repr__(self):
return '<{}: {} in {}>'.format(self.__class__.__name__,
self.hw_pipette['name'],
Expand Down
14 changes: 13 additions & 1 deletion api/tests/opentrons/hardware_control/test_moves.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ async def test_controller_musthome(hardware_api):
async def test_home_specific_sim(hardware_api, monkeypatch):
await hardware_api.home()
await hardware_api.move_to(types.Mount.RIGHT, types.Point(-10, 10, 20))
# Avoid the autoretract when moving two difference instruments
hardware_api._last_moved_mount = None
await hardware_api.move_rel(types.Mount.LEFT, types.Point(0, 0, -20))
await hardware_api.home([Axis.Z, Axis.C])
assert hardware_api._current_position == {Axis.X: -10,
Expand Down Expand Up @@ -100,7 +102,7 @@ async def test_move(hardware_api):
target_position2 = {Axis.X: 60,
Axis.Y: 40,
Axis.Z: 228,
Axis.A: 10,
Axis.A: 218, # The other instrument is retracted
Axis.B: 19,
Axis.C: 19}
await hardware_api.move_rel(mount2, rel_position)
Expand Down Expand Up @@ -191,3 +193,13 @@ def mock_move(position, speed=None):
assert called_with['X'] == 44
assert called_with['Y'] == 20
assert called_with['Z'] == 30


async def test_other_mount_retracted(hardware_api):
await hardware_api.home()
await hardware_api.move_to(types.Mount.RIGHT, types.Point(0, 0, 0))
assert hardware_api.gantry_position(types.Mount.RIGHT)\
== types.Point(0, 0, 0)
await hardware_api.move_to(types.Mount.LEFT, types.Point(20, 20, 0))
assert hardware_api.gantry_position(types.Mount.RIGHT) \
== types.Point(54, 20, 218)
Loading

0 comments on commit c33e1a8

Please sign in to comment.