-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2 from bleykauf/develop
Release v1.0.0
- Loading branch information
Showing
12 changed files
with
963 additions
and
726 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
name: tests # also sets the name of the tests badge, so do not change for now | ||
|
||
on: | ||
push: | ||
|
||
jobs: | ||
test: | ||
runs-on: ubuntu-latest | ||
strategy: | ||
matrix: | ||
os: [ubuntu-latest, macos-latest, windows-latest] | ||
python-version: ["3.8", "3.9", "3.10","3.11", "3.12"] | ||
steps: | ||
- uses: actions/checkout@v4 | ||
- name: Set up Python ${{ matrix.python-version }} | ||
uses: actions/setup-python@v4 | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
cache: pip | ||
- name: Install dependencies | ||
run: | | ||
python -m pip install --upgrade pip | ||
pip install .[tests] | ||
- name: Run pytest | ||
run: pytest . | ||
- name: Run mypy | ||
run: mypy --explicit-package-base . | ||
|
||
coverage: | ||
needs: | ||
- test | ||
runs-on: ubuntu-latest | ||
|
||
steps: | ||
- uses: actions/checkout@v4 | ||
with: | ||
fetch-depth: 0 | ||
- name: Set up Python | ||
uses: actions/setup-python@v4 | ||
with: | ||
python-version: 3.11 | ||
cache: pip | ||
- name: Install dependencies | ||
run: | | ||
python -m pip install --upgrade pip | ||
pip install . pytest pytest-cov | ||
- name: Create coverage report | ||
run: | | ||
coverage run -m pytest . | ||
coverage report -m | ||
- name: Create coverage badge | ||
uses: tj-actions/coverage-badge-py@v2 | ||
with: | ||
output: docs/coverage.svg | ||
- name: Verify Changed files | ||
uses: tj-actions/verify-changed-files@v16 | ||
id: verify-changed-files | ||
with: | ||
files: docs/coverage.svg | ||
- name: Commit files | ||
if: steps.verify-changed-files.outputs.files_changed == 'true' | ||
run: | | ||
git config --local user.email "github-actions[bot]@users.noreply.github.com" | ||
git config --local user.name "github-actions[bot]" | ||
git add docs/coverage.svg | ||
git commit -m "Updated coverage.svg" | ||
- name: Push changes | ||
if: steps.verify-changed-files.outputs.files_changed == 'true' | ||
uses: ad-m/github-push-action@master | ||
with: | ||
github_token: ${{ secrets.github_token }} | ||
branch: ${{ github.ref }} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +0,0 @@ | ||
from .meer_tec import TEC, USB, XPort # noqa: F401 | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import socket | ||
import time | ||
from typing import Protocol | ||
|
||
import serial | ||
|
||
from .mecom import Message | ||
|
||
|
||
class Interface(Protocol): | ||
def query(self, request: Message) -> Message: | ||
... | ||
|
||
def clear(self) -> None: | ||
... | ||
|
||
|
||
class XPort(socket.socket): | ||
def __init__(self, ip: str, port: int = 10001) -> None: | ||
super().__init__(socket.AF_INET, socket.SOCK_STREAM) | ||
self.settimeout(0.2) | ||
self.ip = ip | ||
self.port = port | ||
super().connect((self.ip, self.port)) | ||
|
||
def query(self, request: Message) -> Message: | ||
self.send(request.encode("ascii")) | ||
time.sleep(0.01) | ||
response = self.recv(128).decode("ascii") | ||
return Message(response, value_type=request.value_type) | ||
|
||
def clear(self) -> None: | ||
_ = self.recv(128) | ||
|
||
|
||
class USB(serial.Serial): | ||
def __init__(self, port: str, timeout: int = 1, baudrate: int = 57600) -> None: | ||
super().__init__( | ||
port, baudrate=baudrate, timeout=timeout, write_timeout=timeout | ||
) | ||
|
||
def query(self, request: "Message") -> str: | ||
self.write(request.encode("ascii")) | ||
time.sleep(0.01) | ||
response = self.read(128).decode("ascii") | ||
return Message(response, value_type=request.value_type) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
import random | ||
import struct | ||
from typing import Generic, Literal, Optional, Type, TypeVar | ||
|
||
from PyCRC.CRCCCITT import CRCCCITT as CRC | ||
|
||
PARAM_CMDS = ["VS", "?VR"] | ||
FloatOrInt = TypeVar("FloatOrInt", float, int) | ||
ParamCmds = Literal["VS", "?VR"] | ||
|
||
|
||
def calc_checksum(string: str) -> str: | ||
"""Calculate CRC checksum.""" | ||
return f"{CRC().calculate(string):04X}" | ||
|
||
|
||
def construct_param_cmd( | ||
device_addr: int, | ||
cmd: str, | ||
param_id: int, | ||
value_type: Type[FloatOrInt], | ||
param_inst: int = 1, | ||
value: Optional[FloatOrInt] = None, | ||
seq_num: Optional[int] = None, | ||
) -> str: | ||
""" | ||
Construct a MeCom command. | ||
:param device_addr: Device address (0 .. 255). Broadcast Device Address (0) will | ||
send the command to all connected Meerstetter devices | ||
:param param_id: Parameter ID (0 .. 65535) | ||
:param value_type: Value type (int or float) | ||
:param param_inst: Parameter instance (0 .. 255). For most parameters the instance | ||
is used to address the channel on the device | ||
:param value: Value to set | ||
:param seq_num: Sequence number (0 .. 65535). If not given, a random number will be | ||
generated | ||
:return: MeCom command | ||
""" | ||
if seq_num is None: | ||
seq_num = random.randint(0, 65535) | ||
|
||
if seq_num < 0 or seq_num > 65535: | ||
raise ValueError("seq_num must be between 0 and 65535") | ||
|
||
if cmd not in PARAM_CMDS: | ||
raise ValueError(f"cmd must be one of {PARAM_CMDS}") | ||
|
||
if device_addr < 0 or device_addr > 255: | ||
raise ValueError("device_addr must be between 0 and 255") | ||
|
||
if cmd in ["VS", "?VR"] and param_id is None: | ||
raise ValueError("param_id must be given for VS and ?VR commands") | ||
|
||
if cmd == "VS": | ||
if value is None: | ||
raise ValueError("value must be given for VS command") | ||
if value_type is float: | ||
# convert float to hex of length 8, remove the leading '0X' and capitalize | ||
val = hex(struct.unpack("<I", struct.pack("<f", value))[0])[2:].upper() | ||
elif value_type is int: | ||
# convert int to hex of length 8 | ||
val = f"{value:08X}" | ||
elif cmd == "?VR": | ||
val = "" | ||
|
||
cmd = f"#{device_addr:02X}{seq_num:04X}{cmd}{param_id:04X}{param_inst:02X}{val}" | ||
return f"{cmd}{calc_checksum(cmd)}\r" | ||
|
||
|
||
def construct_reset_cmd(device_addr: int, seq_num: Optional[int] = None) -> str: | ||
""" | ||
Construct a MeCom reset command. | ||
:param device_addr: Device address (0 .. 255). Broadcast Device Address (0) will | ||
send the command to all connected Meerstetter devices | ||
:param seq_num: Sequence number (0 .. 65535). If not given, a random number will be | ||
generated | ||
:return: MeCom command | ||
""" | ||
if seq_num is None: | ||
seq_num = random.randint(0, 65535) | ||
|
||
if seq_num < 0 or seq_num > 65535: | ||
raise ValueError("seq_num must be between 0 and 65535") | ||
|
||
cmd = f"#{device_addr:02X}{seq_num:04X}RS" | ||
return f"{cmd}{calc_checksum(cmd)}\r" | ||
|
||
|
||
def verify_response(reponse: "Message", request: "Message") -> bool: | ||
""" | ||
Verify a MeCom response. | ||
:param reponse: MeCom response | ||
:param request: MeCom request | ||
:return: True if response is valid, False otherwise | ||
""" | ||
checksum_correct = reponse.checksum == calc_checksum(reponse[0:-5]) | ||
request_match = reponse.seq_num == request.seq_num | ||
return checksum_correct & request_match | ||
|
||
|
||
class Message(str, Generic[FloatOrInt]): | ||
value_type: Type[FloatOrInt] | ||
|
||
def __new__(cls, response: str, value_type: Type[FloatOrInt]): | ||
return super().__new__(cls, response) | ||
|
||
def __init__(self, response: str, value_type: Type[FloatOrInt]) -> None: | ||
self.value_type = value_type | ||
self.device_addr = int(self[1:3], 8) | ||
self.seq_num = int(self[3:7], 16) | ||
self.payload = self[7:-5] | ||
self.checksum = self[-5:-1] | ||
|
||
@property | ||
def value(self) -> FloatOrInt: | ||
if self.value_type is int: | ||
return int(self.payload, 16) | ||
if self.value_type is float: | ||
return struct.unpack("!f", bytes.fromhex(self.payload))[0] | ||
else: | ||
raise ValueError("value_type must be int or float") |
Oops, something went wrong.