diff --git a/surfactant/infoextractors/js_file.py b/surfactant/infoextractors/js_file.py index 793bb475..f1d3f02d 100644 --- a/surfactant/infoextractors/js_file.py +++ b/surfactant/infoextractors/js_file.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: MIT import json import re -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import requests from loguru import logger @@ -18,7 +18,7 @@ class JSDatabaseManager: def __init__(self): self.js_lib_database = None - def load_db(self): + def load_db(self) -> None: js_lib_file = ( ConfigManager().get_data_dir_path() / "infoextractors" / "js_library_patterns.json" ) @@ -32,7 +32,7 @@ def load_db(self): ) self.js_lib_database = None - def get_database(self): + def get_database(self) -> Optional[Dict[str, Any]]: return self.js_lib_database @@ -90,7 +90,7 @@ def match_by_attribute(attribute: str, content: str, database: Dict) -> List[Dic return libs -def download_database() -> dict: +def download_database() -> Optional[Dict[str, Any]]: url = "https://raw.githubusercontent.com/RetireJS/retire.js/master/repository/jsrepository-master.json" response = requests.get(url) if response.status_code == 200: @@ -128,7 +128,7 @@ def strip_irrelevant_data(retirejs_db: dict) -> dict: @surfactant.plugin.hookimpl -def update_db(): +def update_db() -> str: """Retrieves the javascript library CVE database used by retire.js (https://github.com/RetireJS/retire.js/blob/master/repository/jsrepository-master.json) and only keeps the contents under each library's "extractors" section, which contains file hashes and regexes relevant for detecting a specific javascript library by its file name or contents. The resulting smaller json is written to js_library_patterns.json in the same directory. This smaller file will be read from to make the checks later on.""" @@ -147,12 +147,12 @@ def update_db(): @surfactant.plugin.hookimpl -def short_name(): +def short_name() -> str: return "js_file" @surfactant.plugin.hookimpl -def init_hook(command_name=None): +def init_hook(command_name: Optional[str]=None): """Initialization hook to load the JavaScript library database.""" if command_name != "update-db": # Do not load the database if only updating the database. logger.info("Initializing js_file...") diff --git a/surfactant/plugin/manager.py b/surfactant/plugin/manager.py index 931897df..83114225 100644 --- a/surfactant/plugin/manager.py +++ b/surfactant/plugin/manager.py @@ -9,6 +9,7 @@ from surfactant.configmanager import ConfigManager from surfactant.plugin import hookspecs +from typing import List, Optional, Any def _register_plugins(pm: pluggy.PluginManager) -> None: @@ -69,7 +70,7 @@ def _register_plugins(pm: pluggy.PluginManager) -> None: pm.register(plugin) -def set_blocked_plugins(pm: pluggy.PluginManager): +def set_blocked_plugins(pm: pluggy.PluginManager) -> None: """Gets the current list of blocked plugins from the config manager, then blocks and unregisters them with the plugin manager.""" config_manager = ConfigManager() @@ -121,7 +122,7 @@ def is_hook_implemented(pm: pluggy.PluginManager, plugin: object, hook_name: str return False -def print_plugins(pm: pluggy.PluginManager): +def print_plugins(pm: pluggy.PluginManager) -> None: print("PLUGINS") for plugin in pm.get_plugins(): plugin_name = pm.get_name(plugin) if pm.get_name(plugin) else "" @@ -139,7 +140,25 @@ def print_plugins(pm: pluggy.PluginManager): print(f"\t short name: {short_name}\n") -def find_io_plugin(pm: pluggy.PluginManager, io_format: str, function_name: str): +def find_io_plugin(pm: pluggy.PluginManager, io_format: str, function_name: str) -> Optional[Any]: + """ + Finds and returns a plugin that matches the specified input/output format and has the desired function. + + Args: + :parampm (pluggy.PluginManager): The plugin manager instance. + :param io_format (str) : The name that the plugin is registered as. + :param function_name (str) : The name of the function + + Returns: + Optional[Any]: The found plugin instance that matches the specified `io_format` and + implements the `function_name`, or `None` if no such plugin is found. + If no plugin is found, an error is logged, and the program exits. + + Raises: + SystemExit: If no plugin matching the criteria is found, the function logs an error message + and exits the program. + """ + found_plugin = pm.get_plugin(io_format) if found_plugin is None: @@ -160,14 +179,14 @@ def find_io_plugin(pm: pluggy.PluginManager, io_format: str, function_name: str) return found_plugin -def find_plugin_by_name(pm: pluggy.PluginManager, name: str): +def find_plugin_by_name(pm: pluggy.PluginManager, name: str) -> Optional[Any]: """ Finds a plugin by matching the given name against the plugin's registered name, canonical name, and its short name (if applicable). - :param pm: The plugin manager instance. - :param name: The name to match against the plugin's registered, canonical, and short names. - :return: The matched plugin instance or None if no match is found. + :param pm (pluggy.PluginManager): The plugin manager instance. + :param name (str): The name to match against the plugin's registered, canonical, and short names. + :return (Any | None): The matched plugin instance or None if no match is found. """ # Convert the name to lowercase for case-insensitive comparison name_lower = name.lower() @@ -194,13 +213,13 @@ def find_plugin_by_name(pm: pluggy.PluginManager, name: str): return None -def call_init_hooks(pm, hook_filter=None, command_name=None): +def call_init_hooks(pm: pluggy.PluginManager, hook_filter: List[str]=None, command_name: str=None) -> None: """Call the initialization hook for plugins that implement it. Args: - pm: The plugin manager instance. - hook_filter: A list of hook names to filter which plugins get initialized. - command_name: The name of the command invoking the initialization. + :param pm (pluggy.PluginManager): The plugin manager instance. + :param hook_filter (List[str]): A list of hook names to filter which plugins get initialized. + :param command_name (str): The name of the command invoking the initialization. """ for plugin in pm.get_plugins(): if is_hook_implemented(pm, plugin, "init_hook"):