Skip to content

Commit

Permalink
example: update nidcpower measurement to support multiple instrument …
Browse files Browse the repository at this point in the history
…sessions (#676)

* example: update nidcpower example to support multiple sessions

* refactor: remove get connections api

* prioritize measui by default

* refactor: bring back the comments in the measure method

* docs: update example's README to reflect the measurement update
  • Loading branch information
jayaseelan-james authored Mar 26, 2024
1 parent 7c41df3 commit 704ccc9
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 44 deletions.
3 changes: 2 additions & 1 deletion examples/nidcpower_source_dc_voltage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ NI SMU.
- Uses the `nidcpower` package to access NI-DCPower from Python
- Demonstrates how to cancel a running measurement by breaking a long wait into
multiple short waits
- Pin-aware, supporting one session and multiple pins
- Pin-aware, supporting multiple sessions, multiple pins, and multiple selected
sites
- Sources the same DC voltage level on all selected pin/site combinations
- Measures the DC voltage and current for each selected pin/site combination
- Includes InstrumentStudio and MeasurementLink UI Editor project files
Expand Down
99 changes: 56 additions & 43 deletions examples/nidcpower_source_dc_voltage/measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import threading
import time
from contextlib import ExitStack
from typing import TYPE_CHECKING, Iterable, List, NamedTuple, Tuple

import click
Expand All @@ -16,7 +17,6 @@
import nidcpower
import nidcpower.session
from _helpers import configure_logging, verbosity_option
from ni_measurementlink_service.session_management import TypedConnection

_NIDCPOWER_WAIT_FOR_EVENT_TIMEOUT_ERROR_CODE = -1074116059
_NIDCPOWER_TIMEOUT_EXCEEDED_ERROR_CODE = -1074097933
Expand Down Expand Up @@ -77,49 +77,61 @@ def measure(
cancellation_event = threading.Event()
measurement_service.context.add_cancel_callback(cancellation_event.set)

with measurement_service.context.reserve_session(pin_names) as reservation:
with reservation.initialize_nidcpower_session() as session_info:
# Use connections to map pin names to channel names. This sets the
# channel order based on the pin order and allows mapping the
# resulting measurements back to the corresponding pins and sites.
connections = reservation.get_nidcpower_connections(pin_names)
channel_order = ",".join(connection.channel_name for connection in connections)
channels = session_info.session.channels[channel_order]

# Configure the same settings for all of the channels corresponding
with measurement_service.context.reserve_sessions(pin_names) as reservation:
with reservation.initialize_nidcpower_sessions() as session_infos:
# Configure the same channel settings for all of the sessions corresponding
# to the selected pins and sites.
channels.source_mode = nidcpower.SourceMode.SINGLE_POINT
channels.output_function = nidcpower.OutputFunction.DC_VOLTAGE
channels.current_limit = current_limit
channels.voltage_level_range = voltage_level_range
channels.current_limit_range = current_limit_range
channels.source_delay = hightime.timedelta(seconds=source_delay)
channels.voltage_level = voltage_level

# Initiate the channels to start sourcing the outputs. initiate()
# returns a context manager that aborts the measurement when the
# function returns or raises an exception.
with channels.initiate():
# Wait for the outputs to settle.
timeout = source_delay + 10.0
_wait_for_event(
channels, cancellation_event, nidcpower.enums.Event.SOURCE_COMPLETE, timeout
)

# Measure the voltage and current for each output.
measurements: List[_Measurement] = channels.measure_multiple()
for session_info in session_infos:
channels = session_info.session.channels[session_info.channel_list]
channels.source_mode = nidcpower.SourceMode.SINGLE_POINT
channels.output_function = nidcpower.OutputFunction.DC_VOLTAGE
channels.current_limit = current_limit
channels.voltage_level_range = voltage_level_range
channels.current_limit_range = current_limit_range
channels.source_delay = hightime.timedelta(seconds=source_delay)
channels.voltage_level = voltage_level

with ExitStack() as stack:
# Initiate the channels to start sourcing the outputs. initiate()
# returns a context manager that aborts the measurement when the
# function returns or raises an exception.
for session_info in session_infos:
channels = session_info.session.channels[session_info.channel_list]
stack.enter_context(channels.initiate())

# Determine whether the outputs are in compliance.
for index, connection in enumerate(connections):
channel = connection.session.channels[connection.channel_name]
in_compliance = channel.query_in_compliance()
measurements[index] = measurements[index]._replace(in_compliance=in_compliance)

_log_measurements(connections, measurements)
# Wait for the outputs to settle.
for session_info in session_infos:
channels = session_info.session.channels[session_info.channel_list]
timeout = source_delay + 10.0
_wait_for_event(
channels, cancellation_event, nidcpower.enums.Event.SOURCE_COMPLETE, timeout
)

measurements: List[_Measurement] = []
measured_sites, measured_pins = [], []
for session_info in session_infos:
channels = session_info.session.channels[session_info.channel_list]
# Measure the voltage and current for each output of the session.
session_measurements: List[_Measurement] = channels.measure_multiple()

for measurement, channel_mapping in zip(
session_measurements, session_info.channel_mappings
):
measured_sites.append(channel_mapping.site)
measured_pins.append(channel_mapping.pin_or_relay_name)
# Determine whether the outputs are in compliance.
in_compliance = session_info.session.channels[
channel_mapping.channel
].query_in_compliance()
measurement._replace(in_compliance=in_compliance)

measurements.extend(session_measurements)

_log_measurements(measured_sites, measured_pins, measurements)
logging.info("Completed measurement")
return (
[connection.site for connection in connections],
[connection.pin_or_relay_name for connection in connections],
measured_sites,
measured_pins,
[measurement.voltage for measurement in measurements],
[measurement.current for measurement in measurements],
[measurement.in_compliance for measurement in measurements],
Expand Down Expand Up @@ -161,12 +173,13 @@ def _wait_for_event(


def _log_measurements(
connections: Iterable[TypedConnection[nidcpower.Session]],
measured_sites: Iterable[int],
measured_pins: Iterable[str],
measured_values: Iterable[_Measurement],
) -> None:
"""Log the measured values."""
for connection, measurement in zip(connections, measured_values):
logging.info("site%s/%s:", connection.site, connection.pin_or_relay_name)
for site, pin, measurement in zip(measured_sites, measured_pins, measured_values):
logging.info("site%s/%s:", site, pin)
logging.info(" Voltage: %g V", measurement.voltage)
logging.info(" Current: %g A", measurement.current)
logging.info(" In compliance: %s", str(measurement.in_compliance))
Expand Down

0 comments on commit 704ccc9

Please sign in to comment.