From f99558d9b8e3dbba2a952a0b292d3497aec8ee69 Mon Sep 17 00:00:00 2001 From: Olivier Martin Date: Wed, 24 Apr 2024 13:10:55 +0200 Subject: [PATCH] gattlib-py: Make pylint pass (and fix issue) --- .github/workflows/github-actions.yml | 8 + gattlib-py/.pylintrc | 646 +++++++++++++++++++++++++++ gattlib-py/gattlib/__init__.py | 31 +- gattlib-py/gattlib/adapter.py | 132 ++---- gattlib-py/gattlib/device.py | 158 +++---- gattlib-py/gattlib/exception.py | 65 +-- gattlib-py/gattlib/gatt.py | 49 +- gattlib-py/gattlib/helpers.py | 49 ++ gattlib-py/gattlib/mainloop.py | 13 +- gattlib-py/gattlib/uuid.py | 17 +- include/gattlib.h | 4 +- 11 files changed, 934 insertions(+), 238 deletions(-) create mode 100644 gattlib-py/.pylintrc create mode 100644 gattlib-py/gattlib/helpers.py diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index ccdfd90e..250ac816 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -50,6 +50,14 @@ jobs: - run: sudo apt install libbluetooth-dev - run: mkdir build && pushd build && cmake -DCMAKE_BUILD_TYPE=Release -DGATTLIB_PYTHON_INTERFACE=OFF .. && make + test-pylint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: python3 -m pip install PyGObject>=3.44.0 + - run: python3 -m pip install pylint + - run: python3 -m pylint gattlib-py/gattlib --rcfile gattlib-py/.pylintrc + generate-python-binary-packages: runs-on: ubuntu-latest steps: diff --git a/gattlib-py/.pylintrc b/gattlib-py/.pylintrc new file mode 100644 index 00000000..05fb9629 --- /dev/null +++ b/gattlib-py/.pylintrc @@ -0,0 +1,646 @@ +[MAIN] + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=no + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint +# in a server-like mode. +clear-cache-post-run=no + +# Load and enable all available extensions. Use --list-extensions to see a list +# all available extensions. +#enable-all-extensions= + +# In error mode, messages with a category besides ERROR or FATAL are +# suppressed, and no reports are done by default. Error mode is compatible with +# disabling specific errors. +#errors-only= + +# Always return a 0 (non-error) status code, even if lint errors are found. +# This is primarily useful in continuous integration scripts. +#exit-zero= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. +extension-pkg-allow-list= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. (This is an alternative name to extension-pkg-allow-list +# for backward compatibility.) +extension-pkg-whitelist= + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +fail-on= + +# Specify a score threshold under which the program will exit with error. +fail-under=10 + +# Interpret the stdin as a python script, whose filename needs to be passed as +# the module_or_package argument. +#from-stdin= + +# Files or directories to be skipped. They should be base names, not paths. +ignore=CVS + +# Add files or directories matching the regular expressions patterns to the +# ignore-list. The regex matches against paths and can be in Posix or Windows +# format. Because '\\' represents the directory delimiter on Windows systems, +# it can't be used as an escape character. +ignore-paths= + +# Files or directories matching the regular expression patterns are skipped. +# The regex matches against base names, not paths. The default value ignores +# Emacs file locks +ignore-patterns=^\.# + +# List of module names for which member attributes should not be checked +# (useful for modules/projects where namespaces are manipulated during runtime +# and thus existing member attributes cannot be deduced by static analysis). It +# supports qualified module names, as well as Unix pattern matching. +ignored-modules= + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use, and will cap the count on Windows to +# avoid hangs. +jobs=1 + +# Control the amount of potential inferred values when inferring a single +# object. This can help the performance when dealing with large functions or +# complex, nested conditions. +limit-inference-results=100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +load-plugins= + +# Pickle collected data for later comparisons. +persistent=yes + +# Minimum Python version to use for version dependent checks. Will default to +# the version used to run pylint. +py-version=3.10 + +# Discover python modules and packages in the file system subtree. +recursive=no + +# Add paths to the list of the source roots. Supports globbing patterns. The +# source root is an absolute path or a path relative to the current working +# directory used to determine a package namespace for modules located under the +# source root. +source-roots= + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + +# In verbose mode, extra non-checker-related info will be displayed. +#verbose= + + +[BASIC] + +# Naming style matching correct argument names. +argument-naming-style=snake_case + +# Regular expression matching correct argument names. Overrides argument- +# naming-style. If left empty, argument names will be checked with the set +# naming style. +#argument-rgx= + +# Naming style matching correct attribute names. +attr-naming-style=snake_case + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. If left empty, attribute names will be checked with the set naming +# style. +#attr-rgx= + +# Bad variable names which should always be refused, separated by a comma. +bad-names=foo, + bar, + baz, + toto, + tutu, + tata + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +bad-names-rgxs= + +# Naming style matching correct class attribute names. +class-attribute-naming-style=any + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. If left empty, class attribute names will be checked +# with the set naming style. +#class-attribute-rgx= + +# Naming style matching correct class constant names. +class-const-naming-style=UPPER_CASE + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. If left empty, class constant names will be checked with +# the set naming style. +#class-const-rgx= + +# Naming style matching correct class names. +class-naming-style=PascalCase + +# Regular expression matching correct class names. Overrides class-naming- +# style. If left empty, class names will be checked with the set naming style. +#class-rgx= + +# Naming style matching correct constant names. +const-naming-style=UPPER_CASE + +# Regular expression matching correct constant names. Overrides const-naming- +# style. If left empty, constant names will be checked with the set naming +# style. +#const-rgx= + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# Naming style matching correct function names. +function-naming-style=snake_case + +# Regular expression matching correct function names. Overrides function- +# naming-style. If left empty, function names will be checked with the set +# naming style. +#function-rgx= + +# Good variable names which should always be accepted, separated by a comma. +good-names=i, + j, + k, + ex, + Run, + _ + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +good-names-rgxs= + +# Include a hint for the correct naming format with invalid-name. +include-naming-hint=no + +# Naming style matching correct inline iteration names. +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. If left empty, inline iteration names will be checked +# with the set naming style. +#inlinevar-rgx= + +# Naming style matching correct method names. +method-naming-style=snake_case + +# Regular expression matching correct method names. Overrides method-naming- +# style. If left empty, method names will be checked with the set naming style. +#method-rgx= + +# Naming style matching correct module names. +module-naming-style=snake_case + +# Regular expression matching correct module names. Overrides module-naming- +# style. If left empty, module names will be checked with the set naming style. +#module-rgx= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. +name-group= + +# Regular expression which should only match function or class names that do +# not require a docstring. +no-docstring-rgx=^_ + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. +# These decorators are taken in consideration only for invalid-name. +property-classes=abc.abstractproperty + +# Regular expression matching correct type alias names. If left empty, type +# alias names will be checked with the set naming style. +#typealias-rgx= + +# Regular expression matching correct type variable names. If left empty, type +# variable names will be checked with the set naming style. +#typevar-rgx= + +# Naming style matching correct variable names. +variable-naming-style=snake_case + +# Regular expression matching correct variable names. Overrides variable- +# naming-style. If left empty, variable names will be checked with the set +# naming style. +#variable-rgx= + + +[CLASSES] + +# Warn about protected attribute access inside special methods +check-protected-access-in-special-methods=no + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__, + __new__, + setUp, + asyncSetUp, + __post_init__ + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict,_fields,_replace,_source,_make,os._exit + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=mcs + + +[DESIGN] + +# List of regular expressions of class ancestor names to ignore when counting +# public methods (see R0903) +exclude-too-few-public-methods= + +# List of qualified class names to ignore when counting class parents (see +# R0901) +ignored-parents= + +# Maximum number of arguments for function / method. +max-args=5 + +# Maximum number of attributes for a class (see R0902). +max-attributes=20 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr=5 + +# Maximum number of branch for function / method body. +max-branches=20 + +# Maximum number of locals for function / method body. +max-locals=20 + +# Maximum number of parents for a class (see R0901). +max-parents=7 + +# Maximum number of public methods for a class (see R0904). +max-public-methods=20 + +# Maximum number of return / yield for function / method body. +max-returns=6 + +# Maximum number of statements in function / method body. +max-statements=50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=2 + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when caught. +overgeneral-exceptions=builtins.BaseException,builtins.Exception + + +[FORMAT] + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )??$ + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Maximum number of characters on a single line. +max-line-length=140 + +# Maximum number of lines in a module. +max-module-lines=1000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=no + + +[IMPORTS] + +# List of modules that can be imported at any level, not just the top level +# one. +allow-any-import-level= + +# Allow explicit reexports by alias from a package __init__. +allow-reexport-from-package=no + +# Allow wildcard imports from modules that define __all__. +allow-wildcard-with-all=no + +# Deprecated modules which should not be used, separated by a comma. +deprecated-modules= + +# Output a graph (.gv or any supported image format) of external dependencies +# to the given file (report RP0402 must not be disabled). +ext-import-graph= + +# Output a graph (.gv or any supported image format) of all (i.e. internal and +# external) dependencies to the given file (report RP0402 must not be +# disabled). +import-graph= + +# Output a graph (.gv or any supported image format) of internal dependencies +# to the given file (report RP0402 must not be disabled). +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + +# Couples of modules and preferred modules, separated by a comma. +preferred-modules= + + +[LOGGING] + +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style=old + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules=logging + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, +# UNDEFINED. +confidence=HIGH, + CONTROL_FLOW, + INFERENCE, + INFERENCE_FAILURE, + UNDEFINED + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once). You can also use "--disable=all" to +# disable everything first and then re-enable specific checks. For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". +disable=raw-checker-failed, + bad-inline-option, + locally-disabled, + file-ignored, + suppressed-message, + useless-suppression, + deprecated-pragma, + use-implicit-booleaness-not-comparison-to-string, + use-implicit-booleaness-not-comparison-to-zero, + use-symbolic-message-instead, + superfluous-parens, + no-else-return, + unused-argument, + fixme, + too-few-public-methods, + too-many-arguments, + global-statement + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable= + + +[METHOD_ARGS] + +# List of qualified names (i.e., library.method) which require a timeout +# parameter e.g. 'requests.api.get,requests.api.post' +timeout-methods=requests.api.delete,requests.api.get,requests.api.head,requests.api.options,requests.api.patch,requests.api.post,requests.api.put,requests.api.request + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME, + XXX, + TODO + +# Regular expression of note tags to take in consideration. +notes-rgx= + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + +# Complete name of functions that never returns. When checking for +# inconsistent-return-statements if a never returning function is called then +# it will be considered as an explicit return statement and no message will be +# printed. +never-returning-functions=sys.exit,argparse.parse_error + +# Let 'consider-using-join' be raised when the separator to join on would be +# non-empty (resulting in expected fixes of the type: ``"- " + " - +# ".join(items)``) +suggest-join-with-non-empty-separator=yes + + +[REPORTS] + +# Python expression which should return a score less than or equal to 10. You +# have access to the variables 'fatal', 'error', 'warning', 'refactor', +# 'convention', and 'info' which contain the number of messages in each +# category, as well as 'statement' which is the total number of statements +# analyzed. This score is used by the global evaluation report (RP0004). +evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details. +msg-template= + +# Set the output format. Available formats are: text, parseable, colorized, +# json2 (improved json format), json (old json format) and msvs (visual +# studio). You can also give a reporter class, e.g. +# mypackage.mymodule.MyReporterClass. +#output-format= + +# Tells whether to display a full report or only the messages. +reports=no + +# Activate the evaluation score. +score=yes + + +[SIMILARITIES] + +# Comments are removed from the similarity computation +ignore-comments=yes + +# Docstrings are removed from the similarity computation +ignore-docstrings=yes + +# Imports are removed from the similarity computation +ignore-imports=yes + +# Signatures are removed from the similarity computation +ignore-signatures=yes + +# Minimum lines number of a similarity. +min-similarity-lines=4 + + +[SPELLING] + +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions=4 + +# Spelling dictionary name. No available dictionaries : You need to install +# both the python package and the system dependency for enchant to work. +spelling-dict= + +# List of comma separated words that should be considered directives if they +# appear at the beginning of a comment and should not be checked. +spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy: + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# A path to a file that contains the private dictionary; one word per line. +spelling-private-dict-file= + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +spelling-store-unknown-words=no + + +[STRING] + +# This flag controls whether inconsistent-quotes generates a warning when the +# character used as a quote delimiter is used inconsistently within a module. +check-quote-consistency=no + +# This flag controls whether the implicit-str-concat should generate a warning +# on implicit string concatenation in sequences defined over several lines. +check-str-concat-over-line-jumps=no + + +[TYPECHECK] + +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators=contextlib.contextmanager + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members= + +# Tells whether to warn about missing members when the owner of the attribute +# is inferred to be None. +ignore-none=yes + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. +ignore-on-opaque-inference=yes + +# List of symbolic message names to ignore for Mixin members. +ignored-checks-for-mixins=no-member, + not-async-context-manager, + not-context-manager, + attribute-defined-outside-init + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=optparse.Values,thread._local,_thread._local,argparse.Namespace + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + +# Regex pattern to define which classes are considered mixins. +mixin-class-rgx=.*[Mm]ixin + +# List of decorators that change the signature of a decorated function. +signature-mutators= + + +[VARIABLES] + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid defining new builtins when possible. +additional-builtins= + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables=yes + +# List of names allowed to shadow builtins +allowed-redefined-builtins= + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_, + _cb + +# A regular expression matching the name of dummy variables (i.e. expected to +# not be used). +dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ + +# Argument names that match this expression will be ignored. +ignored-argument-names=_.*|^ignored_|^unused_ + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io diff --git a/gattlib-py/gattlib/__init__.py b/gattlib-py/gattlib/__init__.py index a5eec079..291698be 100644 --- a/gattlib-py/gattlib/__init__.py +++ b/gattlib-py/gattlib/__init__.py @@ -4,14 +4,16 @@ # Copyright (c) 2016-2024, Olivier Martin # +"""Gattlib C types and functions""" + from ctypes import * import logging import pathlib try: # '_version.py' is generated by 'setup.py' - from ._version import __version__ -except: + from ._version import __version__ #pylint: disable=import-error +except: #pylint: disable=bare-except pass logger = logging.getLogger(__name__) @@ -24,6 +26,7 @@ gattlib = CDLL('libgattlib.so') def native_logging(level: int, string: str): + """Convert Gattlib logging to Python logging.""" if level == 3: logger.debug(string) elif level == 2: @@ -50,6 +53,7 @@ def native_logging(level: int, string: str): # uint8_t data[16]; # } uint128_t; class GattlibUuid128(Structure): + """Python class representing the C structure 'uint128_t'.""" _fields_ = [("data", c_byte * 16)] @@ -62,10 +66,12 @@ class GattlibUuid128(Structure): # } value; # } uuid_t; class GattlibUuidValue(Union): + """Python class representing the C structure of the value of 'uuid_t'.""" _fields_ = [("uuid16", c_ushort), ("uuid32", c_uint), ("uuid128", GattlibUuid128)] class GattlibUuid(Structure): + """Python class representing the C structure 'uuid_t'.""" _fields_ = [("type", c_byte), ("value", GattlibUuidValue)] @@ -75,6 +81,7 @@ class GattlibUuid(Structure): # uuid_t uuid; # } gattlib_primary_service_t; class GattlibPrimaryService(Structure): + """Python class representing the C structure 'gattlib_primary_service_t'.""" _fields_ = [("attr_handle_start", c_ushort), ("attr_handle_end", c_ushort), ("uuid", GattlibUuid)] @@ -87,6 +94,7 @@ class GattlibPrimaryService(Structure): # uuid_t uuid; # } gattlib_characteristic_t; class GattlibCharacteristic(Structure): + """Python class representing the C structure 'gattlib_characteristic_t'.""" _fields_ = [("handle", c_ushort), ("properties", c_byte), ("value_handle", c_ushort), @@ -99,6 +107,7 @@ class GattlibCharacteristic(Structure): # size_t data_length; # } gattlib_advertisement_data_t; class GattlibAdvertisementData(Structure): + """Python class representing the C structure 'gattlib_advertisement_data_t'.""" _fields_ = [("uuid", GattlibUuid), ("data", c_void_p), ("data_length", c_size_t)] @@ -109,6 +118,7 @@ class GattlibAdvertisementData(Structure): # size_t data_size; # } gattlib_manufacturer_data_t; class GattlibManufacturerData(Structure): + """Python class representing the C structure 'gattlib_manufacturer_data_t'.""" _fields_ = [("manufacturer_id", c_ushort), ("data", c_void_p), ("data_size", c_size_t)] @@ -128,7 +138,8 @@ class GattlibManufacturerData(Structure): gattlib_discovered_device_python_callback.argtypes = [c_void_p, c_char_p, c_char_p, py_object] gattlib_discovered_device_python_callback.restype = c_void_p -# void gattlib_connected_device_python_callback(gattlib_adapter_t* adapter, const char *dst, gattlib_connection_t* connection, int error, void* user_data); +# void gattlib_connected_device_python_callback(gattlib_adapter_t* adapter, const char *dst, gattlib_connection_t* connection, +# int error, void* user_data); gattlib_connected_device_python_callback = gattlib.gattlib_connected_device_python_callback gattlib_connected_device_python_callback.argtypes = [c_void_p, c_char_p, c_void_p, c_int, py_object] gattlib_connected_device_python_callback.restype = c_void_p @@ -148,10 +159,12 @@ class GattlibManufacturerData(Structure): gattlib_python_callback_args.argtypes = [py_object, py_object] gattlib_python_callback_args.restype = c_void_p -# int gattlib_adapter_scan_enable_with_filter_non_blocking(gattlib_adapter_t* adapter, uuid_t **uuid_list, int16_t rssi_threshold, uint32_t enabled_filters, +# int gattlib_adapter_scan_enable_with_filter_non_blocking(gattlib_adapter_t* adapter, uuid_t **uuid_list, +# int16_t rssi_threshold, uint32_t enabled_filters, # gattlib_discovered_device_t discovered_device_cb, size_t timeout, void *user_data) gattlib_adapter_scan_enable_with_filter_non_blocking = gattlib.gattlib_adapter_scan_enable_with_filter_non_blocking -gattlib_adapter_scan_enable_with_filter_non_blocking.argtypes = [c_void_p, POINTER(POINTER(GattlibUuid)), c_int16, c_uint32, c_void_p, c_size_t, c_void_p] +gattlib_adapter_scan_enable_with_filter_non_blocking.argtypes = [c_void_p, POINTER(POINTER(GattlibUuid)), + c_int16, c_uint32, c_void_p, c_size_t, c_void_p] # int gattlib_adapter_scan_eddystone(gattlib_adapter_t* adapter, int16_t rssi_threshold, uint32_t eddsytone_types, # gattlib_discovered_device_with_data_t discovered_device_cb, size_t timeout, void *user_data) @@ -222,13 +235,17 @@ class GattlibManufacturerData(Structure): # gattlib_advertisement_data_t **advertisement_data, size_t *advertisement_data_count, # gattlib_manufacturer_data_t** manufacturer_data, size_t* manufacturer_data_count) gattlib_get_advertisement_data = gattlib.gattlib_get_advertisement_data -gattlib_get_advertisement_data.argtypes = [c_void_p, POINTER(POINTER(GattlibAdvertisementData)), POINTER(c_size_t), POINTER(POINTER(GattlibManufacturerData)), POINTER(c_size_t)] +gattlib_get_advertisement_data.argtypes = [c_void_p, + POINTER(POINTER(GattlibAdvertisementData)), POINTER(c_size_t), + POINTER(POINTER(GattlibManufacturerData)), POINTER(c_size_t)] # int gattlib_get_advertisement_data_from_mac(gattlib_adapter_t* adapter, const char *mac_address, # gattlib_advertisement_data_t **advertisement_data, size_t *advertisement_data_length, # gattlib_manufacturer_data_t** manufacturer_data, size_t* manufacturer_data_count) gattlib_get_advertisement_data_from_mac = gattlib.gattlib_get_advertisement_data_from_mac -gattlib_get_advertisement_data_from_mac.argtypes = [c_void_p, c_char_p, POINTER(POINTER(GattlibAdvertisementData)), POINTER(c_size_t), POINTER(POINTER(GattlibManufacturerData)), POINTER(c_size_t)] +gattlib_get_advertisement_data_from_mac.argtypes = [c_void_p, c_char_p, + POINTER(POINTER(GattlibAdvertisementData)), POINTER(c_size_t), + POINTER(POINTER(GattlibManufacturerData)), POINTER(c_size_t)] # int gattlib_mainloop_python(PyObject *handler, PyObject *user_data) gattlib_mainloop = gattlib.gattlib_mainloop_python diff --git a/gattlib-py/gattlib/adapter.py b/gattlib-py/gattlib/adapter.py index 3da984ad..5d5d6500 100644 --- a/gattlib-py/gattlib/adapter.py +++ b/gattlib-py/gattlib/adapter.py @@ -4,13 +4,15 @@ # Copyright (c) 2016-2024, Olivier Martin # +"""Gattlib Adapter API""" + import threading from uuid import UUID -from gattlib import * +from gattlib import * #pylint: disable=wildcard-import,unused-wildcard-import from .device import Device from .exception import handle_return -from .uuid import gattlib_uuid_to_int +from .helpers import convert_gattlib_advertisement_c_data_to_dict GATTLIB_DISCOVER_FILTER_USE_UUID = (1 << 0) GATTLIB_DISCOVER_FILTER_USE_RSSI = (1 << 1) @@ -38,12 +40,14 @@ class Adapter: - + """Bluetooth adapter.""" def __init__(self, name=c_char_p(None)): self._name = name self._adapter = c_void_p(None) self._is_opened = False # Note: 'self._adapter != c_void_p(None)' does not seem to return the expected result self._lock = threading.Lock() + self._on_discovered_device_callback = None + self._on_discovered_device_user_callback = None def __str__(self) -> str: if self._name: @@ -53,20 +57,20 @@ def __str__(self) -> str: @property def name(self): + """Return adapter name.""" return self._name - @staticmethod - def list(): - # TODO: Add support - return [] + #@staticmethod + #def list(): + # # TODO: Add support + # return [] def open(self): - self._lock.acquire() - if self._is_opened: - self._lock.release() - return + """Open adapter.""" + with self._lock: + if self._is_opened: + return - try: self._adapter = c_void_p(None) ret = gattlib_adapter_open(self._name, byref(self._adapter)) if ret == 0: @@ -75,27 +79,24 @@ def open(self): self._name = gattlib_adapter_get_name(self._adapter) else: handle_return(ret) - finally: - self._lock.release() def close(self): - self._lock.acquire() - try: + """Close adapter.""" + with self._lock: if self._adapter: ret = gattlib.gattlib_adapter_close(self._adapter) handle_return(ret) self._is_opened = False self._adapter = None - finally: - self._lock.release() # Use a closure to return a method that can be called by the C-library (see: https://stackoverflow.com/a/7261524/6267288) def get_on_discovered_device_callback(self): + """Return a callback for newly discovered device.""" def on_discovered_device(adapter, addr, name, user_data): try: device = Device(self, addr, name) - self.on_discovered_device_user_callback(device, user_data) - except Exception as e: + self._on_discovered_device_user_callback(device, user_data) + except Exception as e: #pylint: disable=broad-exception-caught logger.exception(e) return on_discovered_device @@ -115,11 +116,11 @@ def scan_enable(self, on_discovered_device_callback, timeout, notify_change=Fals @param timeout: defines the duration of the Bluetooth scanning. When timeout=None or 0, we scan indefinitely. @param user_data: is the data passed to the callback `discovered_device_cb()` """ - assert on_discovered_device_callback != None - self.on_discovered_device_user_callback = on_discovered_device_callback - # Save callback to prevent it to be cleaned by garbage collector see - # comment: https://stackoverflow.com/questions/7259794/how-can-i-get-methods-to-work-as-callbacks-with-python-ctypes#comment38658391_7261524 - self.on_discovered_device_callback = self.get_on_discovered_device_callback() + assert on_discovered_device_callback is not None + self._on_discovered_device_user_callback = on_discovered_device_callback + # Save callback to prevent it to be cleaned by garbage collector see comment: + # https://stackoverflow.com/questions/7259794/how-can-i-get-methods-to-work-as-callbacks-with-python-ctypes#comment38658391_7261524 + self._on_discovered_device_callback = self.get_on_discovered_device_callback() # Ensure BLE adapter it opened if not self._is_opened: @@ -164,41 +165,23 @@ def scan_enable(self, on_discovered_device_callback, timeout, notify_change=Fals uuid_list, rssi, enabled_filters, gattlib_discovered_device_python_callback, timeout, - gattlib_python_callback_args(self.on_discovered_device_callback, user_data)) + gattlib_python_callback_args(self._on_discovered_device_callback, user_data)) handle_return(ret) @staticmethod def on_discovered_ble_device_with_details(adapter, mac_addr, name, advertisement_data_buffer, advertisement_data_count, - manufacturer_id, manufacturer_data_buffer, manufacturer_data_size, + manufacturer_data_buffer, manufacturer_data_count, user_data): - advertisement_data = {} - manufacturer_data = None - - for i in range(0, advertisement_data_count): - service_data = advertisement_data_buffer[i] - uuid = gattlib_uuid_to_int(service_data.uuid) - - pointer_type = POINTER(c_byte * service_data.data_length) - c_bytearray = cast(service_data.data, pointer_type) - - data = bytearray(service_data.data_length) - for i in range(service_data.data_length): - data[i] = c_bytearray.contents[i] & 0xFF - - advertisement_data[uuid] = data - - if manufacturer_data_size > 0: - pointer_type = POINTER(c_byte * manufacturer_data_size) - c_bytearray = cast(manufacturer_data_buffer, pointer_type) - - manufacturer_data = bytearray(manufacturer_data_size) - for i in range(manufacturer_data_size): - manufacturer_data[i] = c_bytearray.contents[i] & 0xFF + """Callback invoked when a new device has been discovered.""" + advertisement_data, manufacturer_data = convert_gattlib_advertisement_c_data_to_dict( + advertisement_data_buffer, advertisement_data_count, + manufacturer_data_buffer, manufacturer_data_count) device = Device(user_data["adapter"], mac_addr, name) - user_data["callback"](device, advertisement_data, manufacturer_id, manufacturer_data, user_data["user_data"]) + user_data["callback"](device, advertisement_data, manufacturer_data, user_data["user_data"]) def scan_eddystone_enable(self, on_discovered_device_callback, eddystone_filters, timeout, rssi_threshold=None, user_data=None): + """Enable BLE scan for Eddystone devices.""" # Ensure BLE adapter it opened if not self._is_opened: self.open() @@ -221,10 +204,16 @@ def scan_eddystone_enable(self, on_discovered_device_callback, eddystone_filters handle_return(ret) def scan_disable(self): + """Disable BLE scan.""" ret = gattlib.gattlib_adapter_scan_disable(self._adapter) handle_return(ret) def get_rssi_from_mac(self, mac_address): + """ + Return RSSI of a device defined by its MAC address. + + Note: The RSSI is 0 when the device is connected. + """ if isinstance(mac_address, str): mac_address = mac_address.encode("utf-8") @@ -233,50 +222,19 @@ def get_rssi_from_mac(self, mac_address): return rssi.value def gattlib_get_advertisement_data_from_mac(self, mac_address): + """Return advertisement and manufacturer data of the device.""" if isinstance(mac_address, str): mac_address = mac_address.encode("utf-8") _advertisement_data = POINTER(GattlibAdvertisementData)() _advertisement_data_count = c_size_t(0) _manufacturer_data = POINTER(GattlibManufacturerData)() - _manufacturer_data_len = c_size_t(0) + _manufacturer_data_count = c_size_t(0) ret = gattlib_get_advertisement_data_from_mac(self._adapter, mac_address, byref(_advertisement_data), byref(_advertisement_data_count), - byref(_manufacturer_data), byref(_manufacturer_data_len)) + byref(_manufacturer_data), byref(_manufacturer_data_count)) handle_return(ret) - advertisement_data = {} - manufacturer_data = {} - - for i in range(0, _advertisement_data_count.value): - service_data = _advertisement_data[i] - uuid = gattlib_uuid_to_int(service_data.uuid) - - pointer_type = POINTER(c_byte * service_data.data_length) - c_bytearray = cast(service_data.data, pointer_type) - - data = bytearray(service_data.data_length) - for i in range(service_data.data_length): - data[i] = c_bytearray.contents[i] & 0xFF - - advertisement_data[uuid] = data - - for i in range(0, _manufacturer_data_len.value): - _manufacturer_data = _manufacturer_data[i] - - pointer_type = POINTER(c_byte * _manufacturer_data.data_size.value) - c_bytearray = cast(_manufacturer_data.data, pointer_type) - - data = bytearray(_manufacturer_data.data_size.value) - for j in range(_manufacturer_data.data_size.value): - data[j] = c_bytearray.contents[j] & 0xFF - - manufacturer_data[_manufacturer_data.manufacturer_id] = data - - gattlib_free_mem(_manufacturer_data.data) - - gattlib_free_mem(_advertisement_data) - gattlib_free_mem(_manufacturer_data) - - return advertisement_data, manufacturer_data + return convert_gattlib_advertisement_c_data_to_dict( + _advertisement_data, _advertisement_data_count, _manufacturer_data, _manufacturer_data_count) diff --git a/gattlib-py/gattlib/device.py b/gattlib-py/gattlib/device.py index 161c21e3..8d207917 100644 --- a/gattlib-py/gattlib/device.py +++ b/gattlib-py/gattlib/device.py @@ -4,16 +4,17 @@ # Copyright (c) 2016-2024, Olivier Martin # +"""Gattlib Device API""" + from __future__ import annotations -import logging import uuid import threading from typing import TYPE_CHECKING -from gattlib import * -from .exception import handle_return, DeviceError, InvalidParameter +from gattlib import * #pylint: disable=wildcard-import,unused-wildcard-import +from .exception import handle_return, InvalidParameter from .gatt import GattService, GattCharacteristic -from .uuid import gattlib_uuid_to_int +from .helpers import convert_gattlib_advertisement_c_data_to_dict if TYPE_CHECKING: from .adapter import Adapter @@ -31,10 +32,10 @@ class Device: - + """GATT device""" def __init__(self, adapter: Adapter, addr: str, name: str = None): self._adapter = adapter - if type(addr) == str: + if isinstance(addr, str): self._addr = addr.encode("utf-8") else: self._addr = addr @@ -45,8 +46,12 @@ def __init__(self, adapter: Adapter, addr: str, name: str = None): # We use a lock on disconnection to ensure the memory is safely freed self._disconnection_lock = threading.Lock() + self._services: dict[int, GattService] = {} + self._characteristics: dict[int, GattCharacteristic] = {} + self.on_connection_callback = None self.on_connection_error_callback = None + self.disconnection_callback = None # Keep track if notification handler has been initialized self._is_notification_init = False @@ -65,6 +70,7 @@ def mac_address(self) -> str: @property def connection(self): + """Return Gattlib connection C handle.""" if self._connection: return self._connection else: @@ -72,9 +78,11 @@ def connection(self): @property def is_connected(self) -> bool: + """Return True if the device is connected.""" return (self._connection is not None) def connect(self, options=CONNECTION_OPTIONS_LEGACY_DEFAULT): + """Connect the device.""" def _on_connection(adapter: c_void_p, mac_address: c_char_p, connection: c_void_p, error: c_int, user_data: py_object): if error: self._connection = None @@ -86,7 +94,7 @@ def _on_connection(adapter: c_void_p, mac_address: c_char_p, connection: c_void_ if self._adapter is None: adapter = None else: - adapter = self._adapter._adapter + adapter = self._adapter._adapter #pylint: disable=protected-access ret = gattlib_connect(adapter, self._addr, options, gattlib_connected_device_python_callback, @@ -94,16 +102,19 @@ def _on_connection(adapter: c_void_p, mac_address: c_char_p, connection: c_void_ handle_return(ret) def on_connection(self, user_data: py_object): - if self.on_connection_callback: - self.on_connection_callback(self, user_data) + """Method called on device connection.""" + if callable(self.on_connection_callback): + self.on_connection_callback(self, user_data) #pylint: disable=not-callable def on_connection_error(self, error: c_int, user_data: py_object): + """Method called on device connection error.""" logger.error("Failed to connect due to error '0x%x'", error) - if self.on_connection_error_callback: - self.on_connection_error_callback(self, error, user_data) + if callable(self.on_connection_error_callback): + self.on_connection_error_callback(self, error, user_data) #pylint: disable=not-callable @property def rssi(self): + """Return connection RSSI.""" _rssi = c_int16(0) if self._connection: ret = gattlib_get_rssi(self._connection, byref(_rssi)) @@ -113,56 +124,50 @@ def rssi(self): return self._adapter.get_rssi_from_mac(self._addr) def register_on_disconnect(self, callback, user_data=None): + """Register disconnection callback.""" self.disconnection_callback = callback def on_disconnection(user_data): - self._disconnection_lock.acquire() - - if self.disconnection_callback: - self.disconnection_callback() - - # On disconnection, we do not need the list of GATT services and GATT characteristics - if self._services_ptr: - gattlib_free_mem(self._services_ptr) - self._services_ptr = None - if self._characteristics_ptr: - gattlib_free_mem(self._characteristics_ptr) - self._characteristics_ptr = None - - # Reset the connection handler - self._connection = None - - self._disconnection_lock.release() + with self._disconnection_lock: + if self.disconnection_callback: + self.disconnection_callback() + + # On disconnection, we do not need the list of GATT services and GATT characteristics + if self._services_ptr: + gattlib_free_mem(self._services_ptr) + self._services_ptr = None + if self._characteristics_ptr: + gattlib_free_mem(self._characteristics_ptr) + self._characteristics_ptr = None + + # Reset the connection handler + self._connection = None gattlib_register_on_disconnect(self.connection, gattlib_disconnected_device_python_callback, gattlib_python_callback_args(on_disconnection, user_data)) def disconnect(self, wait_disconnection: bool = False): - self._connection_lock.acquire() - try: + """Disconnect connected device.""" + with self._connection_lock: if self._connection: ret = gattlib_disconnect(self.connection, wait_disconnection) handle_return(ret) self._connection = None - finally: - self._connection_lock.release() def discover(self): - # - # Discover GATT Services - # + """Discover GATT Services.""" self._services_ptr = POINTER(GattlibPrimaryService)() - _services_count = c_int(0) - ret = gattlib_discover_primary(self.connection, byref(self._services_ptr), byref(_services_count)) + services_count = c_int(0) + ret = gattlib_discover_primary(self.connection, byref(self._services_ptr), byref(services_count)) handle_return(ret) self._services = {} - for i in range(0, _services_count.value): + for i in range(0, services_count.value): service = GattService(self, self._services_ptr[i]) self._services[service.short_uuid] = service - logger.debug("Service UUID:0x%x" % service.short_uuid) + logger.debug("Service UUID:0x%x", service.short_uuid) # # Discover GATT Characteristics @@ -177,62 +182,33 @@ def discover(self): characteristic = GattCharacteristic(self, self._characteristics_ptr[i]) self._characteristics[characteristic.short_uuid] = characteristic - logger.debug("Characteristic UUID:0x%x" % characteristic.short_uuid) + logger.debug("Characteristic UUID:0x%x", characteristic.short_uuid) def get_advertisement_data(self): - _advertisement_data = POINTER(GattlibAdvertisementData)() - _advertisement_data_count = c_size_t(0) - _manufacturer_data = POINTER(GattlibManufacturerData)() - _manufacturer_data_len = c_size_t(0) + """Return advertisement and manufacturer data of the device.""" + advertisement_data = POINTER(GattlibAdvertisementData)() + advertisement_data_count = c_size_t(0) + manufacturer_data = POINTER(GattlibManufacturerData)() + manufacturer_data_count = c_size_t(0) if self._connection is None: - ret = gattlib_get_advertisement_data_from_mac(self._adapter._adapter, self._addr, - byref(_advertisement_data), byref(_advertisement_data_count), - byref(_manufacturer_data), byref(_manufacturer_data_len)) + ret = gattlib_get_advertisement_data_from_mac(self._adapter._adapter, self._addr, #pylint: disable=protected-access + byref(advertisement_data), byref(advertisement_data_count), + byref(manufacturer_data), byref(manufacturer_data_count)) else: ret = gattlib_get_advertisement_data(self._connection, - byref(_advertisement_data), byref(_advertisement_data_count), - byref(_manufacturer_data), byref(_manufacturer_data_len)) + byref(advertisement_data), byref(advertisement_data_count), + byref(manufacturer_data), byref(manufacturer_data_count)) handle_return(ret) - advertisement_data = {} - manufacturer_data = {} - - for i in range(0, _advertisement_data_count.value): - service_data = _advertisement_data[i] - uuid = gattlib_uuid_to_int(service_data.uuid) - - pointer_type = POINTER(c_byte * service_data.data_length) - c_bytearray = cast(service_data.data, pointer_type) - - data = bytearray(service_data.data_length) - for i in range(service_data.data_length): - data[i] = c_bytearray.contents[i] & 0xFF - - advertisement_data[uuid] = data - - for i in range(0, _manufacturer_data_len.value): - _manufacturer_data = _manufacturer_data[i] - - pointer_type = POINTER(c_byte * _manufacturer_data.data_size.value) - c_bytearray = cast(_manufacturer_data.data, pointer_type) - - data = bytearray(_manufacturer_data.data_size.value) - for j in range(_manufacturer_data.data_size.value): - data[j] = c_bytearray.contents[j] & 0xFF - - manufacturer_data[_manufacturer_data.manufacturer_id] = data - - gattlib_free_mem(_manufacturer_data.data) - - gattlib_free_mem(_advertisement_data) - gattlib_free_mem(_manufacturer_data) - - return advertisement_data, manufacturer_data + return convert_gattlib_advertisement_c_data_to_dict( #pylint: disable=protected-access + advertisement_data, advertisement_data_count, + manufacturer_data, manufacturer_data_count) @property - def services(self): + def services(self) -> dict[int, GattService]: + """Return a GATT Service dictionary - the GATT UUID being the key.""" if not hasattr(self, '_services'): logger.warning("Start GATT discovery implicitly") self.discover() @@ -240,7 +216,8 @@ def services(self): return self._services @property - def characteristics(self): + def characteristics(self) -> dict[int, GattCharacteristic]: + """Return a GATT Characteristic dictionary - the GATT UUID being the key.""" if not hasattr(self, '_characteristics'): logger.warning("Start GATT discovery implicitly") self.discover() @@ -248,16 +225,17 @@ def characteristics(self): return self._characteristics @staticmethod - def notification_callback(uuid_str, data, data_len, user_data): + def _notification_callback(uuid_str, data, data_len, user_data): + """Helper method to call back characteristic callback.""" this = user_data notification_uuid = uuid.UUID(uuid_str) short_uuid = notification_uuid.int - if short_uuid not in this._gatt_characteristic_callbacks: + if short_uuid not in this._gatt_characteristic_callbacks: #pylint: disable=protected-access raise RuntimeError("UUID '%s' is expected to be part of the notification list") - else: - characteristic_callback = this._gatt_characteristic_callbacks[short_uuid] + + characteristic_callback = this._gatt_characteristic_callbacks[short_uuid] #pylint: disable=protected-access # value = bytearray(data_len) # for i in range(data_len): @@ -281,7 +259,7 @@ def _notification_init(self): gattlib_register_notification(self._connection, gattlib_notification_device_python_callback, - gattlib_python_callback_args(Device.notification_callback, self)) + gattlib_python_callback_args(Device._notification_callback, self)) def _notification_add_gatt_characteristic_callback(self, gatt_characteristic, callback, user_data): if not callable(callback): diff --git a/gattlib-py/gattlib/exception.py b/gattlib-py/gattlib/exception.py index 7e491d24..6c2ad481 100644 --- a/gattlib-py/gattlib/exception.py +++ b/gattlib-py/gattlib/exception.py @@ -4,6 +4,8 @@ # Copyright (c) 2016-2024, Olivier Martin # +"""Gattlib Exceptions""" + GATTLIB_SUCCESS = 0 GATTLIB_INVALID_PARAMETER = 1 GATTLIB_NOT_FOUND = 2 @@ -25,42 +27,43 @@ class GattlibException(Exception): - pass + """Generic Gattlib exception.""" class NoAdapter(GattlibException): - pass + """Gattlib exception raised when no adapter is present.""" class Busy(GattlibException): - pass + """Gattlib busy exception.""" class Unexpected(GattlibException): - pass + """Gattlib unexpected exception.""" class AdapterNotOpened(GattlibException): - pass + """Gattlib exception raised when adapter is not opened yet.""" class InvalidParameter(GattlibException): - pass + """Gattlib invalid parameter exception.""" class NotFound(GattlibException): - pass + """Gattlib not found exception.""" class OutOfMemory(GattlibException): - pass + """Gattlib out of memory exception.""" class NotSupported(GattlibException): - pass + """Gattlib not supported exception.""" class NotConnected(GattlibException): - pass + """Gattlib exception raised when device is not connected.""" class AdapterClose(GattlibException): - pass + """Gattlib exception raised when the adapter is closed.""" class Disconnected(GattlibException): - pass + """Gattlib exception raised when the device is disconnected.""" class DeviceError(GattlibException): + """Gattlib device exception.""" def __init__(self, adapter: str = None, mac_address: str = None) -> None: self.adapter = adapter self.mac_address = mac_address @@ -69,48 +72,50 @@ def __str__(self) -> str: return f"Error with device {self.mac_address} on adapter {self.adapter}" class DBusError(GattlibException): + """Gattlib DBUS exception.""" def __init__(self, domain: int, code: int) -> None: self.domain = domain self.code = code def __str__(self) -> str: if self.domain == 238 and self.code == 60964: - return f"DBus Error: le-connection-abort-by-local" + return "DBus Error: le-connection-abort-by-local" elif self.domain == 238 and self.code == 60952: - return f"DBus Error: Timeout was reached" + return "DBus Error: Timeout was reached" elif self.domain == 238 and self.code == 60964: - return f"DBus Error: Timeout was reached" + return "DBus Error: Timeout was reached" else: return f"DBus Error domain={self.domain},code={self.code}" def handle_return(ret): + """Function to convert gattlib error to Python exception.""" if ret == GATTLIB_INVALID_PARAMETER: raise InvalidParameter() - elif ret == GATTLIB_NOT_FOUND: + if ret == GATTLIB_NOT_FOUND: raise NotFound() - elif ret == GATTLIB_OUT_OF_MEMORY: + if ret == GATTLIB_OUT_OF_MEMORY: raise OutOfMemory() - elif ret == GATTLIB_TIMEOUT: + if ret == GATTLIB_TIMEOUT: raise TimeoutError() - elif ret == GATTLIB_NOT_SUPPORTED: + if ret == GATTLIB_NOT_SUPPORTED: raise NotSupported() - elif ret == GATTLIB_DEVICE_ERROR: + if ret == GATTLIB_DEVICE_ERROR: raise DeviceError() - elif ret == GATTLIB_DEVICE_NOT_CONNECTED: + if ret == GATTLIB_DEVICE_NOT_CONNECTED: raise NotConnected() - elif ret == GATTLIB_NO_ADAPTER: + if ret == GATTLIB_NO_ADAPTER: raise NoAdapter() - elif ret == GATTLIB_BUSY: + if ret == GATTLIB_BUSY: raise Busy() - elif ret == GATTLIB_UNEXPECTED: + if ret == GATTLIB_UNEXPECTED: raise Unexpected() - elif ret == GATTLIB_ADAPTER_CLOSE: + if ret == GATTLIB_ADAPTER_CLOSE: raise AdapterClose() - elif ret == GATTLIB_DEVICE_DISCONNECTED: + if ret == GATTLIB_DEVICE_DISCONNECTED: raise Disconnected() - elif (ret & GATTLIB_ERROR_MODULE_MASK) == GATTLIB_ERROR_DBUS: + if (ret & GATTLIB_ERROR_MODULE_MASK) == GATTLIB_ERROR_DBUS: raise DBusError((ret >> 8) & 0xFFF, ret & 0xFFFF) - elif ret == -22: # From '-EINVAL' + if ret == -22: # From '-EINVAL' raise ValueError("Gattlib value error") - elif ret != 0: - raise RuntimeError("Gattlib exception %d" % ret) + if ret != 0: + raise RuntimeError(f"Gattlib exception {ret}") diff --git a/gattlib-py/gattlib/gatt.py b/gattlib-py/gattlib/gatt.py index 63dac3c7..6b6512bc 100644 --- a/gattlib-py/gattlib/gatt.py +++ b/gattlib-py/gattlib/gatt.py @@ -4,23 +4,29 @@ # Copyright (c) 2016-2024, Olivier Martin # -from gattlib import * +"""Module for GATT Service, Characteristic and Stream.""" + +from uuid import UUID + +from gattlib import * #pylint: disable=wildcard-import,unused-wildcard-import from .uuid import gattlib_uuid_to_uuid, gattlib_uuid_to_int from .exception import handle_return, InvalidParameter class GattStream(): - + """GATT Stream class.""" def __init__(self, fd, mtu): self._fd = fd self._mtu = mtu @property def mtu(self): + """Return connection MTU.""" # Remove ATT Header (3 bytes) return self._mtu - 3 def write(self, data, mtu=None): + """Write data to GATT stream.""" if mtu is None: mtu = self.mtu @@ -35,44 +41,51 @@ def write(self, data, mtu=None): gattlib.gattlib_write_char_stream_write(self._fd, buffer_type.from_buffer_copy(buffer), buffer_len) def close(self): + """Close GATT stream.""" gattlib.gattlib_write_char_stream_close(self._fd) class GattService(): - + """GATT Service class.""" def __init__(self, device, gattlib_primary_service): self._device = device self._gattlib_primary_service = gattlib_primary_service @property - def uuid(self): + def uuid(self) -> UUID: + """Return GATT service UUID""" return gattlib_uuid_to_uuid(self._gattlib_primary_service.uuid) @property - def short_uuid(self): + def short_uuid(self) -> int: + """Return GATT service short UUID""" return gattlib_uuid_to_int(self._gattlib_primary_service.uuid) class GattCharacteristic(): - + """GATT Characteristic class.""" def __init__(self, device, gattlib_characteristic): self._device = device self._gattlib_characteristic = gattlib_characteristic @property - def uuid(self): + def uuid(self) -> UUID: + """Read UUID characteristic.""" return gattlib_uuid_to_uuid(self._gattlib_characteristic.uuid) @property def short_uuid(self): + """Return GATT characteristic short UUID""" return gattlib_uuid_to_int(self._gattlib_characteristic.uuid) @property def connection(self): + """Return Gattlib connection C handle.""" return self._device.connection def read(self, callback=None): - if callback: + """Read GATT characteristic.""" + if callback: #pylint: disable=no-else-raise raise NotImplementedError() else: _buffer = c_void_p(None) @@ -92,6 +105,7 @@ def read(self, callback=None): return value def write(self, data, without_response=False): + """Write data to GATT characteristic.""" if not isinstance(data, bytes) and not isinstance(data, bytearray): raise TypeError("Data must be of bytes type to know its size.") @@ -100,12 +114,17 @@ def write(self, data, without_response=False): buffer_len = len(data) if without_response: - ret = gattlib_write_without_response_char_by_uuid(self.connection, self._gattlib_characteristic.uuid, buffer_type.from_buffer_copy(buffer), buffer_len) + ret = gattlib_write_without_response_char_by_uuid(self.connection, + self._gattlib_characteristic.uuid, + buffer_type.from_buffer_copy(buffer), buffer_len) else: - ret = gattlib_write_char_by_uuid(self.connection, self._gattlib_characteristic.uuid, buffer_type.from_buffer_copy(buffer), buffer_len) + ret = gattlib_write_char_by_uuid(self.connection, + self._gattlib_characteristic.uuid, + buffer_type.from_buffer_copy(buffer), buffer_len) handle_return(ret) def stream_open(self): + """Open GATT stream from GATT characteristic.""" _stream = c_void_p(None) _mtu = c_uint16(0) @@ -115,20 +134,24 @@ def stream_open(self): return GattStream(_stream, _mtu.value) def register_notification(self, callback, user_data=None): + """Register callback for notification on this GATT characteristic.""" if not callable(callback): raise InvalidParameter("Notification callback is not callable.") - self._device._notification_add_gatt_characteristic_callback(self, callback, user_data) + self._device._notification_add_gatt_characteristic_callback(self, callback, user_data) #pylint: disable=protected-access def unregister_notification(self): - self._device._notification_remove_gatt_characteristic_callback(self) + """Unregister all notification callbacks.""" + self._device._notification_remove_gatt_characteristic_callback(self) #pylint: disable=protected-access def notification_start(self): + """Start GATT notification.""" ret = gattlib_notification_start(self.connection, self._gattlib_characteristic.uuid) handle_return(ret) def notification_stop(self): - """ Could raise gattlib.exception.NotFound if notification has not been registered""" + """Stop GATT notification.""" + # Could raise gattlib.exception.NotFound if notification has not been registered ret = gattlib_notification_stop(self.connection, self._gattlib_characteristic.uuid) handle_return(ret) diff --git a/gattlib-py/gattlib/helpers.py b/gattlib-py/gattlib/helpers.py new file mode 100644 index 00000000..e075a213 --- /dev/null +++ b/gattlib-py/gattlib/helpers.py @@ -0,0 +1,49 @@ +# +# SPDX-License-Identifier: BSD-3-Clause +# +# Copyright (c) 2024, Olivier Martin +# + +"""Module for helper functions for Gattlib module.""" + +from gattlib import * #pylint: disable=wildcard-import,unused-wildcard-import +from .uuid import gattlib_uuid_to_int + +def convert_gattlib_advertisement_c_data_to_dict(advertisement_c_data, advertisement_c_data_count, + manufacturer_c_data, manufacturer_c_data_count): + """Helper function to convert advertisement and manufacturer c-data to Python dictionary""" + advertisement_data = {} + manufacturer_data = {} + + for i in range(0, advertisement_c_data_count.value): + service_data = advertisement_c_data[i] + uuid = gattlib_uuid_to_int(service_data.uuid) + + pointer_type = POINTER(c_byte * service_data.data_length) + c_bytearray = cast(service_data.data, pointer_type) + + data = bytearray(service_data.data_length) + for i in range(service_data.data_length): + data[i] = c_bytearray.contents[i] & 0xFF + + advertisement_data[uuid] = data + gattlib_free_mem(service_data.data) + + for i in range(0, manufacturer_c_data_count.value): + _manufacturer_c_data = manufacturer_c_data[i] + + pointer_type = POINTER(c_byte * _manufacturer_c_data.data_size.value) + c_bytearray = cast(_manufacturer_c_data.data, pointer_type) + + data = bytearray(_manufacturer_c_data.data_size.value) + for j in range(_manufacturer_c_data.data_size.value): + data[j] = c_bytearray.contents[j] & 0xFF + + manufacturer_data[_manufacturer_c_data.manufacturer_id] = data + + gattlib_free_mem(_manufacturer_c_data.data) + + gattlib_free_mem(advertisement_c_data) + gattlib_free_mem(manufacturer_c_data) + + return advertisement_data, manufacturer_data diff --git a/gattlib-py/gattlib/mainloop.py b/gattlib-py/gattlib/mainloop.py index 961fc95c..450449ad 100644 --- a/gattlib-py/gattlib/mainloop.py +++ b/gattlib-py/gattlib/mainloop.py @@ -4,6 +4,8 @@ # Copyright (c) 2016-2024, Olivier Martin # +"""Module for exposing main loop for Gattlib execution.""" + import threading import time import traceback @@ -19,7 +21,7 @@ def _user_thread_main(task): """Main entry point for the thread that will run user's code.""" - global gobject_mainloop, task_returned_code, task_exception + global task_returned_code, task_exception try: # Wait for GLib main loop to start running before starting user code. @@ -32,7 +34,7 @@ def _user_thread_main(task): # Run user's code. task_returned_code = task() - except Exception as ex: + except Exception as ex: #pylint: disable=broad-except logger.error("Exception in %s: %s: %s", task, type(ex), str(ex)) traceback.print_exception(type(ex), ex, ex.__traceback__) task_exception = ex @@ -40,7 +42,12 @@ def _user_thread_main(task): gobject_mainloop.quit() def run_mainloop_with(task): - global gobject_mainloop, task_returned_code, task_exception + """ + Run main loop with the given task. + + The main loop ends when the task has completed. + """ + global gobject_mainloop if gobject_mainloop: raise RuntimeError("A mainloop is already running") diff --git a/gattlib-py/gattlib/uuid.py b/gattlib-py/gattlib/uuid.py index b1411ff6..faeabfd0 100644 --- a/gattlib-py/gattlib/uuid.py +++ b/gattlib-py/gattlib/uuid.py @@ -4,19 +4,22 @@ # Copyright (c) 2016-2024, Olivier Martin # +"""Module to manipulate Gattlib UUID in Python environment.""" + import re from uuid import UUID -from gattlib import * +from gattlib import * #pylint: disable=wildcard-import,unused-wildcard-import SDP_UUID16 = 0x19 SDP_UUID32 = 0x1A SDP_UUID128 = 0x1C -GATT_STANDARD_UUID_FORMAT = re.compile("(\S+)-0000-1000-8000-00805f9b34fb", flags=re.IGNORECASE) +GATT_STANDARD_UUID_FORMAT = re.compile(r"(\S+)-0000-1000-8000-00805f9b34fb", flags=re.IGNORECASE) -def gattlib_uuid_to_uuid(gattlib_uuid): +def gattlib_uuid_to_uuid(gattlib_uuid) -> UUID: + """Convert Gattlib UUID to Python UUID""" if gattlib_uuid.type == SDP_UUID16: return UUID(fields=(gattlib_uuid.value.uuid16, 0x0000, 0x1000, 0x80, 0x00, 0x00805f9b34fb)) elif gattlib_uuid.type == SDP_UUID32: @@ -25,10 +28,11 @@ def gattlib_uuid_to_uuid(gattlib_uuid): data = bytes(gattlib_uuid.value.uuid128.data) return UUID(bytes=data) else: - return ValueError("Gattlib UUID not recognized (type:0x%x)" % gattlib_uuid.type) + return ValueError(f"Gattlib UUID not recognized (type:0x{gattlib_uuid.type:02x})") -def gattlib_uuid_to_int(gattlib_uuid): +def gattlib_uuid_to_int(gattlib_uuid) -> int: + """Convert Gattlib UUID to integer.""" if gattlib_uuid.type == SDP_UUID16: return gattlib_uuid.value.uuid16 elif gattlib_uuid.type == SDP_UUID32: @@ -37,10 +41,11 @@ def gattlib_uuid_to_int(gattlib_uuid): data = bytes(gattlib_uuid.value.uuid128.data) return int.from_bytes(data, byteorder='big') else: - return ValueError("Gattlib UUID not recognized (type:0x%x)" % gattlib_uuid.type) + return ValueError(f"Gattlib UUID not recognized (type:0x{gattlib_uuid.type:02x})") def gattlib_uuid_str_to_int(uuid_str: str) -> int: + """Convert uuid string to integer""" # Check if the string could already encode a UUID16 or UUID32 if len(uuid_str) <= 8: return int(uuid_str, 16) diff --git a/include/gattlib.h b/include/gattlib.h index 2814f831..90c7c0a6 100644 --- a/include/gattlib.h +++ b/include/gattlib.h @@ -679,8 +679,8 @@ int gattlib_get_rssi(gattlib_connection_t *connection, int16_t *rssi); /** * @brief Function to retrieve RSSI from a MAC Address * - * @note: This function is mainly used before a connection is established. Once the connection - * established, the function `gattlib_get_rssi()` should be preferred. + * @note: This function must be used before a connection is established. Once the connection + * established, the function will return a null RSSI. * * @param adapter is the adapter the new device has been seen * @param mac_address is the MAC address of the device to get the RSSI