From c6e88110b48776a67fdbe87776652039af18a237 Mon Sep 17 00:00:00 2001 From: Leon Morten Richter Date: Sun, 15 Sep 2024 11:10:09 +0200 Subject: [PATCH 1/3] feat: adds TagBlockQueue --- README.md | 46 +++++++++------ examples/grouping.py | 15 +++++ examples/sample.ais | 4 ++ pyais/stream.py | 121 ++++++++++++++++++++++++++++++++++------ tests/test_examples.py | 2 +- tests/test_tag_block.py | 56 ++++++++++++++++++- 6 files changed, 210 insertions(+), 34 deletions(-) create mode 100644 examples/grouping.py diff --git a/README.md b/README.md index 5fa2174..fafb6f7 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,6 @@ ![CI](https://github.com/M0r13n/pyais/workflows/CI/badge.svg) [![Documentation Status](https://readthedocs.org/projects/pyais/badge/?version=latest)](https://pyais.readthedocs.io/en/latest/?badge=latest) -

@@ -51,7 +50,7 @@ This AIS sentence is known as a "Position Report" message and is used to transmi - : This field is left blank. This field can contain the sequence number. - **B**: This field indicates the communication channel being used to transmit the message. In this case, the channel is "B". - **15MwkT1P37G?fl0EJbR0OwT0@MS**: This field contains the payload of the message, which is encoded using a variant of ASCII known as "Six-bit ASCII". The payload contains information such as the vessel's identification, position, course, and speed. - 0*4E: This field is a checksum that is used to verify the integrity of the sentence. + 0\*4E: This field is a checksum that is used to verify the integrity of the sentence. **pyais** is a Python modul to encode and decode AIS messages. @@ -149,7 +148,7 @@ The AIS data is freely available under the [norwegian license for public data](h Data can be read from a TCP/IP socket and is encoded according to IEC 62320-1: -- IP: 153.44.253.27 +- IP: 153.44.253.27 - Port: 5631 Refer to the [examples/live_stream.py](./examples/live_stream.py) for a practical example on how to read & decode AIS data from a TCP/IP socket. @@ -159,8 +158,8 @@ This is useful for debugging or for getting used to pyais. It is also possible to encode messages. -| :exclamation: Every message needs at least a single keyword argument: `mmsi`. All other fields have most likely default values. | -| -------------------------------------------------------------------------------------------------------------------------------- | +| :exclamation: Every message needs at least a single keyword argument: `mmsi`. All other fields have most likely default values. | +| ------------------------------------------------------------------------------------------------------------------------------- | ### Encode data using a dictionary @@ -169,12 +168,12 @@ You can pass a dict that has a set of key-value pairs: - use `from pyais.encode import encode_dict` to import `encode_dict` method - it takes a dictionary of data and some NMEA specific kwargs and returns the NMEA 0183 encoded AIS sentence. - only keys known to each message are considered - - other keys are simply omitted - - you can get list of available keys by looking at pyais/encode.py - - you can also call `MessageType1.fields()` to get a list of fields programmatically for each message + - other keys are simply omitted + - you can get list of available keys by looking at pyais/encode.py + - you can also call `MessageType1.fields()` to get a list of fields programmatically for each message - every message needs at least two keyword arguments: - - `mmsi` the MMSI number to encode - - `type` or `msg_type` the type of the message to encode (1-27) + - `mmsi` the MMSI number to encode + - `type` or `msg_type` the type of the message to encode (1-27) **NOTE:** This method takes care of splitting large payloads (larger than 60 characters) @@ -303,6 +302,21 @@ with IterMessages(messages) as s: print(msg.decode()) ``` +## Tag Block Queue (grouping) + +Every class that implements the streaming API accepts an optional keyword argument `tbq`, which is set to `None` by default. When tbq is provided, it can be used as a queue for handling NMEA tag blocks. The queue's `get_nowait()` method allows you to retrieve a list of NMEASentence objects, but only when the entire group has been received (i.e., all sentences within the group are complete). It is important to note that this is rudimentary support for tag block groups, as pyais primarily focuses on processing AIS messages and abstracts away NMEA sentences from the user. + +```py +with FileReaderStream('/path/to/file.nmea', tbq=TagBlockQueue()) as stream: + tbq = stream.tbq + + for msg in stream: + try: + print(tbq.get_nowait()) + except queue.Empty: + pass +``` + # Gatehouse wrappers Some AIS messages have so-called Gatehouse wrappers. These encapsulating messages contain extra information, such as time and checksums. Some readers also process these. See some more documentation [here](https://www.iala-aism.org/wiki/iwrap/index.php/GH_AIS_Message_Format). @@ -348,9 +362,9 @@ def process(self, line: bytes) -> bytes: ``` Parameters: - line (bytes): The input line in bytes that needs to be processed. +line (bytes): The input line in bytes that needs to be processed. Returns: - bytes: The processed line in bytes, conforming to the NMEA0183 format. +bytes: The processed line in bytes, conforming to the NMEA0183 format. The `process` method is responsible for transforming the input bytes into a format that adheres to the NMEA0183 standard. This method must be implemented by any class that conforms to the `PreprocessorProtocol`. @@ -549,7 +563,7 @@ feature, please create an issue. During installation, you may encounter problems due to missing header files. The error looks like this: -````sh +```sh ... bitarray/_bitarray.c:13:10: fatal error: Python.h: No such file or directory @@ -560,13 +574,13 @@ During installation, you may encounter problems due to missing header files. The ... -```` +``` In order to solve this issue, you need to install header files and static libraries for python dev: -````sh +```sh $ sudo apt install python3-dev -```` +``` # For developers diff --git a/examples/grouping.py b/examples/grouping.py new file mode 100644 index 0000000..d423232 --- /dev/null +++ b/examples/grouping.py @@ -0,0 +1,15 @@ +import queue +from pyais.stream import FileReaderStream, TagBlockQueue +import pathlib + +filename = pathlib.Path(__file__).parent.joinpath('sample.ais') + + +with FileReaderStream(str(filename), tbq=TagBlockQueue()) as stream: + tbq = stream.tbq + + for msg in stream: + try: + print(tbq.get_nowait()) + except queue.Empty: + pass diff --git a/examples/sample.ais b/examples/sample.ais index 77232f3..3251111 100644 --- a/examples/sample.ais +++ b/examples/sample.ais @@ -10,3 +10,7 @@ IamNotAnAisMessage1111 # Tag blocks are also supported \g:1-2-73874,n:157036,s:r003669945,c:12415440354*A\!AIVDM,1,1,,B,15N4cJ005Jrek0H@9nDW5608EP,013 +\g:1-2-27300,n:636994,s:b003669710,c:1428621738*5F\!SAVDM,2,1,2,B,55Mw@A7J1adAL@?;7WPl58F0U bytes: pass +class TagBlockQueue(queue.Queue[list[NMEASentence]]): + + def __init__(self, maxsize: int = 0) -> None: + self.groups: dict[str, dict[str, object]] = {} + super().__init__(maxsize) + + def put_sentence(self, sentence: NMEASentence) -> None: + if not sentence.tag_block: + # No NMEA 4.10 tag block + # Returns a single line immediately. + super().put([sentence,]) + return + + tb = sentence.tag_block + tb.init() + + if not tb.group: + # No NMEA 4.10 tag block 'g'. + super().put([sentence,]) + return + + if int(tb.group.sentence_tot) == 1: + # Group of a single sentence + super().put([sentence,]) + return + + if int(tb.group.sentence_num) == 1: + # The first sentence + self.groups[tb.group.group_id] = {'sentence_tot': tb.group.sentence_tot, 'sentences': [sentence,]} + return + + if tb.group.group_id not in self.groups: + # Unknown group. First sentence of group is missing. + return + + self.groups[tb.group.group_id]['sentences'].append(sentence) # type: ignore + + # is_last_sentence = int(tb.group.sentence_num) != int(self.groups[tb.group.group_id]['sentence_tot']) + group_is_complete = int(tb.group.sentence_tot) != len(self.groups[tb.group.group_id]['sentences']) # type: ignore + + if group_is_complete: + return + + # All sentences belonging to this group were received. + super().put(self.groups[tb.group.group_id]['sentences']) # type: ignore + del self.groups[tb.group.group_id] + + class AssembleMessages(ABC): """ Base class that assembles multiline messages. Offers a iterator like interface. """ - def __init__(self) -> None: + def __init__(self, tbq: TagBlockQueue | None = None) -> None: self.wrapper_msg: typing.Optional[GatehouseSentence] = None + self.tbq: TagBlockQueue | None = tbq def __enter__(self) -> "AssembleMessages": # Enables use of with statement @@ -91,6 +141,12 @@ def __insert_wrapper_msg(self, msg: AISSentence) -> AISSentence: msg.wrapper_msg = wrapper_msg return msg + def __add_to_tbq(self, sentence: NMEASentence) -> None: + if not self.tbq: + # Tag Block Queue not defined. Do nothing. + return + self.tbq.put_sentence(sentence) + def _assemble_messages(self) -> Generator[NMEAMessage, None, None]: buffer: typing.Dict[typing.Tuple[int, str], typing.List[typing.Optional[NMEAMessage]]] = {} messages = self._iter_messages() @@ -99,6 +155,7 @@ def _assemble_messages(self) -> Generator[NMEAMessage, None, None]: for line in messages: try: sentence = NMEASentenceFactory.produce(line) + self.__add_to_tbq(sentence) if sentence.TYPE == GatehouseSentence.TYPE: sentence = cast(GatehouseSentence, sentence) self.__set_last_wrapper_msg(sentence) @@ -143,8 +200,8 @@ def _iter_messages(self) -> Generator[bytes, None, None]: class IterMessages(AssembleMessages): - def __init__(self, messages: Iterable[bytes]): - super().__init__() + def __init__(self, messages: Iterable[bytes], tbq: TagBlockQueue | None = None): + super().__init__(tbq=tbq) # If the user passes a single byte string make it into a list if isinstance(messages, bytes): messages = [messages, ] @@ -176,13 +233,17 @@ def _iter_messages(self) -> Generator[bytes, None, None]: class Stream(AssembleMessages, Generic[F], ABC): - def __init__(self, fobj: F, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: + def __init__( + self, fobj: F, + preprocessor: typing.Optional[PreprocessorProtocol] = None, + tbq: TagBlockQueue | None = None + ) -> None: """ Create a new Stream-like object. @param fobj: A file-like or socket object. @param preprocessor: An optional preprocessor """ - super().__init__() + super().__init__(tbq=tbq) self._fobj: F = fobj self.preprocessor = preprocessor @@ -211,8 +272,13 @@ def read(self) -> Generator[bytes, None, None]: class BinaryIOStream(Stream[BinaryIO]): """Read messages from a file-like object""" - def __init__(self, file: BinaryIO, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: - super().__init__(file, preprocessor=preprocessor) + def __init__( + self, + file: BinaryIO, + preprocessor: typing.Optional[PreprocessorProtocol] = None, + tbq: TagBlockQueue | None = None + ) -> None: + super().__init__(file, preprocessor=preprocessor, tbq=tbq) def read(self) -> Generator[bytes, None, None]: yield from self._fobj @@ -223,7 +289,13 @@ class FileReaderStream(BinaryIOStream): Read NMEA messages from file """ - def __init__(self, filename: typing.Union[str, pathlib.Path], mode: str = "rb", preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: + def __init__( + self, + filename: typing.Union[str, pathlib.Path], + mode: str = "rb", + preprocessor: typing.Optional[PreprocessorProtocol] = None, + tbq: TagBlockQueue | None = None + ) -> None: self.filename: typing.Union[str, pathlib.Path] = filename self.mode: str = mode # Try to open file @@ -232,7 +304,7 @@ def __init__(self, filename: typing.Union[str, pathlib.Path], mode: str = "rb", file = cast(BinaryIO, file) except Exception as e: raise FileNotFoundError(f"Could not open file {self.filename}") from e - super().__init__(file, preprocessor=preprocessor) + super().__init__(file, preprocessor=preprocessor, tbq=tbq) class ByteStream(Stream[None]): @@ -240,9 +312,14 @@ class ByteStream(Stream[None]): Takes a iterable that contains ais messages as bytes and assembles them. """ - def __init__(self, iterable: Iterable[bytes], preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: + def __init__( + self, + iterable: Iterable[bytes], + preprocessor: typing.Optional[PreprocessorProtocol] = None, + tbq: TagBlockQueue | None = None + ) -> None: self.iterable: Iterable[bytes] = iterable - super().__init__(None, preprocessor=preprocessor) + super().__init__(None, preprocessor=preprocessor, tbq=tbq) def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None: return @@ -285,10 +362,16 @@ def read(self) -> Generator[bytes, None, None]: class UDPReceiver(SocketStream): - def __init__(self, host: str, port: int, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: + def __init__( + self, + host: str, + port: int, + preprocessor: typing.Optional[PreprocessorProtocol] = None, + tbq: TagBlockQueue | None = None + ) -> None: sock: socket = socket(AF_INET, SOCK_DGRAM) sock.bind((host, port)) - super().__init__(sock, preprocessor=preprocessor) + super().__init__(sock, preprocessor=preprocessor, tbq=tbq) def recv(self) -> bytes: return self._fobj.recvfrom(self.BUF_SIZE)[0] @@ -303,11 +386,17 @@ class TCPConnection(SocketStream): def recv(self) -> bytes: return self._fobj.recv(self.BUF_SIZE) - def __init__(self, host: str, port: int = 80, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: + def __init__( + self, + host: str, + port: int = 80, + preprocessor: typing.Optional[PreprocessorProtocol] = None, + tbq: TagBlockQueue | None = None + ) -> None: sock: socket = socket(AF_INET, SOCK_STREAM) try: sock.connect((host, port)) except ConnectionRefusedError as e: sock.close() raise ConnectionRefusedError(f"Failed to connect to {host}:{port}") from e - super().__init__(sock, preprocessor=preprocessor) + super().__init__(sock, preprocessor=preprocessor, tbq=tbq) diff --git a/tests/test_examples.py b/tests/test_examples.py index d4c5ed3..38b9ce3 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -32,4 +32,4 @@ def test_run_every_file(self): if csv_file.exists(): csv_file.unlink() - assert i == 21 + assert i == 22 diff --git a/tests/test_tag_block.py b/tests/test_tag_block.py index d6cd84e..892de05 100644 --- a/tests/test_tag_block.py +++ b/tests/test_tag_block.py @@ -3,7 +3,57 @@ from pyais.exceptions import TagBlockNotInitializedException, UnknownMessageException from pyais.messages import AISSentence, NMEASentenceFactory, TagBlock -from pyais.stream import IterMessages +from pyais.stream import IterMessages, TagBlockQueue + + +class TagBlockQueueTestCase(unittest.TestCase): + + def test_put_single_w_group(self): + tbq = TagBlockQueue() + + raw = b'\\g:1-1-4512,s:FooBar,c:1428451253*50\\!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C' + sentence = NMEASentenceFactory.produce(raw) + tbq.put_sentence(sentence) + + result = tbq.get_nowait() + + self.assertEqual(len(result), 1) + self.assertEqual(result[0], sentence) + + def test_put_single_wo_group(self): + tbq = TagBlockQueue() + + raw = b'!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C' + sentence = NMEASentenceFactory.produce(raw) + tbq.put_sentence(sentence) + + result = tbq.get_nowait() + + self.assertEqual(len(result), 1) + self.assertEqual(result[0], sentence) + + def test_put_multiple_w_groups(self): + RAWS = [ + b'\\g:1-3-4512,s:FooBar,c:1428451253*50\\!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C', + b'\\g:3-3-4512,s:FooBar,c:1428451253*50\\!AIVDM,1,3,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C', + b'\\g:1-3-1234*30\\!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C', + b'\\g:2-3-4512*30\\!AIVDM,1,2,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C', + b'\\g:1-1-1337,s:FooBar,c:1428451253*50\\!AIVDM,1,2,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C', + b'\\g:1-42-4242,s:FooBar,c:1428451253*50\\!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C', + ] + tbq = TagBlockQueue() + + for raw in RAWS: + tbq.put_sentence(NMEASentenceFactory.produce(raw)) + + result = tbq.get_nowait() + self.assertEqual(len(result), 3) + self.assertEqual(result[0].frag_num, 1) + self.assertEqual(result[1].frag_num, 3) + self.assertEqual(result[2].frag_num, 2) + + result = tbq.get_nowait() + self.assertEqual(len(result), 1) class TagBlockTestCase(unittest.TestCase): @@ -187,3 +237,7 @@ def test_that_unknown_tag_blocks_can_exported_as_dicts(self): 'text': None } ) + + +if __name__ == '__main__': + unittest.main() From 5069f17207ae1f51d78aab1da63d8d349c89a221 Mon Sep 17 00:00:00 2001 From: Leon Morten Richter Date: Sun, 15 Sep 2024 11:13:51 +0200 Subject: [PATCH 2/3] fix: unsupported type hints on Python <3.10 --- pyais/stream.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pyais/stream.py b/pyais/stream.py index ce08c76..f26b356 100644 --- a/pyais/stream.py +++ b/pyais/stream.py @@ -55,10 +55,10 @@ def process(self, line: bytes) -> bytes: pass -class TagBlockQueue(queue.Queue[list[NMEASentence]]): +class TagBlockQueue(queue.Queue): # type: ignore def __init__(self, maxsize: int = 0) -> None: - self.groups: dict[str, dict[str, object]] = {} + self.groups: typing.Dict[str, typing.Dict[str, object]] = {} super().__init__(maxsize) def put_sentence(self, sentence: NMEASentence) -> None: @@ -99,7 +99,7 @@ def put_sentence(self, sentence: NMEASentence) -> None: return # All sentences belonging to this group were received. - super().put(self.groups[tb.group.group_id]['sentences']) # type: ignore + super().put(self.groups[tb.group.group_id]['sentences']) del self.groups[tb.group.group_id] @@ -109,9 +109,9 @@ class AssembleMessages(ABC): Offers a iterator like interface. """ - def __init__(self, tbq: TagBlockQueue | None = None) -> None: + def __init__(self, tbq: typing.Optional[TagBlockQueue] = None) -> None: self.wrapper_msg: typing.Optional[GatehouseSentence] = None - self.tbq: TagBlockQueue | None = tbq + self.tbq: typing.Optional[TagBlockQueue] = tbq def __enter__(self) -> "AssembleMessages": # Enables use of with statement @@ -200,7 +200,7 @@ def _iter_messages(self) -> Generator[bytes, None, None]: class IterMessages(AssembleMessages): - def __init__(self, messages: Iterable[bytes], tbq: TagBlockQueue | None = None): + def __init__(self, messages: Iterable[bytes], tbq: typing.Optional[TagBlockQueue] = None): super().__init__(tbq=tbq) # If the user passes a single byte string make it into a list if isinstance(messages, bytes): @@ -236,7 +236,7 @@ class Stream(AssembleMessages, Generic[F], ABC): def __init__( self, fobj: F, preprocessor: typing.Optional[PreprocessorProtocol] = None, - tbq: TagBlockQueue | None = None + tbq: typing.Optional[TagBlockQueue] = None ) -> None: """ Create a new Stream-like object. @@ -276,7 +276,7 @@ def __init__( self, file: BinaryIO, preprocessor: typing.Optional[PreprocessorProtocol] = None, - tbq: TagBlockQueue | None = None + tbq: typing.Optional[TagBlockQueue] = None ) -> None: super().__init__(file, preprocessor=preprocessor, tbq=tbq) @@ -294,7 +294,7 @@ def __init__( filename: typing.Union[str, pathlib.Path], mode: str = "rb", preprocessor: typing.Optional[PreprocessorProtocol] = None, - tbq: TagBlockQueue | None = None + tbq: typing.Optional[TagBlockQueue] = None ) -> None: self.filename: typing.Union[str, pathlib.Path] = filename self.mode: str = mode @@ -316,7 +316,7 @@ def __init__( self, iterable: Iterable[bytes], preprocessor: typing.Optional[PreprocessorProtocol] = None, - tbq: TagBlockQueue | None = None + tbq: typing.Optional[TagBlockQueue] = None ) -> None: self.iterable: Iterable[bytes] = iterable super().__init__(None, preprocessor=preprocessor, tbq=tbq) @@ -367,7 +367,7 @@ def __init__( host: str, port: int, preprocessor: typing.Optional[PreprocessorProtocol] = None, - tbq: TagBlockQueue | None = None + tbq: typing.Optional[TagBlockQueue] = None ) -> None: sock: socket = socket(AF_INET, SOCK_DGRAM) sock.bind((host, port)) @@ -391,7 +391,7 @@ def __init__( host: str, port: int = 80, preprocessor: typing.Optional[PreprocessorProtocol] = None, - tbq: TagBlockQueue | None = None + tbq: typing.Optional[TagBlockQueue] = None ) -> None: sock: socket = socket(AF_INET, SOCK_STREAM) try: From 523e709c64c0ada292fa03b65ef948daf0fb920a Mon Sep 17 00:00:00 2001 From: Leon Morten Richter Date: Sat, 21 Sep 2024 13:47:10 +0200 Subject: [PATCH 3/3] chore: docs --- examples/grouping.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/examples/grouping.py b/examples/grouping.py index d423232..12628e9 100644 --- a/examples/grouping.py +++ b/examples/grouping.py @@ -1,15 +1,14 @@ -import queue from pyais.stream import FileReaderStream, TagBlockQueue import pathlib filename = pathlib.Path(__file__).parent.joinpath('sample.ais') +# To track NMEA 4.10 tag block groups a queue is required. +# This queue buffers NMEA sentences belonging until the group is complete. +# NOTE: get_nowait will return NMEA sentences - NOT AIS sentences! +tbq = TagBlockQueue() -with FileReaderStream(str(filename), tbq=TagBlockQueue()) as stream: - tbq = stream.tbq - +with FileReaderStream(str(filename), tbq=tbq) as stream: for msg in stream: - try: + while not tbq.empty(): print(tbq.get_nowait()) - except queue.Empty: - pass