diff --git a/netsome/_converters/bgp.py b/netsome/_converters/bgp.py index d50548d..f52c060 100644 --- a/netsome/_converters/bgp.py +++ b/netsome/_converters/bgp.py @@ -2,32 +2,33 @@ def asdotplus_to_asplain(string: str) -> int: - high_order, low_order = map(int, string.split(c.DOT, maxsplit=1)) - return high_order * c.TWO_BYTES + low_order + high_order, low_order = map(int, string.split(c.DELIMITERS.DOT, maxsplit=1)) + return high_order * c.BYTES.TWO + low_order def asdot_to_asplain(string: str) -> int: - if c.DOT in string: + if c.DELIMITERS.DOT in string: return asdotplus_to_asplain(string) return int(string) def asplain_to_asdot(number: int) -> str: - high_order, low_order = divmod(number, c.TWO_BYTES) - return f"{high_order}{c.DOT}{low_order}" if high_order else str(low_order) + high_order, low_order = divmod(number, c.BYTES.TWO) + if high_order: + return c.DELIMITERS.DOT.join_as_str((high_order, low_order)) + + return str(low_order) def asplain_to_asdotplus(number: int) -> str: - high_order, low_order = divmod(number, c.TWO_BYTES) - return f"{high_order}{c.DOT}{low_order}" + return c.DELIMITERS.DOT.join_as_str(divmod(number, c.BYTES.TWO)) def asplain_to_community(number: int) -> str: - asn, value = divmod(number, c.TWO_BYTES) - return f"{asn}{c.COLON}{value}" + return c.DELIMITERS.COLON.join_as_str(divmod(number, c.BYTES.TWO)) def community_to_asplain(string: str) -> int: - asn, value = map(int, string.split(c.COLON, maxsplit=1)) - return asn * c.TWO_BYTES + value + asn, value = map(int, string.split(c.DELIMITERS.COLON, maxsplit=1)) + return asn * c.BYTES.TWO + value diff --git a/netsome/_converters/ipv4.py b/netsome/_converters/ipv4.py index cf8968f..9e8d47f 100644 --- a/netsome/_converters/ipv4.py +++ b/netsome/_converters/ipv4.py @@ -2,10 +2,10 @@ def address_to_int(string: str) -> int: - octets = map(int, string.split(c.DOT, maxsplit=3)) + octets = map(int, string.split(c.DELIMITERS.DOT, maxsplit=3)) return int.from_bytes(octets, byteorder="big") def int_to_address(number: int) -> str: octets = map(str, number.to_bytes(length=4, byteorder="big")) - return c.DOT.join(octets) + return c.DELIMITERS.DOT.join(octets) diff --git a/netsome/constants.py b/netsome/constants.py index 7536882..7f7a98f 100644 --- a/netsome/constants.py +++ b/netsome/constants.py @@ -1,18 +1,46 @@ -ZERO = 0 -ONE_BYTE = 2**8 -TWO_BYTES = ONE_BYTE**2 -FOUR_BYTES = TWO_BYTES**2 +import enum -DOT = "." -COLON = ":" -SLASH = "/" -ASN_MAX = FOUR_BYTES - 1 -ASN_ORDER_MAX = TWO_BYTES - 1 +class BYTES(enum.IntEnum): -DEFAULT_VID = 1 -VID_MAX = 2**12 - 1 + ZERO = 0 + ONE = 2**8 + TWO = ONE**2 + FOUR = TWO**2 -IPV4_PREFIXLEN_MAX = 32 -IPV4_MAX = FOUR_BYTES - 1 -IPV4_OCTET_MAX = ONE_BYTE - 1 + +class DELIMITERS(str, enum.Enum): + + DASH = "-" + DOT = "." + COLON = ":" + SLASH = "/" + + def join_as_str(self, parts): + return self._value_.join(map(str, parts)) + + +class IPV4(enum.IntEnum): + + PREFIXLEN_MIN = 0 + PREFIXLEN_MAX = 32 + + ADDRESS_MIN = 0 + ADDRESS_MAX = BYTES.FOUR - 1 + + OCTET_MIN = 0 + OCTET_MAX = BYTES.ONE - 1 + + +class VLAN(enum.IntEnum): + + MIN = 0 + DEFAULT = 1 + MAX = 2 ** 12 - 1 + + +class BGP(enum.IntEnum): + + ASN_MIN = 0 + ASN_MAX = BYTES.FOUR - 1 + ASN_ORDER_MAX = BYTES.TWO - 1 diff --git a/netsome/types/bgp.py b/netsome/types/bgp.py index 0c71996..1f0efdc 100644 --- a/netsome/types/bgp.py +++ b/netsome/types/bgp.py @@ -1,37 +1,42 @@ import typing as t from netsome import constants as c -from netsome._converters import bgp as converters -from netsome.validators import bgp as validators +from netsome._converters import bgp as convs +from netsome.validators import bgp as valids # TODO(kuderr): can move some common stuff to Base class class ASN: + + MIN = c.BGP.ASN_MIN + MAX = c.BGP.ASN_MAX + ORDER_MAX = c.BGP.ASN_ORDER_MAX + def __init__(self, number: int) -> None: - validators.validate_asplain(number) + valids.validate_asplain(number) self._number = number @classmethod def from_asdot(cls, string: str) -> "ASN": - validators.validate_asdot(string) - return cls(converters.asdot_to_asplain(string)) + valids.validate_asdot(string) + return cls(convs.asdot_to_asplain(string)) @classmethod def from_asdotplus(cls, string: str) -> "ASN": - validators.validate_asdotplus(string) - return cls(converters.asdotplus_to_asplain(string)) + valids.validate_asdotplus(string) + return cls(convs.asdotplus_to_asplain(string)) @classmethod def from_asplain(cls, number: int) -> "ASN": - validators.validate_asplain(number) + valids.validate_asplain(number) return cls(number) def to_asdot(self): - return converters.asplain_to_asdot(self._number) + return convs.asplain_to_asdot(self._number) def to_asdotplus(self): - return converters.asplain_to_asdotplus(self._number) + return convs.asplain_to_asdotplus(self._number) def to_asplain(self): return self._number @@ -51,13 +56,13 @@ def __repr__(self) -> str: class Community: def __init__(self, number: int) -> None: - validators.validate_asplain(number) + valids.validate_asplain(number) self._number = number @classmethod def from_str(cls, value: str) -> "Community": - validators.validate_community(value) - return cls(converters.community_to_asplain(value)) + valids.validate_community(value) + return cls(convs.community_to_asplain(value)) def __eq__(self, other: t.Any) -> bool: return isinstance(other, self.__class__) and (self._number == other._number) diff --git a/netsome/types/ipv4.py b/netsome/types/ipv4.py index d58d64a..e87a58c 100644 --- a/netsome/types/ipv4.py +++ b/netsome/types/ipv4.py @@ -2,23 +2,35 @@ import typing as t from netsome import constants as c -from netsome._converters import ipv4 as converters -from netsome.validators import ipv4 as validators +from netsome._converters import ipv4 as convs +from netsome.validators import ipv4 as valids class IPv4Address: + + PREFIXLEN_MIN = c.IPV4.PREFIXLEN_MIN + PREFIXLEN_MAX = c.IPV4.PREFIXLEN_MAX + + ADDRESS_MIN = c.IPV4.ADDRESS_MIN + ADDRESS_MAX = c.IPV4.PREFIXLEN_MAX + + OCTET_MIN = c.IPV4.OCTET_MIN + OCTET_MAX = c.IPV4.OCTET_MAX + def __init__(self, address: str) -> None: - validators.validate_address_str(address) - self._addr = converters.address_to_int(address) + valids.validate_address_str(address) + self._addr = convs.address_to_int(address) @classmethod def from_int(cls, number: int) -> "IPv4Address": - validators.validate_address_int(number) - return cls(converters.int_to_address(number)) + valids.validate_address_int(number) + obj = cls.__new__(cls) + obj._addr = number + return obj @functools.cached_property def address(self) -> str: - return converters.int_to_address(self._addr) + return convs.int_to_address(self._addr) def __int__(self) -> int: return self._addr @@ -37,31 +49,38 @@ def __eq__(self, other: t.Any) -> bool: class IPv4Network: + def __init__(self, network: str) -> None: - address, prefixlen = network.split(c.SLASH, maxsplit=1) - validators.validate_address_str(address) - validators.validate_prefixlen_str(prefixlen) - validators.validate_network_int( - converters.address_to_int(address), int(prefixlen) - ) - - self._prefixlen = int(prefixlen) - self._netaddr = IPv4Address(address) - self._netmask = IPv4Address.from_int( - c.IPV4_MAX ^ (c.IPV4_MAX >> self._prefixlen) - ) + # TODO(d.burmistrov): move this block into util? (validate + convert) + addr, prefixlen = network.split(c.DELIMITERS.SLASH, maxsplit=1) + valids.validate_address_str(addr) + valids.validate_prefixlen_str(prefixlen) + prefixlen = int(prefixlen) + valids.validate_network_int(convs.address_to_int(addr), prefixlen) - def __repr__(self) -> str: - return f'{self.__class__.__name__}("{self.address}")' + self._populate(IPv4Address(addr), prefixlen) + + def _populate(self, netaddr: IPv4Address, prefixlen: int) -> None: + self._prefixlen = prefixlen + self._netaddr = netaddr + m = c.IPV4.ADDRESS_MAX + self._netmask = IPv4Address.from_int(m ^ (m >> prefixlen)) @classmethod - def from_int(cls, int_address: int, prefixlen: int) -> "IPv4Network": - validators.validate_address_int(int_address) - validators.validate_prefixlen_int(prefixlen) - validators.validate_network_int(int_address, prefixlen) + def from_int(cls, int_addr: int, prefixlen: int) -> "IPv4Network": + valids.validate_address_int(int_addr) + valids.validate_prefixlen_int(prefixlen) + valids.validate_network_int(int_addr, prefixlen) + + obj = cls.__new__(cls) + obj._populate(IPv4Address.from_int(int_addr), prefixlen) + return obj - address = f"{converters.int_to_address(int_address)}{c.SLASH}{prefixlen}" - return cls(address) + def as_tuple(self) -> tuple[int, int]: + return int(self.netaddress), self._prefixlen + + def __repr__(self) -> str: + return f'{self.__class__.__name__}("{self.address}")' @property def prefixlen(self) -> int: @@ -77,11 +96,11 @@ def netmask(self) -> IPv4Address: @functools.cached_property def address(self) -> str: - return f"{self._netaddr.address}/{self._prefixlen}" + return c.DELIMITERS.SLASH.join_as_str((self._netaddr.address, self._prefixlen)) @functools.cached_property def hostmask(self) -> IPv4Address: - return IPv4Address.from_int(int(self._netmask) ^ c.IPV4_MAX) + return IPv4Address.from_int(int(self._netmask) ^ c.IPV4.ADDRESS_MAX) @functools.cached_property def broadcast(self) -> IPv4Address: @@ -93,7 +112,7 @@ def subnets( ) -> t.Generator["IPv4Network", None, None]: new_prefixlen = self._prefixlen + 1 if prefixlen: - validators.validate_prefixlen_int(prefixlen, min_len=new_prefixlen) + valids.validate_prefixlen_int(prefixlen, min_len=new_prefixlen) new_prefixlen = prefixlen prefixlen_diff = new_prefixlen - self._prefixlen @@ -111,7 +130,7 @@ def supernet( ) -> "IPv4Network": new_prefixlen = self._prefixlen - 1 if prefixlen: - validators.validate_prefixlen_int(prefixlen, max_len=new_prefixlen) + valids.validate_prefixlen_int(prefixlen, max_len=new_prefixlen) new_prefixlen = prefixlen prefixlen_diff = self._prefixlen - new_prefixlen diff --git a/netsome/types/vlans.py b/netsome/types/vlans.py index 880c186..c044afa 100644 --- a/netsome/types/vlans.py +++ b/netsome/types/vlans.py @@ -1,16 +1,20 @@ import typing as t from netsome import constants as c -from netsome.validators import vlans as validators +from netsome.validators import vlans as valids class VID: """VLAN ID""" - _RESERVED = {c.ZERO, c.DEFAULT_VID, c.VID_MAX} + VID_MIN = c.VLAN.MIN + VID_DEFAULT = c.VLAN.DEFAULT + VID_MAX = c.VLAN.MAX + + _RESERVED = {VID_MIN, VID_DEFAULT, VID_MAX} def __init__(self, vid: int) -> None: - validators.validate_vid(vid) + valids.validate_vid(vid) self._vid = vid def __eq__(self, other: t.Any) -> bool: @@ -29,4 +33,4 @@ def is_reserved(self) -> bool: return self._vid in self._RESERVED def is_default(self) -> bool: - return self._vid == c.DEFAULT_VID + return self._vid == self.VID_DEFAULT diff --git a/netsome/validators/bgp.py b/netsome/validators/bgp.py index ffc910f..7f09c74 100644 --- a/netsome/validators/bgp.py +++ b/netsome/validators/bgp.py @@ -1,53 +1,53 @@ from netsome import constants as c -from netsome._converters import bgp as converters +from netsome._converters import bgp as convs def validate_asplain(number: int) -> None: if not isinstance(number, int): raise TypeError("Invalid asplain type, must be int") - if not (c.ZERO <= number <= c.ASN_MAX): - raise ValueError( - f"Invalid asplain number. Must be in range {c.ZERO}-{c.ASN_MAX}" - ) + if not (c.BGP.ASN_MIN <= number <= c.BGP.ASN_MAX): + msg = ("Invalid asplain number. Must be in range " + + c.DELIMITERS.DASH.join_as_str((c.BGP.ASN_MIN, c.BGP.ASN_MAX))) + raise ValueError(msg) def validate_asdotplus(string: str) -> None: if not isinstance(string, str): raise TypeError("Invalid asdot+ type, must be str") - if c.DOT not in string: + if c.DELIMITERS.DOT not in string: raise ValueError("Invalid asdot+ format, must be HIGH_ORDER.LOW_ORDER") - validate_asplain(converters.asdotplus_to_asplain(string)) + validate_asplain(convs.asdotplus_to_asplain(string)) def validate_asdot(string: str) -> None: if not isinstance(string, str): raise TypeError("Invalid asdot type, must be str") - if c.DOT in string: + if c.DELIMITERS.DOT in string: validate_asdotplus(string) - return - - validate_asplain(int(string)) + else: + validate_asplain(int(string)) def validate_community(string: str) -> None: if not isinstance(string, str): raise TypeError("Invalid Community type, must be str") - if string.count(c.COLON) != 1: - raise ValueError( - "Invalid Community format, delimiter must be colon – ASN:VALUE" - ) - - asn, value = map(int, string.split(c.COLON, maxsplit=1)) - if not (c.ZERO <= asn <= c.ASN_ORDER_MAX): - raise ValueError( - f"Invalid ASN in Community. Must be in range {c.ZERO}-{c.ASN_ORDER_MAX}" - ) - if not (c.ZERO <= value <= c.ASN_ORDER_MAX): - raise ValueError( - f"Invalid VALUE number in Community. Must be in range {c.ZERO}-{c.ASN_ORDER_MAX }" - ) + asn, value, *unexpected = string.split(c.DELIMITERS.COLON) + + if unexpected: + msg = "Invalid Community format, delimiter must be colon – ASN:VALUE" + raise ValueError(msg) + + asn, value = int(asn), int(value) + if not (c.BGP.ASN_MIN <= asn <= c.BGP.ASN_ORDER_MAX): + msg = ("Invalid ASN in Community. Must be in range " + + c.DELIMITERS.DASH.join_as_str((c.BGP.ASN_MIN, c.BGP.ASN_ORDER_MAX))) + raise ValueError(msg) + if not (c.BGP.ASN_MIN <= value <= c.BGP.ASN_ORDER_MAX): + msg = ("Invalid VALUE number in Community. Must be in range " + + c.DELIMITERS.DASH.join_as_str((c.BGP.ASN_MIN, c.BGP.ASN_ORDER_MAX))) + raise ValueError(msg) diff --git a/netsome/validators/ipv4.py b/netsome/validators/ipv4.py index abcc2ee..99f6bdd 100644 --- a/netsome/validators/ipv4.py +++ b/netsome/validators/ipv4.py @@ -5,19 +5,19 @@ def validate_address_str(string: str): if not isinstance(string, str): raise TypeError("Invalid type") - octets = string.split(c.DOT) + octets = tuple(map(int, string.split(c.DELIMITERS.DOT))) if len(octets) != 4: raise ValueError() for octet in octets: - validate_octet_str(octet) + validate_octet_int(octet) def validate_address_int(number: int) -> None: if not isinstance(number, int): raise TypeError("Invalid type") - if not (c.ZERO <= number <= c.IPV4_MAX): + if not (c.IPV4.ADDRESS_MIN <= number <= c.IPV4.ADDRESS_MAX): raise ValueError("Invalid value") @@ -38,7 +38,7 @@ def validate_octet_int(number: int) -> None: if not isinstance(number, int): raise TypeError("Invalid type") - if not (c.ZERO <= number <= c.IPV4_OCTET_MAX): + if not (c.BYTES.ZERO <= number <= c.IPV4.OCTET_MAX): raise ValueError("Invalid value") @@ -56,8 +56,8 @@ def validate_prefixlen_str(string: str) -> None: # TODO(dm.a.kudryavtsev): можно сделать общим валидатором на значение def validate_prefixlen_int( number: int, - min_len: int = c.ZERO, - max_len: int = c.IPV4_PREFIXLEN_MAX, + min_len: int = c.BYTES.ZERO, + max_len: int = c.IPV4.PREFIXLEN_MAX, ) -> None: if not isinstance(number, int): raise TypeError() @@ -67,6 +67,6 @@ def validate_prefixlen_int( def validate_network_int(address: int, prefixlen: int): - netmask = c.IPV4_MAX ^ (c.IPV4_MAX >> prefixlen) + netmask = c.IPV4.ADDRESS_MAX ^ (c.IPV4.ADDRESS_MAX >> prefixlen) if address & netmask != address: raise ValueError("host bits set") diff --git a/netsome/validators/vlans.py b/netsome/validators/vlans.py index 39c20e5..e3749f6 100644 --- a/netsome/validators/vlans.py +++ b/netsome/validators/vlans.py @@ -5,5 +5,6 @@ def validate_vid(vid: int) -> None: if not isinstance(vid, int): raise TypeError("Invalid type, must be int") - if not (c.ZERO <= vid <= c.VID_MAX): - raise ValueError(f"Invalid vlan number. Must be in range {c.ZERO}-{c.VID_MAX}") + if not (c.VLAN.MIN <= vid <= c.VLAN.MAX): + raise ValueError("Invalid vlan number. Must be in range " + + c.DELIMITERS.DASH.join_as_str((c.VLAN.MIN, c.VLAN.MAX)))