From 86cb8d64099661114f69afc14408ef63f8891afe Mon Sep 17 00:00:00 2001 From: Iwan Briquemont Date: Sun, 5 Feb 2017 12:42:53 +0100 Subject: [PATCH] Add isolated MIB loaders to support multiple sets of MIBs --- snimpy/manager.py | 34 +-- snimpy/mib.py | 417 ++++++++++++++++++---------------- tests/SNIMPY2-MIB.mib | 511 ++++++++++++++++++++++++++++++++++++++++++ tests/agent.py | 366 +++++++++++++++--------------- tests/test_manager.py | 16 ++ tests/test_mib.py | 176 +++++++++------ 6 files changed, 1075 insertions(+), 445 deletions(-) create mode 100644 tests/SNIMPY2-MIB.mib diff --git a/snimpy/manager.py b/snimpy/manager.py index e11574a..f2208fd 100644 --- a/snimpy/manager.py +++ b/snimpy/manager.py @@ -30,7 +30,7 @@ import inspect from time import time -from collections import MutableMapping, Container, Iterable, Sized +from collections import MutableMapping, Container, Iterable, Sized, defaultdict from snimpy import snmp, mib, basictypes @@ -234,7 +234,8 @@ def __init__(self, secname=None, authprotocol=None, authpassword=None, privprotocol=None, privpassword=None, - contextname=None): + contextname=None, + mibloader=None): """Create a new SNMP manager. Some of the parameters are explained in :meth:`snmp.Session.__init__`. @@ -271,7 +272,12 @@ def __init__(self, :param bulk: Max-repetition to use to speed up MIB walking with `GETBULK`. Set to `0` to disable. :type bulk: int + :param mibloader: The MibLoader instance which has the loaded MIBs + :type mibloader: MibLoader """ + if mibloader is None: + mibloader = mib._mibloader + self._mibloader = mibloader if host is None: host = Manager._host self._host = host @@ -293,7 +299,7 @@ def __init__(self, if none: self._session = NoneSession(self._session) self._loose = loose - self._loaded = loaded + self._loaded = loaded[self._mibloader] # To be able to clone, we save the arguments provided to the # constructor in a generic way @@ -306,7 +312,7 @@ def __init__(self, def _locate(self, attribute): for m in self._loaded: try: - a = mib.get(m, attribute) + a = self._mibloader.get(m, attribute) return (m, a) except mib.SMIException: pass @@ -345,7 +351,7 @@ def __setattr__(self, attribute, value): def __getitem__(self, modulename): modulename = modulename.encode('ascii') - for m in loaded: + for m in loaded[self._mibloader]: if modulename == m: return MibRestrictedManager(self, [m]) raise KeyError("{0} is not a loaded module".format(modulename)) @@ -568,20 +574,22 @@ def iteritems(self, table_filter=None): return ProxyIter.iteritems(self, resulting_filter) -loaded = [] +loaded = defaultdict(list) -def load(mibname): +def load(mibname, mibloader=None): """Load a MIB in memory. :param mibname: MIB name or filename :type mibname: str """ - m = mib.load(mibname) - if m not in loaded: - loaded.append(m) + if mibloader is None: + mibloader = mib._mibloader + m = mibloader.load(mibname) + if m not in loaded[mibloader]: + loaded[mibloader].append(m) if Manager._complete: - for o in mib.getScalars(m) + \ - mib.getColumns(m) + \ - mib.getTables(m): + for o in mibloader.getScalars(m) + \ + mibloader.getColumns(m) + \ + mibloader.getTables(m): setattr(Manager, str(o), 1) diff --git a/snimpy/mib.py b/snimpy/mib.py index 59fc5b0..01aebed 100644 --- a/snimpy/mib.py +++ b/snimpy/mib.py @@ -47,13 +47,14 @@ class Node(object): :class:`Table`, :class:`Column`, :class:`Node`. """ - def __init__(self, node): + def __init__(self, node, mibloader): """Create a new MIB node. :param node: libsmi node supporting this node. """ self.node = node self._override_type = None + self.mibloader = mibloader @property def type(self): @@ -64,6 +65,7 @@ def type(self): this node, the returned class can be instanciated to get an appropriate representation. """ + self.mibloader._switch() from snimpy import basictypes if self._override_type: t = self._override_type @@ -103,6 +105,7 @@ def typeName(self): :return: A string representing the current declared type, suitable for assignment to type.setter. """ + self.mibloader._switch() if self._override_type: t = self._override_type else: @@ -125,12 +128,13 @@ def typeName(self, type_name): :param type_name: string name of the type. """ + self.mibloader._switch() current_override = self._override_type declared_type = _smi.smiGetNodeType(self.node) declared_basetype = self.type - new_type = _getType(type_name) + new_type = self.mibloader._getType(type_name) if not new_type: raise SMIException("no type named {0} in any loaded module".format( type_name)) @@ -163,6 +167,7 @@ def fmt(self): format available. """ + self.mibloader._switch() if self._override_type: t = self._override_type else: @@ -182,6 +187,7 @@ def oid(self): :return: OID as a tuple """ + self.mibloader._switch() return tuple([self.node.oid[i] for i in range(self.node.oidlen)]) @property @@ -197,6 +203,7 @@ def ranges(self): :return: The valid range for this node. """ + self.mibloader._switch() t = _smi.smiGetNodeType(self.node) if t == ffi.NULL: return None @@ -225,6 +232,7 @@ def enum(self): :return: The dictionary of possible values keyed by the integer value. """ + self.mibloader._switch() t = _smi.smiGetNodeType(self.node) if t == ffi.NULL or t.basetype not in (_smi.SMI_BASETYPE_ENUM, _smi.SMI_BASETYPE_BITS): @@ -239,9 +247,11 @@ def enum(self): return result def __str__(self): + self.mibloader._switch() return ffi.string(self.node.name).decode("ascii") def __repr__(self): + self.mibloader._switch() r = _smi.smiRenderNode(self.node, _smi.SMI_RENDER_ALL) if r == ffi.NULL: return "".format( @@ -256,6 +266,7 @@ def __repr__(self): ffi.string(module.name)) def _convert(self, value): + self.mibloader._switch() attr = {_smi.SMI_BASETYPE_INTEGER32: "integer32", _smi.SMI_BASETYPE_UNSIGNED32: "unsigned32", _smi.SMI_BASETYPE_INTEGER64: "integer64", @@ -289,6 +300,7 @@ def columns(self): :return: list of table columns (:class:`Column` instances) """ + self.mibloader._switch() child = _smi.smiGetFirstChildNode(self.node) if child == ffi.NULL: return [] @@ -303,7 +315,7 @@ def columns(self): raise SMIException("child {0} of {1} is not a column".format( ffi.string(child.name), ffi.string(self.node.name))) - columns.append(Column(child)) + columns.append(Column(child, self.mibloader)) child = _smi.smiGetNextChildNode(child) return columns @@ -313,6 +325,7 @@ def _row(self): :return: row object (as an opaque object) """ + self.mibloader._switch() child = _smi.smiGetFirstChildNode(self.node) if child != ffi.NULL and child.indexkind == _smi.SMI_INDEX_AUGMENT: child = _smi.smiGetRelatedNode(child) @@ -342,6 +355,7 @@ def implied(self): :return: `True` if and only if the last index is implied. """ + self.mibloader._switch() child = self._row if child.implied: return True @@ -355,6 +369,7 @@ def index(self): :return: The list of indexes (as :class:`Column` instances) of the table. """ + self.mibloader._switch() child = self._row lindex = [] element = _smi.smiGetFirstElement(child) @@ -369,7 +384,7 @@ def index(self): "not a column".format( ffi.string(nelement.name), ffi.string(self.node.name))) - lindex.append(Column(nelement)) + lindex.append(Column(nelement, self.mibloader)) element = _smi.smiGetNextElement(element) return lindex @@ -385,6 +400,7 @@ def table(self): :return: The :class:`Table` instance associated to this column. """ + self.mibloader._switch() parent = _smi.smiGetParentNode(self.node) if parent == ffi.NULL: raise SMIException("unable to get parent of {0}".format( @@ -401,7 +417,7 @@ def table(self): raise SMIException("parent {0} of {1} is not a table".format( ffi.string(parent.name), ffi.string(self.node.name))) - t = Table(parent) + t = Table(parent, self.mibloader) return t @@ -420,6 +436,7 @@ def _logError(path, line, severity, msg, tag): def reset(): """Reset libsmi to its initial state.""" + global _mibloader _smi.smiExit() if _smi.smiInit(b"snimpy") < 0: raise SMIException("unable to init libsmi") @@ -429,210 +446,236 @@ def reset(): _smi.smiSetFlags(_smi.SMI_FLAG_ERRORS | _smi.SMI_FLAG_RECURSIVE) except TypeError: pass # We are being mocked + _mibloader = MibLoader() -def path(path=None): - """Set or get a search path to libsmi. +class MibLoader(object): + """Isolated MIB loader to support multiple sets of MIBs""" - When no path is provided, return the current path, - unmodified. Otherwise, set the path to the specified value. + _datasetid = 0 - :param path: The string to be used to change the search path or - `None` - - """ - if path is None: - # Get the path - path = _smi.smiGetPath() - if path == ffi.NULL: - raise SMIException("unable to get current libsmi path") - path = ffi.gc(path, _smi.free) - result = ffi.string(path) - return result.decode("utf8") - - # Set the path - if not isinstance(path, bytes): - path = path.encode("utf8") - if _smi.smiSetPath(path) < 0: - raise SMIException("unable to set the path {0}".format(path)) - - -def _get_module(name): - """Get the SMI module from its name. - - :param name: The name of the module - :return: The SMI module or `None` if not found (not loaded) - """ - if not isinstance(name, bytes): - name = name.encode("ascii") - m = _smi.smiGetModule(name) - if m == ffi.NULL: - return None - if m.conformance and m.conformance <= 1: - return None - return m + def __init__(self): + global _datasetid + self._tag = "snimpy:{}".format( + MibLoader._datasetid + ).encode('ascii') + MibLoader._datasetid += 1 + def _switch(self): + """Switch to this object's set of loaded MIBs""" + if _smi.smiInit(self._tag) < 0: + raise SMIException("unable to switch to the MIB loader") -def _kind2object(kind): - return { - _smi.SMI_NODEKIND_NODE: Node, - _smi.SMI_NODEKIND_SCALAR: Scalar, - _smi.SMI_NODEKIND_TABLE: Table, - _smi.SMI_NODEKIND_COLUMN: Column - }.get(kind, Node) + def path(self, path=None): + """Set or get a search path to libsmi. + When no path is provided, return the current path, + unmodified. Otherwise, set the path to the specified value. -def get(mib, name): - """Get a node by its name. + :param path: The string to be used to change the search path or + `None` - :param mib: The MIB name to query - :param name: The object name to get from the MIB - :return: the requested MIB node (:class:`Node`) - """ - if not isinstance(mib, bytes): - mib = mib.encode("ascii") - module = _get_module(mib) - if module is None: - raise SMIException("no module named {0}".format(mib)) - node = _smi.smiGetNode(module, name.encode("ascii")) - if node == ffi.NULL: - raise SMIException("in {0}, no node named {1}".format( - mib, name)) - pnode = _kind2object(node.nodekind) - return pnode(node) - - -def getByOid(oid): - """Get a node by its OID. - - :param oid: The OID as a tuple - :return: The requested MIB node (:class:`Node`) - """ - node = _smi.smiGetNodeByOID(len(oid), oid) - if node == ffi.NULL: - raise SMIException("no node for {0}".format( - ".".join([str(o) for o in oid]))) - pnode = _kind2object(node.nodekind) - return pnode(node) - - -def _getType(type_name): - """Searches for a smi type through all loaded modules. - - :param type_name: The name of the type to search for. - :return: The requested type (:class:`smi.SmiType`), if found, or None. - """ - if not isinstance(type_name, bytes): - type_name = type_name.encode("ascii") - for module in _loadedModules(): - new_type = _smi.smiGetType(module, type_name) - if new_type != ffi.NULL: - return new_type - return None - - -def _get_kind(mib, kind): - """Get nodes of a given kind from a MIB. - - :param mib: The MIB name to search objects for - :param kind: The SMI kind of object - :return: The list of matched MIB nodes for the MIB - """ - if not isinstance(mib, bytes): - mib = mib.encode("ascii") - module = _get_module(mib) - if module is None: - raise SMIException("no module named {0}".format(mib)) - lnode = [] - node = _smi.smiGetFirstNode(module, kind) - while node != ffi.NULL: - lnode.append(_kind2object(kind)(node)) - node = _smi.smiGetNextNode(node, kind) - return lnode - - -def getNodes(mib): - """Return all nodes from a given MIB. - - :param mib: The MIB name - :return: The list of all MIB nodes for the MIB - :rtype: list of :class:`Node` instances - """ - return _get_kind(mib, _smi.SMI_NODEKIND_NODE) - - -def getScalars(mib): - """Return all scalars from a given MIB. - - :param mib: The MIB name - :return: The list of all scalars for the MIB - :rtype: list of :class:`Scalar` instances - """ - return _get_kind(mib, _smi.SMI_NODEKIND_SCALAR) + """ + self._switch() + if path is None: + # Get the path + path = _smi.smiGetPath() + if path == ffi.NULL: + raise SMIException("unable to get current libsmi path") + path = ffi.gc(path, _smi.free) + result = ffi.string(path) + return result.decode("utf8") + + # Set the path + if not isinstance(path, bytes): + path = path.encode("utf8") + if _smi.smiSetPath(path) < 0: + raise SMIException("unable to set the path {0}".format(path)) + + def _get_module(self, name): + """Get the SMI module from its name. + + :param name: The name of the module + :return: The SMI module or `None` if not found (not loaded) + """ + self._switch() + if not isinstance(name, bytes): + name = name.encode("ascii") + m = _smi.smiGetModule(name) + if m == ffi.NULL: + return None + if m.conformance and m.conformance <= 1: + return None + return m + + def _kind2object(self, kind): + return { + _smi.SMI_NODEKIND_NODE: Node, + _smi.SMI_NODEKIND_SCALAR: Scalar, + _smi.SMI_NODEKIND_TABLE: Table, + _smi.SMI_NODEKIND_COLUMN: Column + }.get(kind, Node) + + def get(self, mib, name): + """Get a node by its name. + + :param mib: The MIB name to query + :param name: The object name to get from the MIB + :return: the requested MIB node (:class:`Node`) + """ + self._switch() + if not isinstance(mib, bytes): + mib = mib.encode("ascii") + module = self._get_module(mib) + if module is None: + raise SMIException("no module named {0}".format(mib)) + node = _smi.smiGetNode(module, name.encode("ascii")) + if node == ffi.NULL: + raise SMIException("in {0}, no node named {1}".format( + mib, name)) + pnode = self._kind2object(node.nodekind) + return pnode(node, self) + + def getByOid(self, oid): + """Get a node by its OID. + + :param oid: The OID as a tuple + :return: The requested MIB node (:class:`Node`) + """ + self._switch() + node = _smi.smiGetNodeByOID(len(oid), oid) + if node == ffi.NULL: + raise SMIException("no node for {0}".format( + ".".join([str(o) for o in oid]))) + pnode = self._kind2object(node.nodekind) + return pnode(node, self) + + def _getType(self, type_name): + """Searches for a smi type through all loaded modules. + + :param type_name: The name of the type to search for. + :return: The requested type (:class:`smi.SmiType`), if found, or None. + """ + if not isinstance(type_name, bytes): + type_name = type_name.encode("ascii") + for module in self._loadedModules(): + new_type = _smi.smiGetType(module, type_name) + if new_type != ffi.NULL: + return new_type + return None + def _get_kind(self, mib, kind): + """Get nodes of a given kind from a MIB. -def getTables(mib): - """Return all tables from a given MIB. + :param mib: The MIB name to search objects for + :param kind: The SMI kind of object + :return: The list of matched MIB nodes for the MIB + """ + if not isinstance(mib, bytes): + mib = mib.encode("ascii") + module = self._get_module(mib) + if module is None: + raise SMIException("no module named {0}".format(mib)) + lnode = [] + node = _smi.smiGetFirstNode(module, kind) + while node != ffi.NULL: + lnode.append(self._kind2object(kind)(node, self)) + node = _smi.smiGetNextNode(node, kind) + return lnode + + def getNodes(self, mib): + """Return all nodes from a given MIB. + + :param mib: The MIB name + :return: The list of all MIB nodes for the MIB + :rtype: list of :class:`Node` instances + """ + self._switch() + return self._get_kind(mib, _smi.SMI_NODEKIND_NODE) - :param mib: The MIB name - :return: The list of all tables for the MIB - :rtype: list of :class:`Table` instances - """ - return _get_kind(mib, _smi.SMI_NODEKIND_TABLE) + def getScalars(self, mib): + """Return all scalars from a given MIB. + :param mib: The MIB name + :return: The list of all scalars for the MIB + :rtype: list of :class:`Scalar` instances + """ + self._switch() + return self._get_kind(mib, _smi.SMI_NODEKIND_SCALAR) -def getColumns(mib): - """Return all columns from a givem MIB. + def getTables(self, mib): + """Return all tables from a given MIB. - :param mib: The MIB name - :return: The list of all columns for the MIB - :rtype: list of :class:`Column` instances - """ - return _get_kind(mib, _smi.SMI_NODEKIND_COLUMN) + :param mib: The MIB name + :return: The list of all tables for the MIB + :rtype: list of :class:`Table` instances + """ + self._switch() + return self._get_kind(mib, _smi.SMI_NODEKIND_TABLE) + def getColumns(self, mib): + """Return all columns from a givem MIB. -def load(mib): - """Load a MIB into the library. + :param mib: The MIB name + :return: The list of all columns for the MIB + :rtype: list of :class:`Column` instances + """ + self._switch() + return self._get_kind(mib, _smi.SMI_NODEKIND_COLUMN) - :param mib: The MIB to load, either a filename or a MIB name. - :return: The MIB name that has been loaded. - :except SMIException: The requested MIB cannot be loaded. - """ - if not isinstance(mib, bytes): - mib = mib.encode("ascii") - modulename = _smi.smiLoadModule(mib) - if modulename == ffi.NULL: - raise SMIException("unable to find {0} (check the path)".format(mib)) - modulename = ffi.string(modulename) - if not _get_module(modulename.decode("ascii")): - details = "check with smilint -s -l1" - if _lastError is not None: - details = "{0}: {1}".format(_lastError, - details) - raise SMIException( - "{0} contains major SMI error ({1})".format(mib, details)) - return modulename - - -def _loadedModules(): - """Generates the list of loaded modules. - - :yield: The :class:`smi.SmiModule` of all currently loaded modules. - """ - module = _smi.smiGetFirstModule() - while module != ffi.NULL: - yield module + def load(self, mib): + """Load a MIB into the library. - module = _smi.smiGetNextModule(module) + :param mib: The MIB to load, either a filename or a MIB name. + :return: The MIB name that has been loaded. + :except SMIException: The requested MIB cannot be loaded. + """ + self._switch() + if not isinstance(mib, bytes): + mib = mib.encode("ascii") + modulename = _smi.smiLoadModule(mib) + if modulename == ffi.NULL: + raise SMIException( + "unable to find {0} (check the path)".format(mib)) + modulename = ffi.string(modulename) + if not self._get_module(modulename.decode("ascii")): + details = "check with smilint -s -l1" + if _lastError is not None: + details = "{0}: {1}".format(_lastError, + details) + raise SMIException( + "{0} contains major SMI error ({1})".format(mib, details)) + return modulename + + def _loadedModules(self): + """Generates the list of loaded modules. + + :yield: The :class:`smi.SmiModule` of all currently loaded modules. + """ + module = _smi.smiGetFirstModule() + while module != ffi.NULL: + yield module + module = _smi.smiGetNextModule(module) -def loadedMibNames(): - """Generates the list of loaded MIB names. + def loadedMibNames(self): + """Generates the list of loaded MIB names. - :yield: The names of all currently loaded MIBs. - """ - for module in _loadedModules(): - yield ffi.string(module.name).decode('utf-8') + :yield: The names of all currently loaded MIBs. + """ + self._switch() + for module in self._loadedModules(): + yield ffi.string(module.name).decode('utf-8') reset() +_getType = _mibloader._getType +path = _mibloader.path +get = _mibloader.get +getByOid = _mibloader.getByOid +getColumns = _mibloader.getColumns +getNodes = _mibloader.getNodes +getScalars = _mibloader.getScalars +getTables = _mibloader.getTables +load = _mibloader.load +loadedMibNames = _mibloader.loadedMibNames diff --git a/tests/SNIMPY2-MIB.mib b/tests/SNIMPY2-MIB.mib new file mode 100644 index 0000000..762e42f --- /dev/null +++ b/tests/SNIMPY2-MIB.mib @@ -0,0 +1,511 @@ +SNIMPY2-MIB DEFINITIONS ::= BEGIN + +IMPORTS + MODULE-IDENTITY, OBJECT-TYPE, + IpAddress, Integer32, Gauge32, + TimeTicks, Counter64, + Counter32, mib-2 FROM SNMPv2-SMI + DisplayString, TEXTUAL-CONVENTION, + PhysAddress, TruthValue FROM SNMPv2-TC + InetAddressType, InetAddress, + InetAddressIPv4, InetAddressIPv6 FROM INET-ADDRESS-MIB + IANAifType FROM IANAifType-MIB; + + +snimpy2 MODULE-IDENTITY + LAST-UPDATED "200809160000Z" + ORGANIZATION + "snimpy2 + https://github.com/vincentbernat/snimpy" + CONTACT-INFO + "Lorem ipsum, etc, etc." + DESCRIPTION + "This is a test MIB module for snimpy MibLoader." + + REVISION "200809160000Z" + DESCRIPTION "Last revision" + ::= { mib-2 45121 } + +OddInteger ::= TEXTUAL-CONVENTION + DISPLAY-HINT "d-2" + STATUS current + DESCRIPTION + "Testing fmt" + SYNTAX INTEGER (6..18 | 20..23 | 27 | 28..1336) + +UnicodeString ::= TEXTUAL-CONVENTION + DISPLAY-HINT "255t" + STATUS current + DESCRIPTION + "Testing fmt" + SYNTAX OCTET STRING (SIZE(0..255)) + +snimpy2Scalars OBJECT IDENTIFIER ::= { snimpy2 1 } +snimpy2Tables OBJECT IDENTIFIER ::= { snimpy2 2 } + +snimpy2IpAddress OBJECT-TYPE + SYNTAX IpAddress + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "An IP address" + ::= { snimpy2Scalars 1 } + +snimpy2String OBJECT-TYPE + SYNTAX DisplayString (SIZE (0..255)) + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "An string to display" + ::= { snimpy2Scalars 2 } + +snimpy2Integer OBJECT-TYPE + SYNTAX OddInteger + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "An integer" + ::= { snimpy2Scalars 3 } + +snimpy2Enum OBJECT-TYPE + SYNTAX INTEGER { + up(1), + down(2), + testing(3) + } + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "An enumeration" + ::= { snimpy2Scalars 4 } + +snimpy2ObjectId OBJECT-TYPE + SYNTAX OBJECT IDENTIFIER + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "An oid" + ::= { snimpy2Scalars 5 } + +snimpy2Boolean OBJECT-TYPE + SYNTAX TruthValue + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "A boolean" + ::= { snimpy2Scalars 6 } + +snimpy2Counter OBJECT-TYPE + SYNTAX Counter32 + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "A 32 bits counter" + ::= { snimpy2Scalars 7 } + +snimpy2Gauge OBJECT-TYPE + SYNTAX Gauge32 + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "A 32 bits gauge" + ::= { snimpy2Scalars 8 } + +snimpy2Timeticks OBJECT-TYPE + SYNTAX TimeTicks + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "A timetick" + ::= { snimpy2Scalars 9 } + +snimpy2Counter64 OBJECT-TYPE + SYNTAX Counter64 + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "A 64-bit counter" + ::= { snimpy2Scalars 10 } + +snimpy2Bits OBJECT-TYPE + SYNTAX BITS { + first(0), + second(1), + third(2), + last(7) + } + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "A bit field" + ::= { snimpy2Scalars 11 } + +snimpy2NotImplemented OBJECT-TYPE + SYNTAX DisplayString (SIZE (0..255)) + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "An string to display (not implemented)" + ::= { snimpy2Scalars 12 } + +snimpy2OctetString OBJECT-TYPE + SYNTAX OCTET STRING + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "An string to display" + ::= { snimpy2Scalars 13 } + +snimpy2UnicodeString OBJECT-TYPE + SYNTAX UnicodeString + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "An unicode string to display" + ::= { snimpy2Scalars 14 } + +snimpy2MacAddress OBJECT-TYPE + SYNTAX PhysAddress + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "A MAC address" + ::= { snimpy2Scalars 15 } + +snimpy2MacAddressInvalid OBJECT-TYPE + SYNTAX DisplayString + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "A MAC address with invalid syntax" + ::= { snimpy2Scalars 16 } + +-- A simple table + +snimpy2SimpleTable OBJECT-TYPE + SYNTAX SEQUENCE OF Snimpy2SimpleEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "A table" + ::= { snimpy2Tables 1 } + +snimpy2SimpleEntry OBJECT-TYPE + SYNTAX Snimpy2SimpleEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Entry for our simple table" + INDEX { snimpy2SimpleIndex } + ::= { snimpy2SimpleTable 1 } + +Snimpy2SimpleEntry ::= + SEQUENCE { + snimpy2SimpleIndex Integer32, + snimpy2SimpleDescr DisplayString, + snimpy2SimpleType IANAifType, + snimpy2SimplePhys PhysAddress + } + +snimpy2SimpleIndex OBJECT-TYPE + SYNTAX Integer32 (1..30) + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Index for snimpy2 simple table" + ::= { snimpy2SimpleEntry 1 } + +snimpy2SimpleDescr OBJECT-TYPE + SYNTAX DisplayString (SIZE (0..255)) + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "Blah blah" + ::= { snimpy2SimpleEntry 2 } + +snimpy2SimpleType OBJECT-TYPE + SYNTAX IANAifType + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "Blah blah" + ::= { snimpy2SimpleEntry 3 } + +snimpy2SimplePhys OBJECT-TYPE + SYNTAX PhysAddress + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "Blah blah" + ::= { snimpy2SimpleEntry 4 } + +-- A more complex table + +snimpy2ComplexTable OBJECT-TYPE + SYNTAX SEQUENCE OF Snimpy2ComplexEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "A more complex table" + ::= { snimpy2Tables 2 } + +snimpy2ComplexEntry OBJECT-TYPE + SYNTAX Snimpy2ComplexEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Entry for our complex table" + INDEX { snimpy2ComplexFirstIP, snimpy2ComplexSecondIP } + ::= { snimpy2ComplexTable 1 } + +Snimpy2ComplexEntry ::= + SEQUENCE { + snimpy2ComplexFirstIP IpAddress, + snimpy2ComplexSecondIP IpAddress, + snimpy2ComplexState INTEGER + } + +snimpy2ComplexFirstIP OBJECT-TYPE + SYNTAX IpAddress + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "First IP address for index" + ::= { snimpy2ComplexEntry 1 } + +snimpy2ComplexSecondIP OBJECT-TYPE + SYNTAX IpAddress + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Second IP address for index" + ::= { snimpy2ComplexEntry 2 } + +snimpy2ComplexState OBJECT-TYPE + SYNTAX INTEGER { + up(1), + down(2), + testing(3) + } + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "State for our both IP" + ::= { snimpy2ComplexEntry 3 } + +-- A table with complex indexes + +snimpy2IndexTable OBJECT-TYPE + SYNTAX SEQUENCE OF Snimpy2IndexEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "A table with complex indexes" + ::= { snimpy2Tables 3 } + +snimpy2IndexEntry OBJECT-TYPE + SYNTAX Snimpy2IndexEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Entry for our indexed table" + INDEX { snimpy2IndexVarLen, snimpy2IndexOidVarLen, + snimpy2IndexFixedLen, IMPLIED snimpy2IndexImplied } + ::= { snimpy2IndexTable 1 } + +Snimpy2IndexEntry ::= + SEQUENCE { + snimpy2IndexVarLen DisplayString, + snimpy2IndexIntIndex Integer32, + snimpy2IndexOidVarLen OBJECT IDENTIFIER, + snimpy2IndexFixedLen DisplayString, + snimpy2IndexImplied DisplayString, + snimpy2IndexInt Integer32 + } + +snimpy2IndexVarLen OBJECT-TYPE + SYNTAX DisplayString (SIZE (1..10)) + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "Variable length index" + ::= { snimpy2IndexEntry 1 } + +snimpy2IndexIntIndex OBJECT-TYPE + SYNTAX Integer32 + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Integer index" + ::= { snimpy2IndexEntry 2 } + +snimpy2IndexOidVarLen OBJECT-TYPE + SYNTAX OBJECT IDENTIFIER + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "OID as index" + ::= { snimpy2IndexEntry 3 } + +snimpy2IndexFixedLen OBJECT-TYPE + SYNTAX DisplayString (SIZE (6)) + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Fixed length index" + ::= { snimpy2IndexEntry 4 } + +snimpy2IndexImplied OBJECT-TYPE + SYNTAX DisplayString (SIZE (1..30)) + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Variable length index, implied" + ::= { snimpy2IndexEntry 5 } + +snimpy2IndexInt OBJECT-TYPE + SYNTAX Integer32 + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "An integer of fixed size" + ::= { snimpy2IndexEntry 6 } + +-- A table indexed using InetAddresses + +snimpy2InetAddressTable OBJECT-TYPE + SYNTAX SEQUENCE OF Snimpy2InetAddressEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "A InetAddress table" + ::= { snimpy2Tables 4 } + +snimpy2InetAddressEntry OBJECT-TYPE + SYNTAX Snimpy2InetAddressEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Entry for our complex table" + INDEX { snimpy2InetAddressType, snimpy2InetAddress } + ::= { snimpy2InetAddressTable 1 } + +Snimpy2InetAddressEntry ::= + SEQUENCE { + snimpy2InetAddressType InetAddressType, + snimpy2InetAddress InetAddress, + snimpy2InetAddressState INTEGER + } + +snimpy2InetAddressType OBJECT-TYPE + SYNTAX InetAddressType + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Address type identifier for snimpy2InetAddress" + ::= { snimpy2InetAddressEntry 1 } + +snimpy2InetAddress OBJECT-TYPE + SYNTAX InetAddress + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Type dependent InetAddress" + ::= { snimpy2InetAddressEntry 2 } + +snimpy2InetAddressState OBJECT-TYPE + SYNTAX INTEGER { + up(1), + down(2), + testing(3) + } + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "State for the IP" + ::= { snimpy2InetAddressEntry 3 } + +-- A table that may contain invalid values + +snimpy2InvalidTable OBJECT-TYPE + SYNTAX SEQUENCE OF Snimpy2InvalidEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "A table" + ::= { snimpy2Tables 5 } + +snimpy2InvalidEntry OBJECT-TYPE + SYNTAX Snimpy2InvalidEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Entry for our invalid table" + INDEX { snimpy2InvalidIndex } + ::= { snimpy2InvalidTable 1 } + +Snimpy2InvalidEntry ::= + SEQUENCE { + snimpy2InvalidIndex Integer32, + snimpy2InvalidDescr DisplayString + } + +snimpy2InvalidIndex OBJECT-TYPE + SYNTAX Integer32 (1..30) + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Index for snimpy2 invalid table" + ::= { snimpy2InvalidEntry 1 } + +snimpy2InvalidDescr OBJECT-TYPE + SYNTAX DisplayString (SIZE (0..255)) + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "Blah blah" + ::= { snimpy2InvalidEntry 2 } + +-- A table that may be empty + +snimpy2EmptyTable OBJECT-TYPE + SYNTAX SEQUENCE OF Snimpy2EmptyEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "A table" + ::= { snimpy2Tables 6 } + +snimpy2EmptyEntry OBJECT-TYPE + SYNTAX Snimpy2EmptyEntry + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Entry for our empty table" + INDEX { snimpy2EmptyIndex } + ::= { snimpy2EmptyTable 1 } + +Snimpy2EmptyEntry ::= + SEQUENCE { + snimpy2EmptyIndex Integer32, + snimpy2EmptyDescr DisplayString + } + +snimpy2EmptyIndex OBJECT-TYPE + SYNTAX Integer32 (1..30) + MAX-ACCESS not-accessible + STATUS current + DESCRIPTION + "Index for snimpy2 empty table" + ::= { snimpy2EmptyEntry 1 } + +snimpy2EmptyDescr OBJECT-TYPE + SYNTAX DisplayString (SIZE (0..255)) + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "Blah blah" + ::= { snimpy2EmptyEntry 2 } + + +END diff --git a/tests/agent.py b/tests/agent.py index aaab81c..28a94ca 100644 --- a/tests/agent.py +++ b/tests/agent.py @@ -178,194 +178,198 @@ def getValue(self, name, idx): ifRcvAddressAddress=MibTableColumn((1, 3, 6, 1, 2, 1, 31, 1, 4, 1, 1), v2c.OctetString())) + for m in ('snimpy', 'snimpy2'): + um = m.upper() + mymib = '__MY_{}-MIB'.format(um) + args = ( + mymib, + # SNIMPY-MIB::snimpyIpAddress + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 1), + v2c.OctetString()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 1), (0,), + v2c.OctetString("AAAA")), + # SNIMPY-MIB::snimpyString + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 2), + v2c.OctetString()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 2), (0,), + v2c.OctetString("bye")), + # SNIMPY-MIB::snimpyInteger + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 3), + v2c.Integer()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 3), (0,), v2c.Integer(19)), + # SNIMPY-MIB::snimpyEnum + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 4), + v2c.Integer()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 4), (0,), v2c.Integer(2)), + # SNIMPY-MIB::snimpyObjectId + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 5), + v2c.ObjectIdentifier()).setMaxAccess("readwrite"), + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 1, 5), ( + 0,), v2c.ObjectIdentifier((1, 3, 6, 4454, 0, 0))), + # SNIMPY-MIB::snimpyBoolean + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 6), + v2c.Integer()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 6), (0,), v2c.Integer(1)), + # SNIMPY-MIB::snimpyCounter + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 7), + v2c.Counter32()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 7), (0,), v2c.Counter32(47)), + # SNIMPY-MIB::snimpyGauge + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 8), + v2c.Gauge32()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 8), (0,), v2c.Gauge32(18)), + # SNIMPY-MIB::snimpyTimeticks + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 9), + v2c.TimeTicks()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 9), (0,), + v2c.TimeTicks(12111100)), + # SNIMPY-MIB::snimpyCounter64 + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 10), + v2c.Counter64()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 10), (0,), + v2c.Counter64(2 ** 48 + 3)), + # SNIMPY-MIB::snimpyBits + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 11), + v2c.OctetString()).setMaxAccess("readwrite"), + MibScalarInstance( + (1, 3, 6, 1, 2, 1, 45121, 1, 11), (0,), + v2c.OctetString(b"\xa0")), + # SNIMPY-MIB::snimpyMacAddress + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 15), + v2c.OctetString()).setMaxAccess("readwrite"), + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 1, 15), ( + 0,), v2c.OctetString(b"\x11\x12\x13\x14\x15\x16")), + # SNIMPY-MIB::snimpyMacAddressInvalid + MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 16), + v2c.OctetString()).setMaxAccess("readwrite"), + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 1, 16), ( + 0,), v2c.OctetString(b"\xf1\x12\x13\x14\x15\x16")), - args = ( - '__MY_SNIMPY-MIB', - # SNIMPY-MIB::snimpyIpAddress - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 1), - v2c.OctetString()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 1), (0,), - v2c.OctetString("AAAA")), - # SNIMPY-MIB::snimpyString - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 2), - v2c.OctetString()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 2), (0,), v2c.OctetString("bye")), - # SNIMPY-MIB::snimpyInteger - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 3), - v2c.Integer()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 3), (0,), v2c.Integer(19)), - # SNIMPY-MIB::snimpyEnum - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 4), - v2c.Integer()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 4), (0,), v2c.Integer(2)), - # SNIMPY-MIB::snimpyObjectId - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 5), - v2c.ObjectIdentifier()).setMaxAccess("readwrite"), - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 1, 5), ( - 0,), v2c.ObjectIdentifier((1, 3, 6, 4454, 0, 0))), - # SNIMPY-MIB::snimpyBoolean - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 6), - v2c.Integer()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 6), (0,), v2c.Integer(1)), - # SNIMPY-MIB::snimpyCounter - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 7), - v2c.Counter32()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 7), (0,), v2c.Counter32(47)), - # SNIMPY-MIB::snimpyGauge - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 8), - v2c.Gauge32()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 8), (0,), v2c.Gauge32(18)), - # SNIMPY-MIB::snimpyTimeticks - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 9), - v2c.TimeTicks()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 9), (0,), - v2c.TimeTicks(12111100)), - # SNIMPY-MIB::snimpyCounter64 - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 10), - v2c.Counter64()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 10), (0,), - v2c.Counter64(2 ** 48 + 3)), - # SNIMPY-MIB::snimpyBits - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 11), - v2c.OctetString()).setMaxAccess("readwrite"), - MibScalarInstance( - (1, 3, 6, 1, 2, 1, 45121, 1, 11), (0,), - v2c.OctetString(b"\xa0")), - # SNIMPY-MIB::snimpyMacAddress - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 15), - v2c.OctetString()).setMaxAccess("readwrite"), - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 1, 15), ( - 0,), v2c.OctetString(b"\x11\x12\x13\x14\x15\x16")), - # SNIMPY-MIB::snimpyMacAddressInvalid - MibScalar((1, 3, 6, 1, 2, 1, 45121, 1, 16), - v2c.OctetString()).setMaxAccess("readwrite"), - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 1, 16), ( - 0,), v2c.OctetString(b"\xf1\x12\x13\x14\x15\x16")), - - # SNIMPY-MIB::snimpyIndexTable - MibTable((1, 3, 6, 1, 2, 1, 45121, 2, 3)), - MibTableRow( - (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1)).setIndexNames( - (0, "__MY_SNIMPY-MIB", "snimpyIndexVarLen"), - (0, "__MY_SNIMPY-MIB", "snimpyIndexOidVarLen"), - (0, "__MY_SNIMPY-MIB", "snimpyIndexFixedLen"), - (1, "__MY_SNIMPY-MIB", "snimpyIndexImplied")), - # SNIMPY-MIB::snimpyIndexVarLen - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 1), - flatten(4, stringToOid('row1'), - 3, 1, 2, 3, - stringToOid('alpha5'), - stringToOid('end of row1')), - v2c.OctetString(b"row1")), - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 1), - flatten(4, stringToOid('row2'), - 4, 1, 0, 2, 3, - stringToOid('beta32'), - stringToOid('end of row2')), - v2c.OctetString(b"row2")), - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 1), - flatten(4, stringToOid('row3'), - 4, 120, 1, 2, 3, - stringToOid('gamma7'), - stringToOid('end of row3')), - v2c.OctetString(b"row3")), - # SNIMPY-MIB::snimpyIndexInt - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 6), - flatten(4, stringToOid('row1'), - 3, 1, 2, 3, - stringToOid('alpha5'), - stringToOid('end of row1')), - v2c.Integer(4571)), - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 6), - flatten(4, stringToOid('row2'), - 4, 1, 0, 2, 3, - stringToOid('beta32'), - stringToOid('end of row2')), - v2c.Integer(78741)), - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 6), - flatten(4, stringToOid('row3'), - 4, 120, 1, 2, 3, - stringToOid('gamma7'), - stringToOid('end of row3')), - v2c.Integer(4110)), - - # SNIMPY-MIB::snimpyInvalidTable - MibTable((1, 3, 6, 1, 2, 1, 45121, 2, 5)), - MibTableRow( - (1, 3, 6, 1, 2, 1, 45121, 2, 5, 1)).setIndexNames( - (0, "__MY_SNIMPY-MIB", "snimpyInvalidIndex")), - # SNIMPY-MIB::snimpyInvalidDescr - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 5, 1, 2), - (1,), - v2c.OctetString(b"Hello")), - MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 5, 1, 2), - (2,), - v2c.OctetString(b"\xf1\x12\x13\x14\x15\x16"))) + # SNIMPY-MIB::snimpyIndexTable + MibTable((1, 3, 6, 1, 2, 1, 45121, 2, 3)), + MibTableRow( + (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1)).setIndexNames( + (0, mymib, m+"IndexVarLen"), + (0, mymib, m+"IndexOidVarLen"), + (0, mymib, m+"IndexFixedLen"), + (1, mymib, m+"IndexImplied")), + # SNIMPY-MIB::snimpyIndexVarLen + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 1), + flatten(4, stringToOid('row1'), + 3, 1, 2, 3, + stringToOid('alpha5'), + stringToOid('end of row1')), + v2c.OctetString(b"row1")), + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 1), + flatten(4, stringToOid('row2'), + 4, 1, 0, 2, 3, + stringToOid('beta32'), + stringToOid('end of row2')), + v2c.OctetString(b"row2")), + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 1), + flatten(4, stringToOid('row3'), + 4, 120, 1, 2, 3, + stringToOid('gamma7'), + stringToOid('end of row3')), + v2c.OctetString(b"row3")), + # SNIMPY-MIB::snimpyIndexInt + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 6), + flatten(4, stringToOid('row1'), + 3, 1, 2, 3, + stringToOid('alpha5'), + stringToOid('end of row1')), + v2c.Integer(4571)), + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 6), + flatten(4, stringToOid('row2'), + 4, 1, 0, 2, 3, + stringToOid('beta32'), + stringToOid('end of row2')), + v2c.Integer(78741)), + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 6), + flatten(4, stringToOid('row3'), + 4, 120, 1, 2, 3, + stringToOid('gamma7'), + stringToOid('end of row3')), + v2c.Integer(4110)), - if self.emptyTable: - args += ( - # SNIMPY-MIB::snimpyEmptyTable - MibTable((1, 3, 6, 1, 2, 1, 45121, 2, 6)), + # SNIMPY-MIB::snimpyInvalidTable + MibTable((1, 3, 6, 1, 2, 1, 45121, 2, 5)), MibTableRow( - (1, 3, 6, 1, 2, 1, 45121, 2, 6, 1)).setIndexNames( - (0, "__MY_SNIMPY-MIB", "snimpyEmptyIndex"))) + (1, 3, 6, 1, 2, 1, 45121, 2, 5, 1)).setIndexNames( + (0, mymib, m+"InvalidIndex")), + # SNIMPY-MIB::snimpyInvalidDescr + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 5, 1, 2), + (1,), + v2c.OctetString(b"Hello")), + MibScalarInstance((1, 3, 6, 1, 2, 1, 45121, 2, 5, 1, 2), + (2,), + v2c.OctetString( + b"\xf1\x12\x13\x14\x15\x16"))) - kwargs = dict( - # Indexes - snimpyIndexVarLen=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 1), - v2c.OctetString( - )), - snimpyIndexIntIndex=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 2), - v2c.Integer( - )).setMaxAccess( - "noaccess"), - snimpyIndexOidVarLen=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 3), - v2c.ObjectIdentifier( - )).setMaxAccess( - "noaccess"), - snimpyIndexFixedLen=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 4), - v2c.OctetString( - ).setFixedLength( - 6)).setMaxAccess( - "noaccess"), - snimpyIndexImplied=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 5), - v2c.OctetString( - )).setMaxAccess("noaccess"), - snimpyIndexInt=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 6), - v2c.Integer()).setMaxAccess("readwrite"), - snimpyInvalidIndex=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 5, 1, 1), - v2c.Integer()).setMaxAccess("noaccess"), - snimpyInvalidDescr=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 5, 1, 2), - v2c.OctetString()).setMaxAccess("readwrite") - ) + if self.emptyTable: + args += ( + # SNIMPY-MIB::snimpyEmptyTable + MibTable((1, 3, 6, 1, 2, 1, 45121, 2, 6)), + MibTableRow( + (1, 3, 6, 1, 2, 1, 45121, 2, 6, 1)).setIndexNames( + (0, mymib, m+"EmptyIndex"))) - if self.emptyTable: - kwargs.update(dict( - snimpyEmptyIndex=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 6, 1, 1), + kwargs = dict( + # Indexes + snimpyIndexVarLen=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 1), + v2c.OctetString( + )), + snimpyIndexIntIndex=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 2), + v2c.Integer( + )).setMaxAccess( + "noaccess"), + snimpyIndexOidVarLen=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 3), + v2c.ObjectIdentifier( + )).setMaxAccess( + "noaccess"), + snimpyIndexFixedLen=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 4), + v2c.OctetString( + ).setFixedLength( + 6)).setMaxAccess( + "noaccess"), + snimpyIndexImplied=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 5), + v2c.OctetString( + )).setMaxAccess("noaccess"), + snimpyIndexInt=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 3, 1, 6), + v2c.Integer()).setMaxAccess("readwrite"), + snimpyInvalidIndex=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 5, 1, 1), v2c.Integer()).setMaxAccess("noaccess"), - snimpyEmptyDescr=MibTableColumn( - (1, 3, 6, 1, 2, 1, 45121, 2, 6, 1, 2), - v2c.OctetString()).setMaxAccess("readwrite"))) + snimpyInvalidDescr=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 5, 1, 2), + v2c.OctetString()).setMaxAccess("readwrite") + ) + + if self.emptyTable: + kwargs.update(dict( + snimpyEmptyIndex=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 6, 1, 1), + v2c.Integer()).setMaxAccess("noaccess"), + snimpyEmptyDescr=MibTableColumn( + (1, 3, 6, 1, 2, 1, 45121, 2, 6, 1, 2), + v2c.OctetString()).setMaxAccess("readwrite"))) - mibBuilder.exportSymbols(*args, **kwargs) + mibBuilder.exportSymbols(*args, **kwargs) # Start agent cmdrsp.GetCommandResponder(snmpEngine, snmpContext) diff --git a/tests/test_manager.py b/tests/test_manager.py index 492a116..a3b4f27 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -3,6 +3,7 @@ import time from datetime import timedelta from snimpy.manager import load, Manager, snmp +from snimpy.mib import MibLoader import agent if sys.version_info < (2, 7): import unittest2 as unittest @@ -239,6 +240,21 @@ def testGetChangingStuff(self): current = self.manager.ifInOctets[2] self.assertGreater(current, initial) + def testMibLoaderGet(self): + """Create another manager with a different MibLoader""" + mibloader = MibLoader() + load('IF-MIB', mibloader) + load('SNMPv2-MIB', mibloader) + load(os.path.join(os.path.dirname(os.path.abspath(__file__)), + "SNIMPY2-MIB.mib"), mibloader) + + manager = Manager(host="127.0.0.1:{0}".format(self.agent.port), + community="public", + version=2, mibloader=mibloader) + self.assertEqual(manager.snimpy2IpAddress, "65.65.65.65") + self.assertRaises(AttributeError, + getattr, manager, "snimpyIpAddress") + class TestManagerRestrictModule(TestManager): diff --git a/tests/test_mib.py b/tests/test_mib.py index b9724b3..9b4c851 100644 --- a/tests/test_mib.py +++ b/tests/test_mib.py @@ -2,6 +2,7 @@ import os import sys from snimpy import mib, basictypes +from snimpy.mib import MibLoader PYTHON3 = sys.version_info >= (3, 0) if PYTHON3: @@ -10,9 +11,14 @@ class TestMibSnimpy(unittest.TestCase): + def mibLoader(self): + return mib + def setUp(self): - mib.load(os.path.join(os.path.dirname(os.path.abspath(__file__)), - "SNIMPY-MIB.mib")) + self.mib = self.mibLoader() + self.mib.load(os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "SNIMPY-MIB.mib")) self.nodes = ["snimpy", "snimpyScalars", "snimpyTables"] @@ -75,7 +81,7 @@ def tearDown(self): def testGetNodes(self): """Test that we can get all nodes""" - nodes = mib.getNodes('SNIMPY-MIB') + nodes = self.mib.getNodes('SNIMPY-MIB') snodes = sorted([str(a) for a in nodes]) self.assertEqual(self.nodes, snodes) @@ -84,7 +90,7 @@ def testGetNodes(self): def testGetTables(self): """Test that we can get all tables""" - tables = mib.getTables('SNIMPY-MIB') + tables = self.mib.getTables('SNIMPY-MIB') stables = sorted([str(a) for a in tables]) self.assertEqual(self.tables, stables) @@ -93,7 +99,7 @@ def testGetTables(self): def testGetColumns(self): """Test that we can get all columns""" - columns = mib.getColumns('SNIMPY-MIB') + columns = self.mib.getColumns('SNIMPY-MIB') scolumns = sorted([str(a) for a in columns]) self.assertEqual(self.columns, scolumns) @@ -102,7 +108,7 @@ def testGetColumns(self): def testGetScalars(self): """Test that we can get all scalars""" - scalars = mib.getScalars('SNIMPY-MIB') + scalars = self.mib.getScalars('SNIMPY-MIB') sscalars = sorted([str(a) for a in scalars]) self.assertEqual(self.scalars, sscalars) for n in scalars: @@ -111,54 +117,60 @@ def testGetScalars(self): def testGet(self): """Test that we can get all named attributes""" for i in self.scalars: - self.assertEqual(str(mib.get('SNIMPY-MIB', i)), i) - self.assert_(isinstance(mib.get('SNIMPY-MIB', i), mib.Scalar)) + self.assertEqual(str(self.mib.get('SNIMPY-MIB', i)), i) + self.assert_(isinstance(self.mib.get('SNIMPY-MIB', i), mib.Scalar)) for i in self.tables: - self.assertEqual(str(mib.get('SNIMPY-MIB', i)), i) - self.assert_(isinstance(mib.get('SNIMPY-MIB', i), mib.Table)) + self.assertEqual(str(self.mib.get('SNIMPY-MIB', i)), i) + self.assert_(isinstance(self.mib.get('SNIMPY-MIB', i), mib.Table)) for i in self.columns: - self.assertEqual(str(mib.get('SNIMPY-MIB', i)), i) - self.assert_(isinstance(mib.get('SNIMPY-MIB', i), mib.Column)) + self.assertEqual(str(self.mib.get('SNIMPY-MIB', i)), i) + self.assert_(isinstance(self.mib.get('SNIMPY-MIB', i), mib.Column)) for i in self.nodes: - self.assertEqual(str(mib.get('SNIMPY-MIB', i)), i) - self.assert_(isinstance(mib.get('SNIMPY-MIB', i), mib.Node)) + self.assertEqual(str(self.mib.get('SNIMPY-MIB', i)), i) + self.assert_(isinstance(self.mib.get('SNIMPY-MIB', i), mib.Node)) def testGetByOid(self): """Test that we can get all named attributes by OID.""" for i in self.scalars: - nodebyname = mib.get('SNIMPY-MIB', i) - self.assertEqual(str(mib.getByOid(nodebyname.oid)), i) - self.assert_(isinstance(mib.getByOid(nodebyname.oid), mib.Scalar)) + nodebyname = self.mib.get('SNIMPY-MIB', i) + self.assertEqual(str(self.mib.getByOid(nodebyname.oid)), i) + self.assert_(isinstance(self.mib.getByOid(nodebyname.oid), + mib.Scalar)) for i in self.tables: - nodebyname = mib.get('SNIMPY-MIB', i) - self.assertEqual(str(mib.getByOid(nodebyname.oid)), i) - self.assert_(isinstance(mib.getByOid(nodebyname.oid), mib.Table)) + nodebyname = self.mib.get('SNIMPY-MIB', i) + self.assertEqual(str(self.mib.getByOid(nodebyname.oid)), i) + self.assert_(isinstance(self.mib.getByOid(nodebyname.oid), + mib.Table)) for i in self.columns: - nodebyname = mib.get('SNIMPY-MIB', i) - self.assertEqual(str(mib.getByOid(nodebyname.oid)), i) - self.assert_(isinstance(mib.getByOid(nodebyname.oid), mib.Column)) + nodebyname = self.mib.get('SNIMPY-MIB', i) + self.assertEqual(str(self.mib.getByOid(nodebyname.oid)), i) + self.assert_(isinstance(self.mib.getByOid(nodebyname.oid), + mib.Column)) for i in self.nodes: - nodebyname = mib.get('SNIMPY-MIB', i) - self.assertEqual(str(mib.getByOid(nodebyname.oid)), i) - self.assert_(isinstance(mib.getByOid(nodebyname.oid), mib.Node)) + nodebyname = self.mib.get('SNIMPY-MIB', i) + self.assertEqual(str(self.mib.getByOid(nodebyname.oid)), i) + self.assert_(isinstance(self.mib.getByOid(nodebyname.oid), + mib.Node)) def testGetByOid_UnknownOid(self): """Test that unknown OIDs raise an exception.""" - self.assertRaises(mib.SMIException, mib.getByOid, (255,)) + self.assertRaises(mib.SMIException, self.mib.getByOid, (255,)) def testGetType(self): """Test that _getType properly identifies known and unknown types.""" self.assertEqual(b"PhysAddress", - mib.ffi.string(mib._getType("PhysAddress").name)) + mib.ffi.string( + self.mib._getType("PhysAddress").name)) self.assertEqual(b"InetAddress", - mib.ffi.string(mib._getType(b"InetAddress").name)) - self.assertEqual(None, mib._getType("SomeUnknownType.kjgf")) - self.assertEqual(None, mib._getType("snimpySimpleTable")) + mib.ffi.string( + self.mib._getType(b"InetAddress").name)) + self.assertEqual(None, self.mib._getType("SomeUnknownType.kjgf")) + self.assertEqual(None, self.mib._getType("snimpySimpleTable")) def testTableColumnRelation(self): """Test that we can get the column from the table and vice-versa""" for i in self.tables: - table = mib.get('SNIMPY-MIB', i) + table = self.mib.get('SNIMPY-MIB', i) for r in table.columns: self.assert_(isinstance(r, mib.Column)) self.assertEqual(str(r.table), str(i)) @@ -171,7 +183,7 @@ def testTableColumnRelation(self): tcolumns.sort() self.assertEqual(columns, tcolumns) for r in self.columns: - column = mib.get('SNIMPY-MIB', r) + column = self.mib.get('SNIMPY-MIB', r) table = column.table self.assert_(isinstance(table, mib.Table)) prefix = str(table).replace("Table", "") @@ -198,11 +210,11 @@ def testTypes(self): "snimpyComplexSecondIP": basictypes.IpAddress, "snimpyComplexState": basictypes.Enum} for t in tt: - self.assertEqual(mib.get('SNIMPY-MIB', t).type, tt[t]) + self.assertEqual(self.mib.get('SNIMPY-MIB', t).type, tt[t]) # Also check we get an exception when no type available def call(): - mib.get('SNIMPY-MIB', 'snimpySimpleTable').type + self.mib.get('SNIMPY-MIB', 'snimpySimpleTable').type self.assertRaises(mib.SMIException, call) def testRanges(self): @@ -224,16 +236,18 @@ def testRanges(self): "snimpyComplexState": None } for t in tt: - self.assertEqual(mib.get('SNIMPY-MIB', t).ranges, tt[t]) + self.assertEqual(self.mib.get('SNIMPY-MIB', t).ranges, + tt[t]) def testEnums(self): """Test that we got the enum values correctly""" - self.assertEqual(mib.get('SNIMPY-MIB', "snimpyInteger").enum, None) - self.assertEqual(mib.get("SNIMPY-MIB", "snimpyEnum").enum, + self.assertEqual(self.mib.get('SNIMPY-MIB', + "snimpyInteger").enum, None) + self.assertEqual(self.mib.get("SNIMPY-MIB", "snimpyEnum").enum, {1: "up", 2: "down", 3: "testing"}) - self.assertEqual(mib.get("SNIMPY-MIB", "snimpyBits").enum, + self.assertEqual(self.mib.get("SNIMPY-MIB", "snimpyBits").enum, {0: "first", 1: "second", 2: "third", @@ -242,30 +256,33 @@ def testEnums(self): def testIndexes(self): """Test that we can retrieve correctly the index of tables""" self.assertEqual( - [str(i) for i in mib.get("SNIMPY-MIB", "snimpySimpleTable").index], + [str(i) for i in self.mib.get("SNIMPY-MIB", + "snimpySimpleTable").index], ["snimpySimpleIndex"]) self.assertEqual( [str(i) - for i in mib.get("SNIMPY-MIB", "snimpyComplexTable").index], + for i in self.mib.get("SNIMPY-MIB", + "snimpyComplexTable").index], ["snimpyComplexFirstIP", "snimpyComplexSecondIP"]) self.assertEqual( [str(i) - for i in mib.get("SNIMPY-MIB", "snimpyInetAddressTable").index], + for i in self.mib.get("SNIMPY-MIB", + "snimpyInetAddressTable").index], ["snimpyInetAddressType", "snimpyInetAddress"]) def testImplied(self): """Check that we can get implied attribute for a given table""" self.assertEqual( - mib.get("SNIMPY-MIB", - 'snimpySimpleTable').implied, + self.mib.get("SNIMPY-MIB", + 'snimpySimpleTable').implied, False) self.assertEqual( - mib.get("SNIMPY-MIB", - 'snimpyComplexTable').implied, + self.mib.get("SNIMPY-MIB", + 'snimpyComplexTable').implied, False) self.assertEqual( - mib.get("SNIMPY-MIB", - 'snimpyIndexTable').implied, + self.mib.get("SNIMPY-MIB", + 'snimpyIndexTable').implied, True) def testOid(self): @@ -282,41 +299,51 @@ def testOid(self): "snimpyComplexState": (1, 3, 6, 1, 2, 1, 45121, 2, 2, 1, 3), } for o in oids: - self.assertEqual(mib.get('SNIMPY-MIB', o).oid, oids[o]) + self.assertEqual(self.mib.get('SNIMPY-MIB', o).oid, oids[o]) def testLoadedMibNames(self): """Check that only expected modules were loaded.""" for module in self.expected_modules: - self.assertTrue(module in list(mib.loadedMibNames())) + self.assertTrue(module in list(self.mib.loadedMibNames())) def testLoadInexistantModule(self): """Check that we get an exception when loading an inexistant module""" - self.assertRaises(mib.SMIException, mib.load, "idontexist.gfdgfdg") + self.assertRaises(mib.SMIException, self.mib.load, + "idontexist.gfdgfdg") def testLoadInvalidModule(self): """Check that an obviously invalid module cannot be loaded""" path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "SNIMPY-INVALID-MIB.mib") - self.assertRaises(mib.SMIException, mib.load, path) - self.assertRaises(mib.SMIException, mib.getNodes, "SNIMPY-INVALID-MIB") - self.assertRaises(mib.SMIException, mib.get, + self.assertRaises(mib.SMIException, self.mib.load, path) + self.assertRaises(mib.SMIException, self.mib.getNodes, + "SNIMPY-INVALID-MIB") + self.assertRaises(mib.SMIException, self.mib.get, "SNIMPY-INVALID-MIB", "invalidSnimpyNode") def testAccesInexistantModule(self): """Check that we get an exception when querying inexistant module""" - self.assertRaises(mib.SMIException, mib.getNodes, "idontexist.kjgf") - self.assertRaises(mib.SMIException, mib.getScalars, "idontexist.kjgf") - self.assertRaises(mib.SMIException, mib.getTables, "idontexist.kjgf") - self.assertRaises(mib.SMIException, mib.getColumns, "idontexist.kjgf") + self.assertRaises(mib.SMIException, self.mib.getNodes, + "idontexist.kjgf") + self.assertRaises(mib.SMIException, self.mib.getScalars, + "idontexist.kjgf") + self.assertRaises(mib.SMIException, self.mib.getTables, + "idontexist.kjgf") + self.assertRaises(mib.SMIException, self.mib.getColumns, + "idontexist.kjgf") def testFmt(self): """Check that we get FMT from types""" - self.assertEqual(mib.get("SNIMPY-MIB", 'snimpySimplePhys').fmt, "1x:") - self.assertEqual(mib.get("SNIMPY-MIB", 'snimpyInteger').fmt, "d-2") + self.assertEqual( + self.mib.get("SNIMPY-MIB", 'snimpySimplePhys').fmt, + "1x:") + self.assertEqual( + self.mib.get("SNIMPY-MIB", 'snimpyInteger').fmt, + "d-2") def testTypeOverrides(self): """Check that we can override a type""" - table = mib.get("SNIMPY-MIB", "snimpyInetAddressTable") + table = self.mib.get("SNIMPY-MIB", "snimpyInetAddressTable") addrtype_attr = table.index[0] addr_attr = table.index[1] @@ -369,7 +396,7 @@ def testTypeOverrides(self): self.assertEqual(bytes(addr), b"\x7f\0\0\1") def testTypeOverrides_Errors(self): - table = mib.get("SNIMPY-MIB", "snimpyInetAddressTable") + table = self.mib.get("SNIMPY-MIB", "snimpyInetAddressTable") attr = table.index[1] # Value with the wrong type. @@ -389,7 +416,7 @@ def testTypeOverrides_Errors(self): def testTypeName(self): """Check that we can get the current declared type name""" - table = mib.get("SNIMPY-MIB", "snimpyInetAddressTable") + table = self.mib.get("SNIMPY-MIB", "snimpyInetAddressTable") attr = table.index[1] self.assertEqual(attr.typeName, b"InetAddress") @@ -400,9 +427,30 @@ def testTypeName(self): attr.typeName = b"InetAddressIPv6" self.assertEqual(attr.typeName, b"InetAddressIPv6") - attr = mib.get("SNIMPY-MIB", "snimpySimpleIndex") + attr = self.mib.get("SNIMPY-MIB", "snimpySimpleIndex") self.assertEqual(attr.typeName, b"Integer32") + def testMibLoader(self): + """Check that you can create an isolated MibLoader""" + mloader = MibLoader() + mloader.load(os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "SNIMPY2-MIB.mib")) + mloader.get("SNIMPY2-MIB", "snimpy2InetAddressTable") + mloader.get("SNIMPY2-MIB", "snimpy2IpAddress") + self.mib.get("SNIMPY-MIB", "snimpyInetAddressTable") + with self.assertRaises(mib.SMIException): + mloader.get("SNIMPY-MIB", "snimpyInetAddressTable") + with self.assertRaises(mib.SMIException): + self.mib.get("SNIMPY2-MIB", "snimpy2InetAddressTable") + + +class TestMibLoaderSnimpy(TestMibSnimpy): + """Tests with a different MIB loader""" + + def mibLoader(self): + return MibLoader() + class TestSmi(unittest.TestCase):