Skip to content

Commit

Permalink
Rename motor signals to match those in ophyd and add a couple more (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
DominicOram committed Apr 5, 2024
1 parent bdfd7b5 commit b4f4436
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 26 deletions.
29 changes: 15 additions & 14 deletions src/ophyd_async/epics/motion/motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,38 @@ class Motor(StandardReadable, Movable, Stoppable):

def __init__(self, prefix: str, name="") -> None:
# Define some signals
self.setpoint = epics_signal_rw(float, prefix + ".VAL")
self.readback = epics_signal_r(float, prefix + ".RBV")
self.user_setpoint = epics_signal_rw(float, prefix + ".VAL")
self.user_readback = epics_signal_r(float, prefix + ".RBV")
self.velocity = epics_signal_rw(float, prefix + ".VELO")
self.max_velocity = epics_signal_r(float, prefix + ".VMAX")
self.acceleration = epics_signal_rw(float, prefix + ".ACCL")
self.units = epics_signal_r(str, prefix + ".EGU")
self.motor_egu = epics_signal_r(str, prefix + ".EGU")
self.precision = epics_signal_r(int, prefix + ".PREC")
self.max_resolution = epics_signal_r(float, prefix + ".MRES")
self.low_limit_travel = epics_signal_rw(int, prefix + ".LLM")
self.high_limit_travel = epics_signal_rw(int, prefix + ".HLM")

# Signals that collide with standard methods should have a trailing underscore
self.stop_ = epics_signal_x(prefix + ".STOP")
self.motor_stop = epics_signal_x(prefix + ".STOP")
# Whether set() should complete successfully or not
self._set_success = True
# Set name and signals for read() and read_configuration()
self.set_readable_signals(
read=[self.readback],
config=[self.velocity, self.units],
read=[self.user_readback],
config=[self.velocity, self.motor_egu],
)
super().__init__(name=name)

def set_name(self, name: str):
super().set_name(name)
# Readback should be named the same as its parent in read()
self.readback.set_name(name)
self.user_readback.set_name(name)

async def _move(self, new_position: float, watchers: List[Callable] = []):
self._set_success = True
start = time.monotonic()
old_position, units, precision = await asyncio.gather(
self.setpoint.get_value(),
self.units.get_value(),
self.user_setpoint.get_value(),
self.motor_egu.get_value(),
self.precision.get_value(),
)

Expand All @@ -60,11 +61,11 @@ def update_watchers(current_position: float):
time_elapsed=time.monotonic() - start,
)

self.readback.subscribe_value(update_watchers)
self.user_readback.subscribe_value(update_watchers)
try:
await self.setpoint.set(new_position)
await self.user_setpoint.set(new_position)
finally:
self.readback.clear_sub(update_watchers)
self.user_readback.clear_sub(update_watchers)
if not self._set_success:
raise RuntimeError("Motor was stopped")

Expand All @@ -85,5 +86,5 @@ async def stop(self, success=False):
self._set_success = success
# Put with completion will never complete as we are waiting for completion on
# the move above, so need to pass wait=False
status = self.stop_.trigger(wait=False)
status = self.motor_stop.trigger(wait=False)
await status
4 changes: 2 additions & 2 deletions tests/core/test_device_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def my_plan():
RE(my_plan())

assert (
checking_loop.run_until_complete(sim_motor.setpoint.read())[
checking_loop.run_until_complete(sim_motor.user_setpoint.read())[
"sim_motor-setpoint"
]["value"]
== 3.14
Expand Down Expand Up @@ -146,7 +146,7 @@ def my_plan():

# The set should fail since the run engine is on a different event loop
assert (
device_connector_loop.run_until_complete(sim_motor.setpoint.read())[
device_connector_loop.run_until_complete(sim_motor.user_setpoint.read())[
"sim_motor-setpoint"
]["value"]
!= 3.14
Expand Down
22 changes: 12 additions & 10 deletions tests/epics/motion/test_motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ async def sim_motor():
# Signals connected here

assert sim_motor.name == "sim_motor"
set_sim_value(sim_motor.units, "mm")
set_sim_value(sim_motor.motor_egu, "mm")
set_sim_value(sim_motor.precision, 3)
set_sim_value(sim_motor.velocity, 1)
yield sim_motor


async def test_motor_moving_well(sim_motor: motor.Motor) -> None:
set_sim_put_proceeds(sim_motor.setpoint, False)
set_sim_put_proceeds(sim_motor.user_setpoint, False)
s = sim_motor.set(0.55)
watcher = Mock()
s.watch(watcher)
Expand All @@ -45,10 +45,10 @@ async def test_motor_moving_well(sim_motor: motor.Motor) -> None:
time_elapsed=pytest.approx(0.0, abs=0.05),
)
watcher.reset_mock()
assert 0.55 == await sim_motor.setpoint.get_value()
assert 0.55 == await sim_motor.user_setpoint.get_value()
assert not s.done
await asyncio.sleep(0.1)
set_sim_value(sim_motor.readback, 0.1)
set_sim_value(sim_motor.user_readback, 0.1)
assert watcher.call_count == 1
assert watcher.call_args == call(
name="sim_motor",
Expand All @@ -59,20 +59,20 @@ async def test_motor_moving_well(sim_motor: motor.Motor) -> None:
precision=3,
time_elapsed=pytest.approx(0.1, abs=0.05),
)
set_sim_put_proceeds(sim_motor.setpoint, True)
set_sim_put_proceeds(sim_motor.user_setpoint, True)
await asyncio.sleep(A_BIT)
assert s.done
done.assert_called_once_with(s)


async def test_motor_moving_stopped(sim_motor: motor.Motor):
set_sim_put_proceeds(sim_motor.setpoint, False)
set_sim_put_proceeds(sim_motor.user_setpoint, False)
s = sim_motor.set(1.5)
s.add_callback(Mock())
await asyncio.sleep(0.2)
assert not s.done
await sim_motor.stop()
set_sim_put_proceeds(sim_motor.setpoint, True)
set_sim_put_proceeds(sim_motor.user_setpoint, True)
await asyncio.sleep(A_BIT)
assert s.done
assert s.success is False
Expand All @@ -85,12 +85,14 @@ async def test_read_motor(sim_motor: motor.Motor):
"source"
] == "sim://BLxxI-MO-TABLE-01:X.RBV"
assert (await sim_motor.read_configuration())["sim_motor-velocity"]["value"] == 1
assert (await sim_motor.describe_configuration())["sim_motor-units"]["shape"] == []
set_sim_value(sim_motor.readback, 0.5)
assert (await sim_motor.describe_configuration())["sim_motor-motor_egu"][
"shape"
] == []
set_sim_value(sim_motor.user_readback, 0.5)
assert (await sim_motor.read())["sim_motor"]["value"] == 0.5
sim_motor.unstage()
# Check we can still read and describe when not staged
set_sim_value(sim_motor.readback, 0.1)
set_sim_value(sim_motor.user_readback, 0.1)
assert (await sim_motor.read())["sim_motor"]["value"] == 0.1
assert await sim_motor.describe()

Expand Down

0 comments on commit b4f4436

Please sign in to comment.