Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move content to new header.py. #400

Merged
merged 2 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 4 additions & 93 deletions wfdb/io/_header.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import datetime
import re
from typing import Collection, List, Tuple

import numpy as np
import pandas as pd

from wfdb.io import _signal
from wfdb.io import util

from wfdb.io.header import HeaderSyntaxError, rx_record, rx_segment, rx_signal

"""
Notes
Expand All @@ -32,12 +31,6 @@
so that the user doesn't need to. But when reading, it should
be clear that the fields are missing.

If all of the fields were filled out in a WFDB header file, they would appear
in this order with these separators:

RECORD_NAME/NUM_SEG NUM_SIG SAMP_FREQ/COUNT_FREQ(BASE_COUNT_VAL) SAMPS_PER_SIG BASE_TIME BASE_DATE
FILE_NAME FORMATxSAMP_PER_FRAME:SKEW+BYTE_OFFSET ADC_GAIN(BASELINE)/UNITS ADC_RES ADC_ZERO INIT_VALUE CHECKSUM BLOCK_SIZE DESCRIPTION

"""
# Built-in and NumPy integer types.
int_types = (int, np.int64, np.int32, np.int16, np.int8)
# Built-in and NumPy floating-point types, followed by every entry of int_types.
float_types = (float, np.float64, np.float32) + int_types
Expand Down Expand Up @@ -135,53 +128,6 @@
# Specifications of all WFDB header fields, except for comments.
# Built by concatenating the record, signal, and segment specifications,
# in that order.
FIELD_SPECS = pd.concat((RECORD_SPECS, SIGNAL_SPECS, SEGMENT_SPECS))

# Regexp objects for reading headers
# Record line pattern. Format:
# RECORD_NAME/NUM_SEG NUM_SIG SAMP_FREQ/COUNT_FREQ(BASE_COUNT_VAL) SAMPS_PER_SIG BASE_TIME BASE_DATE
# Only the record name and the number of signals are mandatory; every other
# group may match the empty string, so absent fields come back as "".
_rx_record = re.compile(
r"""
[ \t]* (?P<record_name>[-\w]+)
/?(?P<n_seg>\d*)
[ \t]+ (?P<n_sig>\d+)
[ \t]* (?P<fs>\d*\.?\d*)
/*(?P<counter_freq>-?\d*\.?\d*)
\(?(?P<base_counter>-?\d*\.?\d*)\)?
[ \t]* (?P<sig_len>\d*)
[ \t]* (?P<base_time>\d{,2}:?\d{,2}:?\d{,2}\.?\d{,6})
[ \t]* (?P<base_date>\d{,2}/?\d{,2}/?\d{,4})
""",
re.VERBOSE,
)

# Signal line pattern. Format:
# FILE_NAME FORMATxSAMP_PER_FRAME:SKEW+BYTE_OFFSET ADC_GAIN(BASELINE)/UNITS ADC_RES ADC_ZERO INIT_VALUE CHECKSUM BLOCK_SIZE DESCRIPTION
# Only the format is mandatory; every other group may match the empty
# string. The trailing description is captured as `sig_name`.
_rx_signal = re.compile(
r"""
[ \t]* (?P<file_name>~?[-\w]*\.?[\w]*)
[ \t]+ (?P<fmt>\d+)
x?(?P<samps_per_frame>\d*)
:?(?P<skew>\d*)
\+?(?P<byte_offset>\d*)
[ \t]* (?P<adc_gain>-?\d*\.?\d*e?[\+-]?\d*)
\(?(?P<baseline>-?\d*)\)?
/?(?P<units>[\w\^\-\?%\/]*)
[ \t]* (?P<adc_res>\d*)
[ \t]* (?P<adc_zero>-?\d*)
[ \t]* (?P<init_value>-?\d*)
[ \t]* (?P<checksum>-?\d*)
[ \t]* (?P<block_size>\d*)
[ \t]* (?P<sig_name>[\S]?[^\t\n\r\f\v]*)
""",
re.VERBOSE,
)

# Segment line pattern. Format:
# SEG_NAME SEG_LEN
# The segment name may be empty or end in "~"; the length is mandatory.
_rx_segment = re.compile(
r"""
[ \t]* (?P<seg_name>[-\w]*~?)
[ \t]+ (?P<seg_len>\d+)
""",
re.VERBOSE,
)


class BaseHeaderMixin(object):
"""
Expand Down Expand Up @@ -1013,37 +959,6 @@ def wfdb_strptime(time_string: str) -> datetime.time:
return datetime.datetime.strptime(time_string, time_fmt).time()


def parse_header_content(
    header_content: str,
) -> Tuple[List[str], List[str]]:
    """
    Split the text of a header file into header lines and comment lines.

    Each line is stripped of surrounding whitespace. Blank lines are
    dropped; lines beginning with "#" are comments; everything else is
    a header line. Relative order is preserved within each list.

    Parameters
    ----------
    header_content: str
        The string content of the full header file

    Returns
    -------
    header_lines : List[str]
        A list of all the non-comment lines
    comment_lines : List[str]
        A list of all the comment lines
    """
    headers: List[str] = []
    comments: List[str] = []

    for raw in header_content.splitlines():
        text = raw.strip()
        # Blank (or whitespace-only) lines carry no information.
        if not text:
            continue
        # Route the line to the list it belongs to.
        bucket = comments if text.startswith("#") else headers
        bucket.append(text)

    return headers, comments


def _parse_record_line(record_line: str) -> dict:
"""
Extract fields from a record line string into a dictionary.
Expand All @@ -1063,7 +978,7 @@ def _parse_record_line(record_line: str) -> dict:
record_fields = {}

# Read string fields from record line
match = _rx_record.match(record_line)
match = rx_record.match(record_line)
if match is None:
raise HeaderSyntaxError("invalid syntax in record line")
(
Expand Down Expand Up @@ -1139,7 +1054,7 @@ def _parse_signal_lines(signal_lines):

# Read string fields from signal line
for ch in range(n_sig):
match = _rx_signal.match(signal_lines[ch])
match = rx_signal.match(signal_lines[ch])
if match is None:
raise HeaderSyntaxError("invalid syntax in signal line")
(
Expand Down Expand Up @@ -1213,7 +1128,7 @@ def _read_segment_lines(segment_lines):

# Read string fields from segment line
for i in range(len(segment_lines)):
match = _rx_segment.match(segment_lines[i])
match = rx_segment.match(segment_lines[i])
if match is None:
raise HeaderSyntaxError("invalid syntax in segment line")
(
Expand All @@ -1226,7 +1141,3 @@ def _read_segment_lines(segment_lines):
segment_fields["seg_len"][i] = int(segment_fields["seg_len"][i])

return segment_fields


class HeaderSyntaxError(ValueError):
    """
    Invalid syntax found in a WFDB header file.

    Raised when a record, signal, or segment line does not match its
    expected pattern. Subclasses ValueError, so handlers that catch
    ValueError also catch this exception.
    """
19 changes: 4 additions & 15 deletions wfdb/io/convert/csv.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import datetime
import os

import numpy as np
import pandas as pd

from wfdb.io import _header
from wfdb.io.annotation import format_ann_from_df, Annotation, wrann
from wfdb.io.record import Record, wrsamp

Expand Down Expand Up @@ -121,16 +119,13 @@ def csv_to_wfdb(
The base counter value is a floating-point number that specifies the counter
value corresponding to sample 0. If absent, the base counter value is
taken to be 0.
base_time : str, optional
base_time : datetime.time, optional
This field can be present only if the number of samples is also present.
It gives the time of day that corresponds to the beginning of the
record, in 'HH:MM:SS' format (using a 24-hour clock; thus '13:05:00', or
'13:5:0', represent 1:05 pm). If this field is absent, the time-conversion
functions assume a value of '0:0:0', corresponding to midnight.
base_date : str, optional
record.
base_date : datetime.date, optional
This field can be present only if the base time is also present. It contains
the date that corresponds to the beginning of the record, in 'DD/MM/YYYY'
format (e.g., '25/4/1989' is '25 April 1989').
the date that corresponds to the beginning of the record.
comments : list, optional
A list of string comments to be written to the header file. Each string
entry represents a new line to be appended to the bottom of the header
Expand Down Expand Up @@ -416,12 +411,6 @@ def csv_to_wfdb(
if verbose:
print("Signal block size: {}".format(block_size))

# Change the dates and times into `datetime` objects
if base_time:
base_time = _header.wfdb_strptime(base_time)
if base_date:
base_date = datetime.datetime.strptime(base_date, "%d/%m/%Y").date()

# Convert array to floating point
p_signal = p_signal.astype("float64")

Expand Down
127 changes: 127 additions & 0 deletions wfdb/io/header.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""
Module for parsing header files.

This module will eventually replace _header.py

"""
import datetime
import re
from typing import List, Tuple


class HeaderSyntaxError(ValueError):
    """
    Invalid syntax found in a WFDB header file.

    Subclasses ValueError, so handlers that catch ValueError also catch
    this exception.
    """


# Record line pattern. Format:
# RECORD_NAME/NUM_SEG NUM_SIG SAMP_FREQ/COUNT_FREQ(BASE_COUNT_VAL) SAMPS_PER_SIG BASE_TIME BASE_DATE
# Only the record name and the number of signals are mandatory; every other
# group may match the empty string, so absent fields come back as "".
rx_record = re.compile(
r"""
[ \t]* (?P<record_name>[-\w]+)
/?(?P<n_seg>\d*)
[ \t]+ (?P<n_sig>\d+)
[ \t]* (?P<fs>\d*\.?\d*)
/*(?P<counter_freq>-?\d*\.?\d*)
\(?(?P<base_counter>-?\d*\.?\d*)\)?
[ \t]* (?P<sig_len>\d*)
[ \t]* (?P<base_time>\d{,2}:?\d{,2}:?\d{,2}\.?\d{,6})
[ \t]* (?P<base_date>\d{,2}/?\d{,2}/?\d{,4})
""",
re.VERBOSE,
)

# Signal line pattern. Format:
# FILE_NAME FORMATxSAMP_PER_FRAME:SKEW+BYTE_OFFSET ADC_GAIN(BASELINE)/UNITS ADC_RES ADC_ZERO INIT_VALUE CHECKSUM BLOCK_SIZE DESCRIPTION
# Only the format is mandatory; every other group may match the empty
# string. The trailing description is captured as `sig_name`.
rx_signal = re.compile(
r"""
[ \t]* (?P<file_name>~?[-\w]*\.?[\w]*)
[ \t]+ (?P<fmt>\d+)
x?(?P<samps_per_frame>\d*)
:?(?P<skew>\d*)
\+?(?P<byte_offset>\d*)
[ \t]* (?P<adc_gain>-?\d*\.?\d*e?[\+-]?\d*)
\(?(?P<baseline>-?\d*)\)?
/?(?P<units>[\w\^\-\?%\/]*)
[ \t]* (?P<adc_res>\d*)
[ \t]* (?P<adc_zero>-?\d*)
[ \t]* (?P<init_value>-?\d*)
[ \t]* (?P<checksum>-?\d*)
[ \t]* (?P<block_size>\d*)
[ \t]* (?P<sig_name>[\S]?[^\t\n\r\f\v]*)
""",
re.VERBOSE,
)

# Segment line pattern. Format:
# SEG_NAME SEG_LEN
# The segment name may be empty or end in "~"; the length is mandatory.
rx_segment = re.compile(
r"""
[ \t]* (?P<seg_name>[-\w]*~?)
[ \t]+ (?P<seg_len>\d+)
""",
re.VERBOSE,
)


def wfdb_strptime(time_string: str) -> datetime.time:
    """
    Given a time string in an acceptable WFDB format, return
    a datetime.time object.

    Valid formats: SS, MM:SS, HH:MM:SS, all with and without microsec.

    Parameters
    ----------
    time_string : str
        The time to be converted to a datetime.time object.

    Returns
    -------
    datetime.time object
        The time converted from str format.

    Raises
    ------
    ValueError
        If `time_string` contains more than two colons, or if it does
        not match the format implied by its colon count.

    """
    # The number of colons selects which of the fields are present;
    # the tuple index is the colon count.
    colon_formats = ("%S", "%M:%S", "%H:%M:%S")

    n_colons = time_string.count(":")
    if n_colons >= len(colon_formats):
        # Without this guard no format would be selected and the function
        # would fail with an unrelated UnboundLocalError.
        raise ValueError(
            "invalid WFDB time string {!r}: too many ':' separators".format(
                time_string
            )
        )
    time_fmt = colon_formats[n_colons]

    # Fractional seconds are optional.
    if "." in time_string:
        time_fmt += ".%f"

    return datetime.datetime.strptime(time_string, time_fmt).time()


def parse_header_content(
    header_content: str,
) -> Tuple[List[str], List[str]]:
    """
    Split the text of a header file into header lines and comment lines.

    Each line is stripped of surrounding whitespace. Blank lines are
    dropped; lines beginning with "#" are comments; everything else is
    a header line. Relative order is preserved within each list.

    Parameters
    ----------
    header_content: str
        The string content of the full header file

    Returns
    -------
    header_lines : List[str]
        A list of all the non-comment lines
    comment_lines : List[str]
        A list of all the comment lines

    """
    # Strip every line, then discard the ones that end up empty.
    stripped = (raw.strip() for raw in header_content.splitlines())
    meaningful = [text for text in stripped if text]

    # Partition on the comment marker, keeping the original ordering.
    comments = [text for text in meaningful if text.startswith("#")]
    headers = [text for text in meaningful if not text.startswith("#")]

    return headers, comments
3 changes: 2 additions & 1 deletion wfdb/io/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from wfdb.io import _signal
from wfdb.io import _url
from wfdb.io import download
from wfdb.io import header
from wfdb.io import util


Expand Down Expand Up @@ -1840,7 +1841,7 @@ def rdheader(record_name, pn_dir=None, rd_segments=False):
header_content = download._stream_header(file_name, pn_dir)

# Separate comment and non-comment lines
header_lines, comment_lines = _header.parse_header_content(header_content)
header_lines, comment_lines = header.parse_header_content(header_content)

# Get fields from record line
record_fields = _header._parse_record_line(header_lines[0])
Expand Down