Skip to content

Commit

Permalink
Constants rework (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
pyctrl authored and kuderr committed Apr 15, 2024
1 parent 6931ba4 commit 1b20496
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 109 deletions.
23 changes: 12 additions & 11 deletions netsome/_converters/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,33 @@


def asdotplus_to_asplain(string: str) -> int:
high_order, low_order = map(int, string.split(c.DOT, maxsplit=1))
return high_order * c.TWO_BYTES + low_order
high_order, low_order = map(int, string.split(c.DELIMITERS.DOT, maxsplit=1))
return high_order * c.BYTES.TWO + low_order


def asdot_to_asplain(string: str) -> int:
if c.DOT in string:
if c.DELIMITERS.DOT in string:
return asdotplus_to_asplain(string)

return int(string)


def asplain_to_asdot(number: int) -> str:
high_order, low_order = divmod(number, c.TWO_BYTES)
return f"{high_order}{c.DOT}{low_order}" if high_order else str(low_order)
high_order, low_order = divmod(number, c.BYTES.TWO)
if high_order:
return c.DELIMITERS.DOT.join_as_str((high_order, low_order))

return str(low_order)


def asplain_to_asdotplus(number: int) -> str:
high_order, low_order = divmod(number, c.TWO_BYTES)
return f"{high_order}{c.DOT}{low_order}"
return c.DELIMITERS.DOT.join_as_str(divmod(number, c.BYTES.TWO))


def asplain_to_community(number: int) -> str:
asn, value = divmod(number, c.TWO_BYTES)
return f"{asn}{c.COLON}{value}"
return c.DELIMITERS.COLON.join_as_str(divmod(number, c.BYTES.TWO))


def community_to_asplain(string: str) -> int:
asn, value = map(int, string.split(c.COLON, maxsplit=1))
return asn * c.TWO_BYTES + value
asn, value = map(int, string.split(c.DELIMITERS.COLON, maxsplit=1))
return asn * c.BYTES.TWO + value
4 changes: 2 additions & 2 deletions netsome/_converters/ipv4.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@


def address_to_int(string: str) -> int:
octets = map(int, string.split(c.DOT, maxsplit=3))
octets = map(int, string.split(c.DELIMITERS.DOT, maxsplit=3))
return int.from_bytes(octets, byteorder="big")


def int_to_address(number: int) -> str:
octets = map(str, number.to_bytes(length=4, byteorder="big"))
return c.DOT.join(octets)
return c.DELIMITERS.DOT.join(octets)
56 changes: 42 additions & 14 deletions netsome/constants.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,46 @@
ZERO = 0
ONE_BYTE = 2**8
TWO_BYTES = ONE_BYTE**2
FOUR_BYTES = TWO_BYTES**2
import enum

DOT = "."
COLON = ":"
SLASH = "/"

ASN_MAX = FOUR_BYTES - 1
ASN_ORDER_MAX = TWO_BYTES - 1
class BYTES(enum.IntEnum):

DEFAULT_VID = 1
VID_MAX = 2**12 - 1
ZERO = 0
ONE = 2**8
TWO = ONE**2
FOUR = TWO**2

IPV4_PREFIXLEN_MAX = 32
IPV4_MAX = FOUR_BYTES - 1
IPV4_OCTET_MAX = ONE_BYTE - 1

class DELIMITERS(str, enum.Enum):

DASH = "-"
DOT = "."
COLON = ":"
SLASH = "/"

def join_as_str(self, parts):
return self._value_.join(map(str, parts))


class IPV4(enum.IntEnum):

PREFIXLEN_MIN = 0
PREFIXLEN_MAX = 32

ADDRESS_MIN = 0
ADDRESS_MAX = BYTES.FOUR - 1

OCTET_MIN = 0
OCTET_MAX = BYTES.ONE - 1


class VLAN(enum.IntEnum):

MIN = 0
DEFAULT = 1
MAX = 2 ** 12 - 1


class BGP(enum.IntEnum):

ASN_MIN = 0
ASN_MAX = BYTES.FOUR - 1
ASN_ORDER_MAX = BYTES.TWO - 1
31 changes: 18 additions & 13 deletions netsome/types/bgp.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,42 @@
import typing as t

from netsome import constants as c
from netsome._converters import bgp as converters
from netsome.validators import bgp as validators
from netsome._converters import bgp as convs
from netsome.validators import bgp as valids

# TODO(kuderr): can move some common stuff to Base class


class ASN:

MIN = c.BGP.ASN_MIN
MAX = c.BGP.ASN_MAX
ORDER_MAX = c.BGP.ASN_ORDER_MAX

def __init__(self, number: int) -> None:
validators.validate_asplain(number)
valids.validate_asplain(number)
self._number = number

@classmethod
def from_asdot(cls, string: str) -> "ASN":
validators.validate_asdot(string)
return cls(converters.asdot_to_asplain(string))
valids.validate_asdot(string)
return cls(convs.asdot_to_asplain(string))

@classmethod
def from_asdotplus(cls, string: str) -> "ASN":
validators.validate_asdotplus(string)
return cls(converters.asdotplus_to_asplain(string))
valids.validate_asdotplus(string)
return cls(convs.asdotplus_to_asplain(string))

@classmethod
def from_asplain(cls, number: int) -> "ASN":
validators.validate_asplain(number)
valids.validate_asplain(number)
return cls(number)

def to_asdot(self):
return converters.asplain_to_asdot(self._number)
return convs.asplain_to_asdot(self._number)

def to_asdotplus(self):
return converters.asplain_to_asdotplus(self._number)
return convs.asplain_to_asdotplus(self._number)

def to_asplain(self):
return self._number
Expand All @@ -51,13 +56,13 @@ def __repr__(self) -> str:

class Community:
def __init__(self, number: int) -> None:
validators.validate_asplain(number)
valids.validate_asplain(number)
self._number = number

@classmethod
def from_str(cls, value: str) -> "Community":
validators.validate_community(value)
return cls(converters.community_to_asplain(value))
valids.validate_community(value)
return cls(convs.community_to_asplain(value))

def __eq__(self, other: t.Any) -> bool:
return isinstance(other, self.__class__) and (self._number == other._number)
Expand Down
81 changes: 50 additions & 31 deletions netsome/types/ipv4.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,35 @@
import typing as t

from netsome import constants as c
from netsome._converters import ipv4 as converters
from netsome.validators import ipv4 as validators
from netsome._converters import ipv4 as convs
from netsome.validators import ipv4 as valids


class IPv4Address:

PREFIXLEN_MIN = c.IPV4.PREFIXLEN_MIN
PREFIXLEN_MAX = c.IPV4.PREFIXLEN_MAX

ADDRESS_MIN = c.IPV4.ADDRESS_MIN
ADDRESS_MAX = c.IPV4.PREFIXLEN_MAX

OCTET_MIN = c.IPV4.OCTET_MIN
OCTET_MAX = c.IPV4.OCTET_MAX

def __init__(self, address: str) -> None:
validators.validate_address_str(address)
self._addr = converters.address_to_int(address)
valids.validate_address_str(address)
self._addr = convs.address_to_int(address)

@classmethod
def from_int(cls, number: int) -> "IPv4Address":
validators.validate_address_int(number)
return cls(converters.int_to_address(number))
valids.validate_address_int(number)
obj = cls.__new__(cls)
obj._addr = number
return obj

@functools.cached_property
def address(self) -> str:
return converters.int_to_address(self._addr)
return convs.int_to_address(self._addr)

def __int__(self) -> int:
return self._addr
Expand All @@ -37,31 +49,38 @@ def __eq__(self, other: t.Any) -> bool:


class IPv4Network:

def __init__(self, network: str) -> None:
address, prefixlen = network.split(c.SLASH, maxsplit=1)
validators.validate_address_str(address)
validators.validate_prefixlen_str(prefixlen)
validators.validate_network_int(
converters.address_to_int(address), int(prefixlen)
)

self._prefixlen = int(prefixlen)
self._netaddr = IPv4Address(address)
self._netmask = IPv4Address.from_int(
c.IPV4_MAX ^ (c.IPV4_MAX >> self._prefixlen)
)
# TODO(d.burmistrov): move this block into util? (validate + convert)
addr, prefixlen = network.split(c.DELIMITERS.SLASH, maxsplit=1)
valids.validate_address_str(addr)
valids.validate_prefixlen_str(prefixlen)
prefixlen = int(prefixlen)
valids.validate_network_int(convs.address_to_int(addr), prefixlen)

def __repr__(self) -> str:
return f'{self.__class__.__name__}("{self.address}")'
self._populate(IPv4Address(addr), prefixlen)

def _populate(self, netaddr: IPv4Address, prefixlen: int) -> None:
self._prefixlen = prefixlen
self._netaddr = netaddr
m = c.IPV4.ADDRESS_MAX
self._netmask = IPv4Address.from_int(m ^ (m >> prefixlen))

@classmethod
def from_int(cls, int_address: int, prefixlen: int) -> "IPv4Network":
validators.validate_address_int(int_address)
validators.validate_prefixlen_int(prefixlen)
validators.validate_network_int(int_address, prefixlen)
def from_int(cls, int_addr: int, prefixlen: int) -> "IPv4Network":
valids.validate_address_int(int_addr)
valids.validate_prefixlen_int(prefixlen)
valids.validate_network_int(int_addr, prefixlen)

obj = cls.__new__(cls)
obj._populate(IPv4Address.from_int(int_addr), prefixlen)
return obj

address = f"{converters.int_to_address(int_address)}{c.SLASH}{prefixlen}"
return cls(address)
def as_tuple(self) -> tuple[int, int]:
return int(self.netaddress), self._prefixlen

def __repr__(self) -> str:
return f'{self.__class__.__name__}("{self.address}")'

@property
def prefixlen(self) -> int:
Expand All @@ -77,11 +96,11 @@ def netmask(self) -> IPv4Address:

@functools.cached_property
def address(self) -> str:
return f"{self._netaddr.address}/{self._prefixlen}"
return c.DELIMITERS.SLASH.join_as_str((self._netaddr.address, self._prefixlen))

@functools.cached_property
def hostmask(self) -> IPv4Address:
return IPv4Address.from_int(int(self._netmask) ^ c.IPV4_MAX)
return IPv4Address.from_int(int(self._netmask) ^ c.IPV4.ADDRESS_MAX)

@functools.cached_property
def broadcast(self) -> IPv4Address:
Expand All @@ -93,7 +112,7 @@ def subnets(
) -> t.Generator["IPv4Network", None, None]:
new_prefixlen = self._prefixlen + 1
if prefixlen:
validators.validate_prefixlen_int(prefixlen, min_len=new_prefixlen)
valids.validate_prefixlen_int(prefixlen, min_len=new_prefixlen)
new_prefixlen = prefixlen

prefixlen_diff = new_prefixlen - self._prefixlen
Expand All @@ -111,7 +130,7 @@ def supernet(
) -> "IPv4Network":
new_prefixlen = self._prefixlen - 1
if prefixlen:
validators.validate_prefixlen_int(prefixlen, max_len=new_prefixlen)
valids.validate_prefixlen_int(prefixlen, max_len=new_prefixlen)
new_prefixlen = prefixlen

prefixlen_diff = self._prefixlen - new_prefixlen
Expand Down
12 changes: 8 additions & 4 deletions netsome/types/vlans.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import typing as t

from netsome import constants as c
from netsome.validators import vlans as validators
from netsome.validators import vlans as valids


class VID:
"""VLAN ID"""

_RESERVED = {c.ZERO, c.DEFAULT_VID, c.VID_MAX}
VID_MIN = c.VLAN.MIN
VID_DEFAULT = c.VLAN.DEFAULT
VID_MAX = c.VLAN.MAX

_RESERVED = {VID_MIN, VID_DEFAULT, VID_MAX}

def __init__(self, vid: int) -> None:
validators.validate_vid(vid)
valids.validate_vid(vid)
self._vid = vid

def __eq__(self, other: t.Any) -> bool:
Expand All @@ -29,4 +33,4 @@ def is_reserved(self) -> bool:
return self._vid in self._RESERVED

def is_default(self) -> bool:
return self._vid == c.DEFAULT_VID
return self._vid == self.VID_DEFAULT
Loading

0 comments on commit 1b20496

Please sign in to comment.