Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Constants rework #5

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions netsome/_converters/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,33 @@


def asdotplus_to_asplain(string: str) -> int:
high_order, low_order = map(int, string.split(c.DOT, maxsplit=1))
return high_order * c.TWO_BYTES + low_order
high_order, low_order = map(int, string.split(c.DELIMITERS.DOT, maxsplit=1))
return high_order * c.BYTES.TWO + low_order


def asdot_to_asplain(string: str) -> int:
if c.DOT in string:
if c.DELIMITERS.DOT in string:
return asdotplus_to_asplain(string)

return int(string)


def asplain_to_asdot(number: int) -> str:
high_order, low_order = divmod(number, c.TWO_BYTES)
return f"{high_order}{c.DOT}{low_order}" if high_order else str(low_order)
high_order, low_order = divmod(number, c.BYTES.TWO)
if high_order:
return c.DELIMITERS.DOT.join_as_str((high_order, low_order))

return str(low_order)


def asplain_to_asdotplus(number: int) -> str:
high_order, low_order = divmod(number, c.TWO_BYTES)
return f"{high_order}{c.DOT}{low_order}"
return c.DELIMITERS.DOT.join_as_str(divmod(number, c.BYTES.TWO))


def asplain_to_community(number: int) -> str:
asn, value = divmod(number, c.TWO_BYTES)
return f"{asn}{c.COLON}{value}"
return c.DELIMITERS.COLON.join_as_str(divmod(number, c.BYTES.TWO))


def community_to_asplain(string: str) -> int:
asn, value = map(int, string.split(c.COLON, maxsplit=1))
return asn * c.TWO_BYTES + value
asn, value = map(int, string.split(c.DELIMITERS.COLON, maxsplit=1))
return asn * c.BYTES.TWO + value
4 changes: 2 additions & 2 deletions netsome/_converters/ipv4.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@


def address_to_int(string: str) -> int:
octets = map(int, string.split(c.DOT, maxsplit=3))
octets = map(int, string.split(c.DELIMITERS.DOT, maxsplit=3))
return int.from_bytes(octets, byteorder="big")


def int_to_address(number: int) -> str:
octets = map(str, number.to_bytes(length=4, byteorder="big"))
return c.DOT.join(octets)
return c.DELIMITERS.DOT.join(octets)
56 changes: 42 additions & 14 deletions netsome/constants.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,46 @@
ZERO = 0
ONE_BYTE = 2**8
TWO_BYTES = ONE_BYTE**2
FOUR_BYTES = TWO_BYTES**2
import enum

DOT = "."
COLON = ":"
SLASH = "/"

ASN_MAX = FOUR_BYTES - 1
ASN_ORDER_MAX = TWO_BYTES - 1
class BYTES(enum.IntEnum):

DEFAULT_VID = 1
VID_MAX = 2**12 - 1
ZERO = 0
ONE = 2**8
TWO = ONE**2
FOUR = TWO**2

IPV4_PREFIXLEN_MAX = 32
IPV4_MAX = FOUR_BYTES - 1
IPV4_OCTET_MAX = ONE_BYTE - 1

class DELIMITERS(str, enum.Enum):

DASH = "-"
DOT = "."
COLON = ":"
SLASH = "/"

def join_as_str(self, parts):
return self._value_.join(map(str, parts))


class IPV4(enum.IntEnum):

PREFIXLEN_MIN = 0
PREFIXLEN_MAX = 32

ADDRESS_MIN = 0
ADDRESS_MAX = BYTES.FOUR - 1

OCTET_MIN = 0
OCTET_MAX = BYTES.ONE - 1


class VLAN(enum.IntEnum):

MIN = 0
DEFAULT = 1
MAX = 2 ** 12 - 1


class BGP(enum.IntEnum):

ASN_MIN = 0
ASN_MAX = BYTES.FOUR - 1
ASN_ORDER_MAX = BYTES.TWO - 1
31 changes: 18 additions & 13 deletions netsome/types/bgp.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,42 @@
import typing as t

from netsome import constants as c
from netsome._converters import bgp as converters
from netsome.validators import bgp as validators
from netsome._converters import bgp as convs
from netsome.validators import bgp as valids

# TODO(kuderr): can move some common stuff to Base class


class ASN:

MIN = c.BGP.ASN_MIN
MAX = c.BGP.ASN_MAX
ORDER_MAX = c.BGP.ASN_ORDER_MAX

def __init__(self, number: int) -> None:
validators.validate_asplain(number)
valids.validate_asplain(number)
self._number = number

@classmethod
def from_asdot(cls, string: str) -> "ASN":
validators.validate_asdot(string)
return cls(converters.asdot_to_asplain(string))
valids.validate_asdot(string)
return cls(convs.asdot_to_asplain(string))

@classmethod
def from_asdotplus(cls, string: str) -> "ASN":
validators.validate_asdotplus(string)
return cls(converters.asdotplus_to_asplain(string))
valids.validate_asdotplus(string)
return cls(convs.asdotplus_to_asplain(string))

@classmethod
def from_asplain(cls, number: int) -> "ASN":
validators.validate_asplain(number)
valids.validate_asplain(number)
return cls(number)

def to_asdot(self):
return converters.asplain_to_asdot(self._number)
return convs.asplain_to_asdot(self._number)

def to_asdotplus(self):
return converters.asplain_to_asdotplus(self._number)
return convs.asplain_to_asdotplus(self._number)

def to_asplain(self):
return self._number
Expand All @@ -51,13 +56,13 @@ def __repr__(self) -> str:

class Community:
def __init__(self, number: int) -> None:
validators.validate_asplain(number)
valids.validate_asplain(number)
self._number = number

@classmethod
def from_str(cls, value: str) -> "Community":
validators.validate_community(value)
return cls(converters.community_to_asplain(value))
valids.validate_community(value)
return cls(convs.community_to_asplain(value))

def __eq__(self, other: t.Any) -> bool:
return isinstance(other, self.__class__) and (self._number == other._number)
Expand Down
81 changes: 50 additions & 31 deletions netsome/types/ipv4.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,35 @@
import typing as t

from netsome import constants as c
from netsome._converters import ipv4 as converters
from netsome.validators import ipv4 as validators
from netsome._converters import ipv4 as convs
from netsome.validators import ipv4 as valids


class IPv4Address:

PREFIXLEN_MIN = c.IPV4.PREFIXLEN_MIN
PREFIXLEN_MAX = c.IPV4.PREFIXLEN_MAX

ADDRESS_MIN = c.IPV4.ADDRESS_MIN
ADDRESS_MAX = c.IPV4.PREFIXLEN_MAX

OCTET_MIN = c.IPV4.OCTET_MIN
OCTET_MAX = c.IPV4.OCTET_MAX

def __init__(self, address: str) -> None:
validators.validate_address_str(address)
self._addr = converters.address_to_int(address)
valids.validate_address_str(address)
self._addr = convs.address_to_int(address)

@classmethod
def from_int(cls, number: int) -> "IPv4Address":
validators.validate_address_int(number)
return cls(converters.int_to_address(number))
valids.validate_address_int(number)
obj = cls.__new__(cls)
obj._addr = number
return obj

@functools.cached_property
def address(self) -> str:
return converters.int_to_address(self._addr)
return convs.int_to_address(self._addr)

def __int__(self) -> int:
return self._addr
Expand All @@ -37,31 +49,38 @@ def __eq__(self, other: t.Any) -> bool:


class IPv4Network:

def __init__(self, network: str) -> None:
address, prefixlen = network.split(c.SLASH, maxsplit=1)
validators.validate_address_str(address)
validators.validate_prefixlen_str(prefixlen)
validators.validate_network_int(
converters.address_to_int(address), int(prefixlen)
)

self._prefixlen = int(prefixlen)
self._netaddr = IPv4Address(address)
self._netmask = IPv4Address.from_int(
c.IPV4_MAX ^ (c.IPV4_MAX >> self._prefixlen)
)
# TODO(d.burmistrov): move this block into util? (validate + convert)
addr, prefixlen = network.split(c.DELIMITERS.SLASH, maxsplit=1)
kuderr marked this conversation as resolved.
Show resolved Hide resolved
valids.validate_address_str(addr)
valids.validate_prefixlen_str(prefixlen)
prefixlen = int(prefixlen)
valids.validate_network_int(convs.address_to_int(addr), prefixlen)

def __repr__(self) -> str:
return f'{self.__class__.__name__}("{self.address}")'
self._populate(IPv4Address(addr), prefixlen)
kuderr marked this conversation as resolved.
Show resolved Hide resolved

def _populate(self, netaddr: IPv4Address, prefixlen: int) -> None:
self._prefixlen = prefixlen
self._netaddr = netaddr
m = c.IPV4.ADDRESS_MAX
self._netmask = IPv4Address.from_int(m ^ (m >> prefixlen))

@classmethod
def from_int(cls, int_address: int, prefixlen: int) -> "IPv4Network":
validators.validate_address_int(int_address)
validators.validate_prefixlen_int(prefixlen)
validators.validate_network_int(int_address, prefixlen)
def from_int(cls, int_addr: int, prefixlen: int) -> "IPv4Network":
valids.validate_address_int(int_addr)
valids.validate_prefixlen_int(prefixlen)
valids.validate_network_int(int_addr, prefixlen)

obj = cls.__new__(cls)
obj._populate(IPv4Address.from_int(int_addr), prefixlen)
return obj

address = f"{converters.int_to_address(int_address)}{c.SLASH}{prefixlen}"
return cls(address)
def as_tuple(self) -> tuple[int, int]:
return int(self.netaddress), self._prefixlen

def __repr__(self) -> str:
return f'{self.__class__.__name__}("{self.address}")'

@property
def prefixlen(self) -> int:
Expand All @@ -77,11 +96,11 @@ def netmask(self) -> IPv4Address:

@functools.cached_property
def address(self) -> str:
return f"{self._netaddr.address}/{self._prefixlen}"
return c.DELIMITERS.SLASH.join_as_str((self._netaddr.address, self._prefixlen))

@functools.cached_property
def hostmask(self) -> IPv4Address:
return IPv4Address.from_int(int(self._netmask) ^ c.IPV4_MAX)
return IPv4Address.from_int(int(self._netmask) ^ c.IPV4.ADDRESS_MAX)

@functools.cached_property
def broadcast(self) -> IPv4Address:
Expand All @@ -93,7 +112,7 @@ def subnets(
) -> t.Generator["IPv4Network", None, None]:
new_prefixlen = self._prefixlen + 1
if prefixlen:
validators.validate_prefixlen_int(prefixlen, min_len=new_prefixlen)
valids.validate_prefixlen_int(prefixlen, min_len=new_prefixlen)
new_prefixlen = prefixlen

prefixlen_diff = new_prefixlen - self._prefixlen
Expand All @@ -111,7 +130,7 @@ def supernet(
) -> "IPv4Network":
new_prefixlen = self._prefixlen - 1
if prefixlen:
validators.validate_prefixlen_int(prefixlen, max_len=new_prefixlen)
valids.validate_prefixlen_int(prefixlen, max_len=new_prefixlen)
new_prefixlen = prefixlen

prefixlen_diff = self._prefixlen - new_prefixlen
Expand Down
12 changes: 8 additions & 4 deletions netsome/types/vlans.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import typing as t

from netsome import constants as c
from netsome.validators import vlans as validators
from netsome.validators import vlans as valids


class VID:
"""VLAN ID"""

_RESERVED = {c.ZERO, c.DEFAULT_VID, c.VID_MAX}
VID_MIN = c.VLAN.MIN
VID_DEFAULT = c.VLAN.DEFAULT
VID_MAX = c.VLAN.MAX

_RESERVED = {VID_MIN, VID_DEFAULT, VID_MAX}

def __init__(self, vid: int) -> None:
validators.validate_vid(vid)
valids.validate_vid(vid)
self._vid = vid

def __eq__(self, other: t.Any) -> bool:
Expand All @@ -29,4 +33,4 @@ def is_reserved(self) -> bool:
return self._vid in self._RESERVED

def is_default(self) -> bool:
return self._vid == c.DEFAULT_VID
return self._vid == self.VID_DEFAULT
Loading