Skip to content

Commit

Permalink
Bunch of refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
pyctrl committed Apr 13, 2024
1 parent 58a85c2 commit 6721960
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 83 deletions.
25 changes: 14 additions & 11 deletions netsome/_converters/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,35 @@


def asdotplus_to_asplain(string: str) -> int:
high_order, low_order = map(int, string.split(c.DOT, maxsplit=1))
return high_order * c.TWO_BYTES + low_order
high_order, low_order = map(int, c.DELIMITERS.DOT.d_split(string))
return high_order * c.BYTES.TWO + low_order


def asdot_to_asplain(string: str) -> int:
if c.DOT in string:
if c.DELIMITERS.DOT in string:
return asdotplus_to_asplain(string)

return int(string)


def asplain_to_asdot(number: int) -> str:
high_order, low_order = divmod(number, c.TWO_BYTES)
return f"{high_order}{c.DOT}{low_order}" if high_order else str(low_order)
high_order, low_order = divmod(number, c.BYTES.TWO)
if high_order:
return c.DELIMITERS.DOT.d_join(high_order, low_order)
else:
return str(low_order)


def asplain_to_asdotplus(number: int) -> str:
high_order, low_order = divmod(number, c.TWO_BYTES)
return f"{high_order}{c.DOT}{low_order}"
high_order, low_order = divmod(number, c.BYTES.TWO)
return c.DELIMITERS.DOT.d_join(high_order, low_order)


def asplain_to_community(number: int) -> str:
asn, value = divmod(number, c.TWO_BYTES)
return f"{asn}{c.COLON}{value}"
asn, value = divmod(number, c.BYTES.TWO)
return c.DELIMITERS.DOT.d_join(asn, value)


def community_to_asplain(string: str) -> int:
asn, value = map(int, string.split(c.COLON, maxsplit=1))
return asn * c.TWO_BYTES + value
asn, value = map(int, c.DELIMITERS.DOT.d_split(string))
return asn * c.BYTES.TWO + value
4 changes: 2 additions & 2 deletions netsome/_converters/ipv4.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@


def address_to_int(string: str) -> int:
octets = map(int, string.split(c.DOT, maxsplit=3))
octets = map(int, c.DELIMITERS.DOT.d_split(string, maxsplit=3))
return int.from_bytes(octets, byteorder="big")


def int_to_address(number: int) -> str:
octets = map(str, number.to_bytes(length=4, byteorder="big"))
return c.DOT.join(octets)
return c.DELIMITERS.DOT.d_join(*octets)
58 changes: 44 additions & 14 deletions netsome/constants.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,48 @@
ZERO = 0
ONE_BYTE = 2**8
TWO_BYTES = ONE_BYTE**2
FOUR_BYTES = TWO_BYTES**2
import enum

DOT = "."
COLON = ":"
SLASH = "/"

ASN_MAX = FOUR_BYTES - 1
ASN_ORDER_MAX = TWO_BYTES - 1
class BYTES(enum.IntEnum):

DEFAULT_VID = 1
VID_MAX = 2**12 - 1
ZERO = 0
ONE = 2**8
TWO = ONE**2
FOUR = TWO**2

IPV4_PREFIXLEN_MAX = 32
IPV4_MAX = FOUR_BYTES - 1
IPV4_OCTET_MAX = ONE_BYTE - 1

class DELIMITERS(enum.StrEnum):

DASH = "-"
DOT = "."
COLON = ":"
SLASH = "/"

def d_join(self, *parts):
return self._value_.join(map(str, parts))

def d_split(self, string: str, maxsplit: int = 1):
return string.split(self._value_, maxsplit=maxsplit)


class IPV4(enum.IntEnum):

MIN_PREFIXLEN = 0
MAX_PREFIXLEN = 32

MIN_ADDRESS_BYTES = 0
MAX_ADDRESS_BYTES = BYTES.FOUR - 1

MIN_OCTET_BYTES = 0
MAX_OCTET_BYTES = BYTES.ONE - 1


class VLAN(enum.IntEnum):

ZERO_VID = 0
DEFAULT_VID = 1
MAX_VID = 2 ** 12 - 1


class BgpAsn(enum.IntEnum):

MAX = BYTES.FOUR - 1
ORDER_MAX = BYTES.TWO - 1
30 changes: 17 additions & 13 deletions netsome/types/bgp.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
import typing as t

from netsome import constants as c
from netsome._converters import bgp as converters
from netsome.validators import bgp as validators
from netsome._converters import bgp as convs
from netsome.validators import bgp as valids

# TODO(kuderr): can move some common stuff to Base class


class ASN:

MAX = c.BgpAsn.MAX
ORDER_MAX = c.BgpAsn.ORDER_MAX

def __init__(self, number: int) -> None:
validators.validate_asplain(number)
valids.validate_asplain(number)
self._number = number

@classmethod
def from_asdot(cls, string: str) -> "ASN":
validators.validate_asdot(string)
return cls(converters.asdot_to_asplain(string))
valids.validate_asdot(string)
return cls(convs.asdot_to_asplain(string))

@classmethod
def from_asdotplus(cls, string: str) -> "ASN":
validators.validate_asdotplus(string)
return cls(converters.asdotplus_to_asplain(string))
valids.validate_asdotplus(string)
return cls(convs.asdotplus_to_asplain(string))

@classmethod
def from_asplain(cls, number: int) -> "ASN":
validators.validate_asplain(number)
valids.validate_asplain(number)
return cls(number)

def to_asdot(self):
return converters.asplain_to_asdot(self._number)
return convs.asplain_to_asdot(self._number)

def to_asdotplus(self):
return converters.asplain_to_asdotplus(self._number)
return convs.asplain_to_asdotplus(self._number)

def to_asplain(self):
return self._number
Expand All @@ -51,13 +55,13 @@ def __repr__(self) -> str:

class Community:
def __init__(self, number: int) -> None:
validators.validate_asplain(number)
valids.validate_asplain(number)
self._number = number

@classmethod
def from_str(cls, value: str) -> "Community":
validators.validate_community(value)
return cls(converters.community_to_asplain(value))
valids.validate_community(value)
return cls(convs.community_to_asplain(value))

def __eq__(self, other: t.Any) -> bool:
return isinstance(other, self.__class__) and (self._number == other._number)
Expand Down
80 changes: 49 additions & 31 deletions netsome/types/ipv4.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,35 @@
import typing as t

from netsome import constants as c
from netsome._converters import ipv4 as converters
from netsome.validators import ipv4 as validators
from netsome._converters import ipv4 as convs
from netsome.validators import ipv4 as valids


class IPv4Address:

MIN_PREFIXLEN = c.IPV4.MIN_PREFIXLEN
MAX_PREFIXLEN = c.IPV4.MAX_PREFIXLEN

MIN_ADDRESS_BYTES = c.IPV4.MIN_ADDRESS_BYTES
MAX_ADDRESS_BYTES = c.IPV4.MAX_PREFIXLEN

MIN_OCTET_BYTES = c.IPV4.MIN_OCTET_BYTES
MAX_OCTET_BYTES = c.IPV4.MAX_OCTET_BYTES

def __init__(self, address: str) -> None:
validators.validate_address_str(address)
self._addr = converters.address_to_int(address)
valids.validate_address_str(address)
self._addr = convs.address_to_int(address)

@classmethod
def from_int(cls, number: int) -> "IPv4Address":
validators.validate_address_int(number)
return cls(converters.int_to_address(number))
valids.validate_address_int(number)
obj = cls.__new__(cls)
obj._addr = number
return obj

@functools.cached_property
def address(self) -> str:
return converters.int_to_address(self._addr)
return convs.int_to_address(self._addr)

def __int__(self) -> int:
return self._addr
Expand All @@ -37,31 +49,37 @@ def __eq__(self, other: t.Any) -> bool:


class IPv4Network:

def __init__(self, network: str) -> None:
address, prefixlen = network.split(c.SLASH, maxsplit=1)
validators.validate_address_str(address)
validators.validate_prefixlen_str(prefixlen)
validators.validate_network_int(
converters.address_to_int(address), int(prefixlen)
)

self._prefixlen = int(prefixlen)
self._netaddr = IPv4Address(address)
self._netmask = IPv4Address.from_int(
c.IPV4_MAX ^ (c.IPV4_MAX >> self._prefixlen)
)
# TODO(d.burmistrov): move this block into util? (validate + convert)
addr, prefixlen = network.split(c.DELIMITERS.SLASH, maxsplit=1)
valids.validate_address_str(addr)
valids.validate_prefixlen_str(prefixlen)
valids.validate_network_int(convs.address_to_int(addr), int(prefixlen))

def __repr__(self) -> str:
return f'{self.__class__.__name__}("{self.address}")'
self._populate(IPv4Address(addr), prefixlen)

def _populate(self, netaddr: IPv4Address, prefixlen: int) -> None:
self._prefixlen = prefixlen
self._netaddr = netaddr
m = c.IPV4.MAX_ADDRESS_BYTES
self._netmask = IPv4Address.from_int(m ^ (m >> prefixlen))

@classmethod
def from_int(cls, int_address: int, prefixlen: int) -> "IPv4Network":
validators.validate_address_int(int_address)
validators.validate_prefixlen_int(prefixlen)
validators.validate_network_int(int_address, prefixlen)
def from_int(cls, int_addr: int, prefixlen: int) -> "IPv4Network":
valids.validate_address_int(int_addr)
valids.validate_prefixlen_int(prefixlen)
valids.validate_network_int(int_addr, prefixlen)

obj = cls.__new__(cls)
obj._populate(IPv4Address.from_int(int_addr), prefixlen)
return obj

address = f"{converters.int_to_address(int_address)}{c.SLASH}{prefixlen}"
return cls(address)
def as_tuple(self) -> tuple[int, int]:
return int(self.netaddress), self._prefixlen

def __repr__(self) -> str:
return f'{self.__class__.__name__}("{self.address}")'

@property
def prefixlen(self) -> int:
Expand All @@ -77,11 +95,11 @@ def netmask(self) -> IPv4Address:

@functools.cached_property
def address(self) -> str:
return f"{self._netaddr.address}/{self._prefixlen}"
return c.DELIMITERS.SLASH.d_join(self._netaddr.address, self._prefixlen)

@functools.cached_property
def hostmask(self) -> IPv4Address:
return IPv4Address.from_int(int(self._netmask) ^ c.IPV4_MAX)
return IPv4Address.from_int(int(self._netmask) ^ c.IPV4.MAX_ADDRESS_BYTES)

@functools.cached_property
def broadcast(self) -> IPv4Address:
Expand All @@ -93,7 +111,7 @@ def subnets(
) -> t.Generator["IPv4Network", None, None]:
new_prefixlen = self._prefixlen + 1
if prefixlen:
validators.validate_prefixlen_int(prefixlen, min_len=new_prefixlen)
valids.validate_prefixlen_int(prefixlen, min_len=new_prefixlen)
new_prefixlen = prefixlen

prefixlen_diff = new_prefixlen - self._prefixlen
Expand All @@ -111,7 +129,7 @@ def supernet(
) -> "IPv4Network":
new_prefixlen = self._prefixlen - 1
if prefixlen:
validators.validate_prefixlen_int(prefixlen, max_len=new_prefixlen)
valids.validate_prefixlen_int(prefixlen, max_len=new_prefixlen)
new_prefixlen = prefixlen

prefixlen_diff = self._prefixlen - new_prefixlen
Expand Down
12 changes: 8 additions & 4 deletions netsome/types/vlans.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import typing as t

from netsome import constants as c
from netsome.validators import vlans as validators
from netsome.validators import vlans as valids


class VID:
"""VLAN ID"""

_RESERVED = {c.ZERO, c.DEFAULT_VID, c.VID_MAX}
ZERO_VID = c.VLAN.ZERO_VID
DEFAULT_VID = c.VLAN.DEFAULT_VID
MAX_VID = c.VLAN.MAX_VID

_RESERVED = {ZERO_VID, DEFAULT_VID, MAX_VID}

def __init__(self, vid: int) -> None:
validators.validate_vid(vid)
valids.validate_vid(vid)
self._vid = vid

def __eq__(self, other: t.Any) -> bool:
Expand All @@ -29,4 +33,4 @@ def is_reserved(self) -> bool:
return self._vid in self._RESERVED

def is_default(self) -> bool:
return self._vid == c.DEFAULT_VID
return self._vid == self.DEFAULT_VID
Loading

0 comments on commit 6721960

Please sign in to comment.