From 8d72fb030ed31116fdb256b327d299337b000af4 Mon Sep 17 00:00:00 2001 From: Toshio Kuratomi Date: Thu, 26 May 2022 23:54:24 -0700 Subject: [PATCH] Do not put the subscription-manager password onto the command line. (#492) * Do not put the subscription-manager password onto the command line. Fixes CVE-2022-0852 Passing values on the command line is insecure. With this change, the rhsm password is passed interactively to subscription-manager instead of being passed on the commandline when we shell out to it. The structure of this change deserves a bit of description. Previously, we called one function to assemble all of the information needed to invoke subscription-manager and then returned that as a string that could be run as a command line. We called a second function with that string to actually run the command. To send the password interactively, we need to stop adding the password to the string of command line arguments but it still makes sense to keep the code that figures out the password together with the code which finds the other command line args. So it makes sense to keep a single function to do that but return the password and other args separately. We could use a dict, a class, or a tuple as the return value from the function. That doesn't feel too ugly. But then we need to pass that structure into the function which takes care of invoking subscription-manager on the command line and that *does* feel ugly. That function would have to care about the structure of the data we pass in (If a tuple, what is the order? If a dict, what are the field names?, etc). To take care of this, we can make the data structure that we return from assembling the data a class and the function which calls subscription-manager a method of that class because it's quite natural for a method to have knowledge of what attributes the class contains. Hmm... but now that we have a class with behaviours (methods), it starts to feel like we could do some more things. A function that fills in the values of a class, validates that the data is proper, and then returns an instance of that class is really a constructor, right? So it makes sense to move the function which assembles the data and returns the class a constructor. But that particular function isn't very generic: it uses knowledge of our global toolopts.tool_opts to populate the class. So let's write a simple __init__() that takes all of the values needed as separate parameters and then create an alternative constructor (an @classmethod which returns an instance of the class) which gets the data from a ToolOpt, and then calls __init__() with those values and returns the resulting class. Okay, things are much cleaner now, but there's one more thing that we can do. We now have a class that has a constructor to read in data and a single method that we can call to return some results. What do we call an object that can be called to return a result? A function or more generically, in python, a Callable. We can turn this class into a callable by renaming the method which actually invokes subscription-manager __call__(). What we have at the end of all this is a way to create a function which knows about the settings in tool_opts which we can then call to perform our subscription-manager needs:: registration_command = RegistrationCommand.from_tool_opts() return_code = registration_command() OAMG-6551 #done convert2rhel now passes the rhsm password to subscription-manager securely. * Modify the hiding of secret to hide both --password SECRET and --password=SECRET. Currently we only use it with passwords that we are passing in the second form (to subscription-manager) but catching both will future proof us in case we use this function for more things in the future. (Eric Gustavsson) * Note: using generator expressions was tried here but found that they only bind the variable being iterated over at definition time, the rest of the variables are bound when the generator is used. This means that constructing a set of generators in a loop doesn't really work as the loop variables that you use in the generator will have a different value by the time you're done. So a nested for loop and if-else is the way to implement this. * Add global_tool_opts fixture to conftest.py which monkeypatches a fresh ToolOpts into convert2rhel.toolopts.tool_opts. That way tests can modify that without worrying about polluting other tests. * How toolopts is imported in the code makes a difference whether this fixture is sufficient or if you need to do a little more work. If the import is:: from convert2rhel import toolopts do_something(toolopts.tool_opts.username) then your test can just do:: def test_thing_that_uses_toolopts(global_tool_opts): global_tool_opts.username = 'badger' Most of our code, though, imports like this:: # In subscription.py, for instance from convert2rhel.toolopts import tool_opts do_something(tool_opts.username) so the tests should do something like this:: def test_toolopts_differently(global_test_opts, monkeypatch): monkeypatch.setattr(subscription, 'tool_opts', global_tool_opts) * Sometimes a process will close stdout before it is done processing. When that happens, we need to wait() for the process to end before closing the pty. If we don't wait, the process will receive a HUP signal which may end it before it is done. * But on RHEL7, pexpect.wait() will throw an exception if you wait on an already finished process. So we need to ignore that exception there. lgtm is flagging some cases where it thinks we are logging sensitive data. Here's why we're ignoring lgtm: * One case logs the username to the terminal as a hint for the user as to what account is being used. We consider username to not be sensitive. * One case logs the subscription-manager invocation. We run the command line through hide_secrets() to remove private information when we do this. * The last case logs which argument was passed to subscription-manager without a secret attached to it. ie: "--password" is passed to subscription-manager without a password being added afterwards. In this case, the string "--password" will be logged. Testing farm doesn't have python-devel installed. We need that to install psutil needed as a testing dependency. Co-authored-by: Daniel Diblik --- .gitleaks.toml | 4 + convert2rhel/subscription.py | 246 ++++-- convert2rhel/unit_tests/README.md | 2 +- convert2rhel/unit_tests/conftest.py | 9 +- convert2rhel/unit_tests/subscription_test.py | 820 +++++++++++++----- convert2rhel/unit_tests/systeminfo_test.py | 2 + convert2rhel/unit_tests/utils_test.py | 62 ++ convert2rhel/utils.py | 68 +- plans/tier0.fmf | 4 + .../roles/install-testing-deps/tasks/main.yml | 16 +- .../tier0/check-cve-2022-1662/main.fmf | 6 + .../check-cve-2022-1662/test_cve_fixes.py | 29 + .../check-user-response/test_user_response.py | 2 +- 13 files changed, 983 insertions(+), 287 deletions(-) create mode 100644 .gitleaks.toml create mode 100644 tests/integration/tier0/check-cve-2022-1662/main.fmf create mode 100644 tests/integration/tier0/check-cve-2022-1662/test_cve_fixes.py diff --git a/.gitleaks.toml b/.gitleaks.toml new file mode 100644 index 0000000000..1ba2df43a7 --- /dev/null +++ b/.gitleaks.toml @@ -0,0 +1,4 @@ +[allowlist] +paths = [ + '''tests/integration/tier0/check-cve/test_cve_fixes.py''', +] diff --git a/convert2rhel/subscription.py b/convert2rhel/subscription.py index aa6a613f41..c990e4c1c3 100644 --- a/convert2rhel/subscription.py +++ b/convert2rhel/subscription.py @@ -109,21 +109,21 @@ def register_system(): # Loop the registration process until successful registration attempt = 0 - while True and attempt < MAX_NUM_OF_ATTEMPTS_TO_SUBSCRIBE: - registration_cmd = get_registration_cmd() - + while attempt < MAX_NUM_OF_ATTEMPTS_TO_SUBSCRIBE: + registration_cmd = RegistrationCommand.from_tool_opts(tool_opts) attempt_msg = "" if attempt > 0: attempt_msg = "Attempt %d of %d: " % (attempt + 1, MAX_NUM_OF_ATTEMPTS_TO_SUBSCRIBE) loggerinst.info("%sRegistering the system using subscription-manager ...", attempt_msg) - output, ret_code = call_registration_cmd(registration_cmd) + output, ret_code = registration_cmd() if ret_code == 0: # Handling a signal interrupt that was previously handled by # subscription-manager. if "user interrupted process" in output.lower(): raise KeyboardInterrupt return + loggerinst.info("System registration failed with return code = %s" % str(ret_code)) if tool_opts.credentials_thru_cli: loggerinst.warning( @@ -138,65 +138,209 @@ def register_system(): loggerinst.critical("Unable to register the system through subscription-manager.") -def get_registration_cmd(): - """Build a command for subscription-manager for registering the system.""" - loggerinst.info("Building subscription-manager command ... ") - registration_cmd = ["subscription-manager", "register", "--force"] - - loggerinst.info("Checking for activation key ...") - if tool_opts.activation_key: - # Activation key has been passed - # -> username/password not required - # -> organization required - loggerinst.info(" ... activation key detected: %s" % tool_opts.activation_key) - - # TODO: Parse the output of 'subscription-manager orgs' and let the - # user choose from the available organizations. If there's just one, - # pick it automatically. - # Organization is required when activation key is used +class RegistrationCommand(object): + def __init__(self, activation_key=None, org=None, username=None, password=None, server_url=None): + """ + A callable that can register a system with subscription-manager. + + :kwarg server_url: Optional URL to the subscription-manager backend. + Useful when the customer has an on-prem subscription-manager instance. + :kwarg activation_key: subscription-manager activation_key that can be + used to register the system. Org must be specified if this was given. + :kwarg org: The organization that the activation_key is associated with. + It is required if activation_key is specified. + :kwarg username: Username to authenticate with subscription-manager. + Required if password is specified. + :kwarg password: Password to authenticate with subscription-manager. + It is required if username is specified. + + .. note:: Either activation_key and org or username and password must + be specified. + """ + self.cmd = "subscription-manager" + self.server_url = server_url + + if activation_key and not org: + raise ValueError("org must be specified if activation_key is used") + + self.activation_key = activation_key + self.org = org + + self.password = password + self.username = username + + if (password and not username) or (username and not password): + raise ValueError("username and password must be used together") + + elif not password: + # No password set + if not self.activation_key: + raise ValueError("activation_key and org or username and password must be specified") + + @classmethod + def from_tool_opts(cls, tool_opts): + """ + Alternate constructor that gets subscription-manager args from ToolOpts. + + convert2rhel's command-line contains the information needed to register + with subscription-manager. Get the information from the passed in + ToolOpts structure to create the RegistrationCommand. + + :arg tool_opts: The :class:`convert2rhel.toolopts.ToolOpts` structure to + retrieve the subscription-manager information from. + """ + loggerinst.info("Gathering subscription-manager registration info ... ") + + registration_attributes = {} if tool_opts.org: - loggerinst.info(" ... org detected") - - org = tool_opts.org - while not org: - org = utils.prompt_user("Organization: ") - - registration_cmd.extend(("--activationkey=%s" % tool_opts.activation_key, "--org=%s" % org)) - else: - loggerinst.info(" ... activation key not found, username and password required") + loggerinst.info(" ... organization detected") + registration_attributes["org"] = tool_opts.org + + if tool_opts.activation_key: + # Activation key has been passed + # -> username/password not required + # -> organization required + loggerinst.info(" ... activation key detected") + registration_attributes["activation_key"] = tool_opts.activation_key + + while "org" not in registration_attributes: + loggerinst.info(" ... activation key requires organization") + # Organization is required when activation key is used + # TODO: Parse the output of 'subscription-manager orgs' and let the + # user choose from the available organizations. If there's just one, + # pick it automatically. + org = utils.prompt_user("Organization: ").strip() + # In case the user entered the empty string + if org: + registration_attributes["org"] = org + else: + # No activation key -> username/password required + if tool_opts.username and tool_opts.password: + loggerinst.info(" ... activation key not found, using given username and password") + else: + loggerinst.info(" ... activation key not found, username and password required") + + if tool_opts.username: + loggerinst.info(" ... username detected") + username = tool_opts.username + else: + username = "" + while not username: + username = utils.prompt_user("Username: ") + + registration_attributes["username"] = username + + if tool_opts.password: + loggerinst.info(" ... password detected") + password = tool_opts.password + else: + if tool_opts.username: + # Hint user for which username they need to enter pswd + loggerinst.info("Username: %s", username) # lgtm[py/clear-text-logging-sensitive-data] + password = "" + while not password: + password = utils.prompt_user("Password: ", password=True) + + registration_attributes["password"] = password + + if tool_opts.serverurl: + loggerinst.debug(" ... using custom RHSM URL") + registration_attributes["server_url"] = tool_opts.serverurl + + return cls(**registration_attributes) + + @property + def args(self): + """ + This property is a list of the command-line arguments that will be passed to + subscription-manager to register the system. Set the individual attributes for + :attr:`server_url`, :attr:`activation_key`, etc to affect the values here. + + .. note:: :attr:`password` is not passed on the command line. Instead, + it is sent to the running subscription-manager process via pexpect. + """ + args = ["register", "--force"] + + if self.server_url: + args.append("--serverurl=%s" % self.server_url) + + if self.activation_key: + args.append("--activationkey=%s" % self.activation_key) + + if self.org: + args.append("--org=%s" % self.org) + + if self.username: + args.append("--username=%s" % self.username) + + return args + + def __call__(self): + """ + Run the subscription-manager command. + + Wrapper for running the subscription-manager command that keeps + secrets secure. + """ + if self.password: + loggerinst.debug( + "Calling command '%s %s'" % (self.cmd, " ".join(hide_secrets(self.args))) + ) # lgtm[py/clear-text-logging-sensitive-data] + output, ret_code = utils.run_cmd_in_pty( + [self.cmd] + self.args, expect_script=(("[Pp]assword: ", self.password + "\n"),), print_cmd=False + ) + else: + # Warning: Currently activation_key can only be specified on the CLI. This is insecure + # but there's nothing we can do about it for now. Once subscription-manager issue: + # https://issues.redhat.com/browse/ENT-4724 is implemented, we can change both password + # and activation_key to use a file-based approach to passing the secrets. + output, ret_code = utils.run_subprocess([self.cmd] + self.args, print_cmd=False) - if tool_opts.username: - loggerinst.info(" ... username detected") + return output, ret_code - username = tool_opts.username - while not username: - username = utils.prompt_user("Username: ") - if tool_opts.password: - loggerinst.info(" ... password detected") +def hide_secrets(args): + """ + Replace secret values with asterisks. - password = tool_opts.password - while not password: - password = utils.prompt_user("Password: ", password=True) + This function takes a list of arguments which will be passed to + subscription-manager on the command line and returns a new list + that has any secret values obscured with asterisks. - registration_cmd.extend(("--username=%s" % username, "--password=%s" % password)) + :arg args: An argument list for subscription-manager which may contain + secret values. + :returns: A new list of arguments with secret values hidden. + """ + obfuscation_string = "*" * 5 + secret_args = frozenset(("--password", "--activationkey", "--token")) - if tool_opts.serverurl: - loggerinst.debug(" ... using custom RHSM URL") - registration_cmd.append("--serverurl=%s" % tool_opts.serverurl) + sanitized_list = [] + hide_next = False + for arg in args: + if hide_next: + # Second part of a two part secret argument (like --password *SECRET*) + arg = obfuscation_string + hide_next = False - return registration_cmd + elif arg in secret_args: + # First part of a two part secret argument (like *--password* SECRET) + hide_next = True + else: + # A secret argument in one part (like --password=SECRET) + for problem_arg in secret_args: + if arg.startswith(problem_arg + "="): + arg = "{0}={1}".format(problem_arg, obfuscation_string) -def call_registration_cmd(registration_cmd): - """Wrapper for run_subprocess that avoids leaking password in the log.""" - loggerinst.debug("Calling command '%s'" % hide_password(" ".join(registration_cmd))) - return utils.run_subprocess(registration_cmd, print_cmd=False) + sanitized_list.append(arg) + if hide_next: + loggerinst.debug( + "Passed arguments had unexpected secret argument," + " '{0}', without a secret".format(sanitized_list[-1]) # lgtm[py/clear-text-logging-sensitive-data] + ) -def hide_password(cmd): - """Replace plaintext password with asterisks.""" - return re.sub('--password=".*?"', '--password="*****"', cmd) + return sanitized_list def replace_subscription_manager(): diff --git a/convert2rhel/unit_tests/README.md b/convert2rhel/unit_tests/README.md index 0b74f0019b..27751e90d0 100644 --- a/convert2rhel/unit_tests/README.md +++ b/convert2rhel/unit_tests/README.md @@ -49,7 +49,7 @@ def test_check_for_yum_updates(monkeypatch): monkeypatch.setattr(package_handler, "get_packages_to_update", value=packages_to_update_mock) assert package_handler.get_packages_to_update() is not None - assert packages_to_update_mock.assert_called_once_with(["package-1", "package-2"]) + packages_to_update_mock.assert_called_once_with(["package-1", "package-2"]) ``` And this other example, of a test that don't need any mocks for external diff --git a/convert2rhel/unit_tests/conftest.py b/convert2rhel/unit_tests/conftest.py index 13473c4cfa..5fec6af042 100644 --- a/convert2rhel/unit_tests/conftest.py +++ b/convert2rhel/unit_tests/conftest.py @@ -2,7 +2,7 @@ import pytest -from convert2rhel import redhatrelease, utils +from convert2rhel import redhatrelease, toolopts, utils from convert2rhel.logger import setup_logger_handler from convert2rhel.systeminfo import system_info from convert2rhel.toolopts import tool_opts @@ -60,6 +60,13 @@ def setup_logger(tmpdir): setup_logger_handler(log_name="convert2rhel", log_dir=str(tmpdir)) +@pytest.fixture +def global_tool_opts(monkeypatch): + local_tool_opts = toolopts.ToolOpts() + monkeypatch.setattr(toolopts, "tool_opts", local_tool_opts) + return local_tool_opts + + @pytest.fixture() def pretend_os(request, pkg_root, monkeypatch): """Parametric fixture to pretend to be one of available OS for convertion. diff --git a/convert2rhel/unit_tests/subscription_test.py b/convert2rhel/unit_tests/subscription_test.py index 6bf674fcfb..6073983a7e 100644 --- a/convert2rhel/unit_tests/subscription_test.py +++ b/convert2rhel/unit_tests/subscription_test.py @@ -22,10 +22,11 @@ from collections import namedtuple +import pexpect import pytest from convert2rhel import unit_tests # Imports unit_tests/__init__.py -from convert2rhel import pkghandler, subscription, utils +from convert2rhel import pkghandler, subscription, toolopts, utils from convert2rhel.systeminfo import system_info from convert2rhel.toolopts import tool_opts from convert2rhel.unit_tests import GetLoggerMocked, run_subprocess_side_effect @@ -45,77 +46,109 @@ def __call__(self, *args, **kwargs): self.called += 1 -class TestSubscription(unittest.TestCase): - class GetOneSubMocked(unit_tests.MockFunction): - def __call__(self, *args, **kwargs): - Sub = namedtuple("Sub", ["pool_id", "sub_raw"]) +class RunSubprocessMocked(unit_tests.MockFunction): + def __init__(self, tuples=None): + # you can specify sequence of return (object, return code) as + # a list of tuple that will be consumed continuosly on the each + # call; when the list is consumed or it is empty, the default + # tuple is returned + self.tuples = tuples + self.default_tuple = ("output", 0) + self.called = 0 + self.cmd = [] - subscription1 = Sub("samplepool", "Subscription description") - return [subscription1] + def __call__(self, cmd, *args, **kwargs): + self.cmd = cmd + self.called += 1 - class GetAvailSubsMocked(unit_tests.MockFunction): - def __call__(self, *args, **kwargs): - Sub = namedtuple("Sub", ["pool_id", "sub_raw"]) + if self.tuples: + return self.tuples.pop(0) + return self.default_tuple - subscription1 = Sub("samplepool", "Subscription description") - subscription2 = Sub("pool0", "sub desc") - return [subscription1, subscription2] - class GetNoAvailSubsMocked(unit_tests.MockFunction): - def __call__(self, *args, **kwargs): +class PromptUserLoopMocked(unit_tests.MockFunction): + def __init__(self): + self.called = {} + + def __call__(self, *args, **kwargs): + return_value = "" + + # args[0] is the current question being asked + if args[0] not in self.called: + self.called[args[0]] = 0 + + if self.called[args[0]] >= 1: + return_value = "test" + + self.called[args[0]] += 1 + return return_value + + +class LetUserChooseItemMocked(unit_tests.MockFunction): + def __init__(self): + self.called = 0 + + def __call__(self, *args, **kwargs): + self.called += 1 + return 0 + + +class GetOneSubMocked(unit_tests.MockFunction): + def __call__(self, *args, **kwargs): + Sub = namedtuple("Sub", ["pool_id", "sub_raw"]) + + subscription1 = Sub("samplepool", "Subscription description") + return [subscription1] + + +class GetAvailSubsMocked(unit_tests.MockFunction): + def __call__(self, *args, **kwargs): + Sub = namedtuple("Sub", ["pool_id", "sub_raw"]) + + subscription1 = Sub("samplepool", "Subscription description") + subscription2 = Sub("pool0", "sub desc") + return [subscription1, subscription2] + + +class GetNoAvailSubsMocked(unit_tests.MockFunction): + def __call__(self, *args, **kwargs): + return [] + + +class GetNoAvailSubsOnceMocked(unit_tests.MockFunction): + def __init__(self): + self.empty_last_call = False + + def __call__(self, *args, **kwargs): + if not self.empty_last_call: + self.empty_last_call = True return [] - class GetNoAvailSubsOnceMocked(unit_tests.MockFunction): - def __init__(self): - self.empty_last_call = False + self.empty_last_call = False + return [namedtuple("Sub", ["pool_id", "sub_raw"])("samplepool", "Subscription description")] - def __call__(self, *args, **kwargs): - if not self.empty_last_call: - self.empty_last_call = True - return [] - self.empty_last_call = False - return [namedtuple("Sub", ["pool_id", "sub_raw"])("samplepool", "Subscription description")] +class RegistrationCmdCallMocked(unit_tests.MockFunction): + def __init__(self): + self.called = 0 - class LetUserChooseItemMocked(unit_tests.MockFunction): - def __init__(self): - self.called = 0 + def __call__(self): + self.called += 1 + return ("User interrupted process.", 0) - def __call__(self, *args, **kwargs): - self.called += 1 - return 0 - - class GetRegistrationCmdMocked(unit_tests.MockFunction): - def __call__(self): - return "subscription-manager register whatever-options" - - class CallRegistrationCmdMocked(unit_tests.MockFunction): - def __init__(self, cmd): - self.cmd = cmd - - def __call__(self, cmd): - self.cmd = cmd - return ("User interrupted process.", 0) - - class RunSubprocessMocked(unit_tests.MockFunction): - def __init__(self, tuples=None): - # you can specify sequence of return (object, return code) as - # a list of tuple that will be consumed continuosly on the each - # call; when the list is consumed or it is empty, the default - # tuple is returned - self.tuples = tuples - self.default_tuple = ("output", 0) - self.called = 0 - self.cmd = [] - def __call__(self, cmd, *args, **kwargs): - self.cmd = cmd - self.called += 1 +class RegistrationCmdFromTooloptsMocked(unit_tests.MockFunction): + def __init__(self): + self.tool_opts = None + self.called = 0 + + def __call__(self, tool_opts): + self.called += 1 + self.tool_opts = tool_opts + return RegistrationCmdCallMocked() - if self.tuples: - return self.tuples.pop(0) - return self.default_tuple +class TestSubscription(unittest.TestCase): class IsFileMocked(unit_tests.MockFunction): def __init__(self, is_file): self.is_file = is_file @@ -127,23 +160,6 @@ class PromptUserMocked(unit_tests.MockFunction): def __call__(self, *args, **kwargs): return True - class PromptUserLoopMocked(unit_tests.MockFunction): - def __init__(self): - self.called = {} - - def __call__(self, *args, **kwargs): - return_value = "" - - # args[0] is the current question being asked - if args[0] not in self.called: - self.called[args[0]] = 0 - - if self.called[args[0]] >= 1: - return_value = "test" - - self.called[args[0]] += 1 - return return_value - class RemoveFileMocked(unit_tests.MockFunction): def __init__(self, removed=True): self.removed = removed @@ -175,138 +191,6 @@ def __call__(self, command, args): def setUp(self): tool_opts.__init__() - def test_get_registration_cmd(self): - tool_opts.username = "user" - tool_opts.password = "pass with space" - expected = ["subscription-manager", "register", "--force", "--username=user", "--password=pass with space"] - self.assertEqual(subscription.get_registration_cmd(), expected) - - @unit_tests.mock(subscription, "get_avail_subs", GetOneSubMocked()) - @unit_tests.mock(utils, "let_user_choose_item", LetUserChooseItemMocked()) - @unit_tests.mock(utils, "run_subprocess", RunSubprocessMocked()) - def test_attach_subscription_one_sub_available(self): - self.assertEqual(subscription.attach_subscription(), True) - self.assertEqual(utils.let_user_choose_item.called, 0) - - def test_get_registration_cmd_activation_key(self): - tool_opts.activation_key = "activation_key" - tool_opts.org = "org" - expected = ["subscription-manager", "register", "--force", "--activationkey=activation_key", "--org=org"] - self.assertEqual(subscription.get_registration_cmd(), expected) - - @unit_tests.mock(utils, "prompt_user", PromptUserLoopMocked()) - def test_get_registration_cmd_activation_key_empty_string(self): - tool_opts.activation_key = "activation_key" - expected = ["subscription-manager", "register", "--force", "--activationkey=activation_key", "--org=test"] - self.assertEqual(subscription.get_registration_cmd(), expected) - self.assertEqual(utils.prompt_user.called, {"Organization: ": 2}) - - @unit_tests.mock(utils, "prompt_user", PromptUserLoopMocked()) - def test_get_registration_cmd_empty_string(self): - expected = ["subscription-manager", "register", "--force", "--username=test", "--password=test"] - self.assertEqual(subscription.get_registration_cmd(), expected) - self.assertEqual(utils.prompt_user.called, {"Username: ": 2, "Password: ": 2}) - - @unit_tests.mock(subscription, "get_avail_subs", GetOneSubMocked()) - @unit_tests.mock(utils, "let_user_choose_item", LetUserChooseItemMocked()) - @unit_tests.mock(utils, "run_subprocess", RunSubprocessMocked()) - def test_attach_subscription_one_sub_available(self): - self.assertEqual(subscription.attach_subscription(), True) - self.assertEqual(utils.let_user_choose_item.called, 0) - - @unit_tests.mock(subscription, "get_avail_subs", GetAvailSubsMocked()) - @unit_tests.mock(utils, "let_user_choose_item", LetUserChooseItemMocked()) - @unit_tests.mock(utils, "run_subprocess", RunSubprocessMocked()) - def test_attach_subscription_multiple_subs_available(self): - self.assertEqual(subscription.attach_subscription(), True) - self.assertEqual(utils.let_user_choose_item.called, 1) - - @unit_tests.mock(subscription, "get_avail_subs", GetAvailSubsMocked()) - @unit_tests.mock(utils, "let_user_choose_item", LetUserChooseItemMocked()) - @unit_tests.mock(utils, "run_subprocess", RunSubprocessMocked()) - @unit_tests.mock(tool_opts, "activation_key", "dummy_activate_key") - @unit_tests.mock(subscription, "loggerinst", GetLoggerMocked()) - def test_attach_subscription_available_with_activation_key(self): - self.assertEqual(subscription.attach_subscription(), True) - self.assertEqual(len(subscription.loggerinst.info_msgs), 1) - - @unit_tests.mock(subscription, "get_avail_subs", GetNoAvailSubsMocked()) - def test_attach_subscription_none_available(self): - self.assertEqual(subscription.attach_subscription(), False) - - @unit_tests.mock(subscription, "register_system", DumbCallable()) - @unit_tests.mock(subscription, "get_avail_subs", GetAvailSubsMocked()) - @unit_tests.mock(utils, "let_user_choose_item", LetUserChooseItemMocked()) - @unit_tests.mock(utils, "run_subprocess", RunSubprocessMocked()) - def test_subscribe_system(self): - tool_opts.username = "user" - tool_opts.password = "pass" - subscription.subscribe_system() - self.assertEqual(subscription.register_system.called, 1) - - @unit_tests.mock(subscription, "register_system", DumbCallable()) - @unit_tests.mock(subscription, "get_avail_subs", GetNoAvailSubsOnceMocked()) - @unit_tests.mock(utils, "let_user_choose_item", LetUserChooseItemMocked()) - @unit_tests.mock(utils, "run_subprocess", RunSubprocessMocked()) - def test_subscribe_system_fail_once(self): - tool_opts.username = "user" - tool_opts.password = "pass" - subscription.subscribe_system() - self.assertEqual(subscription.register_system.called, 2) - - @unit_tests.mock(subscription, "loggerinst", GetLoggerMocked()) - @unit_tests.mock(subscription, "MAX_NUM_OF_ATTEMPTS_TO_SUBSCRIBE", 1) - @unit_tests.mock(utils, "run_subprocess", RunSubprocessMocked([("nope", 1)])) - @unit_tests.mock(subscription, "sleep", mock.Mock()) - def test_register_system_fail_non_interactive(self): - # Check the critical severity is logged when the credentials are given - # on the cmdline but registration fails - tool_opts.username = "user" - tool_opts.password = "pass" - tool_opts.credentials_thru_cli = True - self.assertRaises(SystemExit, subscription.register_system) - self.assertEqual(len(subscription.loggerinst.critical_msgs), 1) - - @unit_tests.mock(utils, "run_subprocess", RunSubprocessMocked(tuples=[("nope", 1), ("nope", 2), ("Success", 0)])) - @unit_tests.mock(subscription.logging, "getLogger", GetLoggerMocked()) - @unit_tests.mock(subscription, "get_registration_cmd", GetRegistrationCmdMocked()) - @unit_tests.mock(subscription, "sleep", mock.Mock()) - def test_register_system_fail_interactive(self): - # Check the function tries to register multiple times without - # critical log. - tool_opts.credentials_thru_cli = False - subscription.register_system() - self.assertEqual(utils.run_subprocess.called, 3) - self.assertEqual(len(subscription.logging.getLogger.critical_msgs), 0) - - @unit_tests.mock(subscription, "get_registration_cmd", GetRegistrationCmdMocked()) - @unit_tests.mock(subscription, "call_registration_cmd", CallRegistrationCmdMocked("cmd")) - def test_register_system_fail_with_keyboardinterrupt(self): - self.assertRaises(KeyboardInterrupt, subscription.register_system) - - def test_hiding_password(self): - test_cmd = "subscription-manager register --force " '--username=jdoe --password="%s" --org=0123' - pswds_to_test = ["my favourite password", "\\)(*&^%f %##@^%&*&^(", " ", ""] - for pswd in pswds_to_test: - sanitized_cmd = subscription.hide_password(test_cmd % pswd) - self.assertEqual( - sanitized_cmd, "subscription-manager register --force " '--username=jdoe --password="*****" --org=0123' - ) - - def test_rhsm_serverurl(self): - tool_opts.username = "user" - tool_opts.password = "pass" - tool_opts.serverurl = "url" - expected = [ - "subscription-manager", - "register", - "--force", - "--username=user", - "--password=pass", - "--serverurl=url", - ] - self.assertEqual(subscription.get_registration_cmd(), expected) - @unit_tests.mock(subscription.logging, "getLogger", GetLoggerMocked()) def test_get_pool_id(self): # Check that we can distill the pool id from the subscription description @@ -487,6 +371,505 @@ def __call__(self, pkg, dest, reposdir=None): return self.to_return +@pytest.fixture +def tool_opts(global_tool_opts, monkeypatch): + monkeypatch.setattr(subscription, "tool_opts", global_tool_opts) + return global_tool_opts + + +class TestSubscribeSystem(object): + def test_subscribe_system(self, tool_opts, monkeypatch): + monkeypatch.setattr(subscription, "register_system", DumbCallable()) + monkeypatch.setattr(subscription, "get_avail_subs", GetAvailSubsMocked()) + monkeypatch.setattr(utils, "let_user_choose_item", LetUserChooseItemMocked()) + monkeypatch.setattr(utils, "run_subprocess", RunSubprocessMocked()) + + tool_opts.username = "user" + tool_opts.password = "pass" + + subscription.subscribe_system() + + assert subscription.register_system.called == 1 + + def test_subscribe_system_fail_once(self, tool_opts, monkeypatch): + monkeypatch.setattr(subscription, "register_system", DumbCallable()) + monkeypatch.setattr(subscription, "get_avail_subs", GetNoAvailSubsOnceMocked()) + monkeypatch.setattr(utils, "let_user_choose_item", LetUserChooseItemMocked()) + monkeypatch.setattr(utils, "run_subprocess", RunSubprocessMocked()) + + tool_opts.username = "user" + tool_opts.password = "pass" + + subscription.subscribe_system() + + assert subscription.register_system.called == 2 + + +@pytest.mark.usefixtures("tool_opts", scope="function") +class TestAttachSubscription(object): + def test_attach_subscription_one_sub_available(self, monkeypatch): + monkeypatch.setattr(subscription, "get_avail_subs", GetOneSubMocked()) + monkeypatch.setattr(utils, "let_user_choose_item", LetUserChooseItemMocked()) + monkeypatch.setattr(utils, "run_subprocess", RunSubprocessMocked()) + + assert subscription.attach_subscription() is True + assert utils.let_user_choose_item.called == 0 + + def test_attach_subscription_multiple_subs_available(self, monkeypatch): + monkeypatch.setattr(subscription, "get_avail_subs", GetAvailSubsMocked()) + monkeypatch.setattr(utils, "let_user_choose_item", LetUserChooseItemMocked()) + monkeypatch.setattr(utils, "run_subprocess", RunSubprocessMocked()) + + assert subscription.attach_subscription() is True + assert utils.let_user_choose_item.called == 1 + + def test_attach_subscription_available_with_activation_key(self, monkeypatch, caplog): + monkeypatch.setattr(subscription, "get_avail_subs", GetAvailSubsMocked()) + monkeypatch.setattr(utils, "let_user_choose_item", LetUserChooseItemMocked()) + monkeypatch.setattr(utils, "run_subprocess", RunSubprocessMocked()) + monkeypatch.setattr(toolopts.tool_opts, "activation_key", "dummy_activation_key") + + assert subscription.attach_subscription() is True + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "INFO" + + def test_attach_subscription_none_available(self, monkeypatch): + monkeypatch.setattr(subscription, "get_avail_subs", GetNoAvailSubsMocked()) + + assert subscription.attach_subscription() is False + + +class TestRegisterSystem(object): + def test_register_system_fail_non_interactive(self, tool_opts, monkeypatch, caplog): + """Check the critical severity is logged when the credentials are given on the cmdline but registration fails.""" + monkeypatch.setattr(subscription, "MAX_NUM_OF_ATTEMPTS_TO_SUBSCRIBE", 1) + monkeypatch.setattr(subscription, "sleep", mock.Mock()) + + fake_spawn = mock.Mock() + fake_spawn.before.decode = mock.Mock(return_value="nope") + fake_spawn.exitstatus = 1 + monkeypatch.setattr(utils, "PexpectSizedWindowSpawn", fake_spawn) + + tool_opts.username = "user" + tool_opts.password = "pass" + tool_opts.credentials_thru_cli = True + + with pytest.raises(SystemExit): + subscription.register_system() + + assert caplog.records[-1].levelname == "CRITICAL" + + def test_register_system_fail_interactive(self, tool_opts, monkeypatch, caplog): + """Check the function tries to register multiple times without critical log.""" + tool_opts.credentials_thru_cli = False + monkeypatch.setattr(subscription, "sleep", mock.Mock()) + + fake_from_tool_opts = mock.Mock( + return_value=subscription.RegistrationCommand(username="invalid", password="invalid") + ) + monkeypatch.setattr(subscription.RegistrationCommand, "from_tool_opts", fake_from_tool_opts) + + # We may want to move this to the toplevel if we have other needs to + # test pexpect driven code. If we do so, though, we would want to + # make it a bit more generic: + # * Be able to set iterations before success + # * Allow setting both exitstatus and signalstatus + # * Allow setting stdout output + # * Probably make the output and status settable per invocation rather + # than using a count + class FakeProcess(mock.Mock): + called_count = 0 + + @property + def exitstatus(self): + self.called_count += 1 + return self.called_count % 3 + + fake_process = FakeProcess() + fake_process.before.decode = mock.Mock(side_effect=("nope", "nope", "Success")) + fake_spawn = mock.Mock(return_value=fake_process) + monkeypatch.setattr(utils, "PexpectSizedWindowSpawn", fake_spawn) + + subscription.register_system() + + assert len(fake_spawn.call_args_list) == 3 + assert "CRITICAL" not in [rec.levelname for rec in caplog.records] + + def test_register_system_fail_with_keyboardinterrupt(self, monkeypatch): + monkeypatch.setattr(subscription.RegistrationCommand, "from_tool_opts", RegistrationCmdFromTooloptsMocked()) + + with pytest.raises(KeyboardInterrupt) as err: + subscription.register_system() + + +class TestRegistrationCommand(object): + @pytest.mark.parametrize( + "registration_kwargs", + ( + { + "server_url": "http://localhost/", + "activation_key": "0xDEADBEEF", + "org": "Local Organization", + }, + { + "server_url": "http://localhost/", + "org": "Local Organization", + "username": "me_myself_and_i", + "password": "a password", + }, + { + "username": "me_myself_and_i", + "password": "a password", + }, + ), + ) + def test_instantiate_via_init(self, registration_kwargs): + """Test all valid combinations of args to RegistratoinCommand.__init__().""" + reg_cmd = subscription.RegistrationCommand(**registration_kwargs) + assert reg_cmd.cmd == "subscription-manager" + + if "server_url" in registration_kwargs: + assert reg_cmd.server_url == registration_kwargs["server_url"] + + if "activation_key" in registration_kwargs: + assert reg_cmd.activation_key == registration_kwargs["activation_key"] + + if "org" in registration_kwargs: + assert reg_cmd.org == registration_kwargs["org"] + + if "password" in registration_kwargs: + assert reg_cmd.password == registration_kwargs["password"] + assert reg_cmd.username == registration_kwargs["username"] + + assert reg_cmd.activation_key or reg_cmd.username + + @pytest.mark.parametrize( + "registration_kwargs, error_message", + ( + # No credentials specified + ( + { + "server_url": "http://localhost/", + "org": "Local Organization", + }, + "activation_key and org or username and password must be specified", + ), + # Activation key without an org + ( + { + "server_url": "http://localhost/", + "activation_key": "0xDEADBEEF", + }, + "org must be specified if activation_key is used", + ), + # Username without a password + ( + { + "server_url": "http://localhost/", + "username": "me_myself_and_i", + }, + "username and password must be used together", + ), + # Password without a username + ( + { + "server_url": "http://localhost/", + "password": "a password", + }, + "username and password must be used together", + ), + ), + ) + def test_instantiate_failures(self, registration_kwargs, error_message): + """Test various failures instantiating RegistrationCommand.""" + with pytest.raises(ValueError, match=error_message): + cmd = subscription.RegistrationCommand(**registration_kwargs) + + @pytest.mark.parametrize( + "registration_kwargs", + ( + { + "server_url": "http://localhost/", + "activation_key": "0xDEADBEEF", + "org": "Local Organization", + }, + { + "server_url": "http://localhost/", + "org": "Local Organization", + "username": "me_myself_and_i", + "password": "a password", + }, + { + "username": "me_myself_and_i", + "password": "a password", + }, + ), + ) + def test_from_tool_opts_all_data_on_cli(self, registration_kwargs, tool_opts): + """Test that the RegistrationCommand is created from toolopts successfully.""" + if "server_url" in registration_kwargs: + tool_opts.serverurl = registration_kwargs["server_url"] + + if "org" in registration_kwargs: + tool_opts.org = registration_kwargs["org"] + + if "activation_key" in registration_kwargs: + tool_opts.activation_key = registration_kwargs["activation_key"] + + if "username" in registration_kwargs: + tool_opts.username = registration_kwargs["username"] + + if "password" in registration_kwargs: + tool_opts.password = registration_kwargs["password"] + + reg_cmd = subscription.RegistrationCommand.from_tool_opts(tool_opts) + + assert reg_cmd.cmd == "subscription-manager" + + if "server_url" in registration_kwargs: + assert reg_cmd.server_url == registration_kwargs["server_url"] + + if "org" in registration_kwargs: + assert reg_cmd.org == registration_kwargs["org"] + + if "activation_key" in registration_kwargs: + assert reg_cmd.activation_key == registration_kwargs["activation_key"] + + if "username" in registration_kwargs: + assert reg_cmd.username == registration_kwargs["username"] + + if "password" in registration_kwargs: + assert reg_cmd.password == registration_kwargs["password"] + + @pytest.mark.parametrize( + "registration_kwargs, prompt_input", + ( + # activation_key and not org + ( + {"activation_key": "0xDEADBEEF"}, + {"Organization: ": "Local Organization"}, + ), + # no activation_key no password + ( + {"username": "me_myself_and_i"}, + {"Password: ": "a password"}, + ), + # no activation_key no username + ( + {"password": "a password"}, + {"Username: ": "me_myself_and_i"}, + ), + # no credentials at all + ( + {}, + {"Username: ": "me_myself_and_i", "Password: ": "a password"}, + ), + ), + ) + def test_from_tool_opts_interactive_data(self, registration_kwargs, prompt_input, tool_opts, monkeypatch): + """Test that things work when we interactively ask for more data.""" + + def prompt_user(prompt, password=False): + if prompt in prompt_input: + return prompt_input[prompt] + raise Exception("Should not have been called with that prompt for the input") + + fake_prompt_user = mock.Mock(side_effect=prompt_user) + + monkeypatch.setattr(utils, "prompt_user", fake_prompt_user) + + for option_name, option_value in registration_kwargs.items(): + setattr(tool_opts, option_name, option_value) + + registration_cmd = subscription.RegistrationCommand.from_tool_opts(tool_opts) + + if "Organization: " in prompt_input: + assert registration_cmd.org == prompt_input["Organization: "] + + if "Password: " in prompt_input: + assert registration_cmd.password == prompt_input["Password: "] + + if "Username: " in prompt_input: + assert registration_cmd.username == prompt_input["Username: "] + + # assert that we prompted the user the number of times that we expected + assert fake_prompt_user.call_count == len(prompt_input) + + def test_from_tool_opts_activation_key_empty_string(self, tool_opts, monkeypatch): + monkeypatch.setattr(utils, "prompt_user", PromptUserLoopMocked()) + tool_opts.activation_key = "activation_key" + + registration_cmd = subscription.RegistrationCommand.from_tool_opts(tool_opts) + + assert registration_cmd.activation_key == "activation_key" + assert registration_cmd.org == "test" + assert utils.prompt_user.called == {"Organization: ": 2} + + def test_from_tool_opts_username_empty_string(self, tool_opts, monkeypatch): + monkeypatch.setattr(utils, "prompt_user", PromptUserLoopMocked()) + + registration_cmd = subscription.RegistrationCommand.from_tool_opts(tool_opts) + + assert registration_cmd.username == "test" + assert registration_cmd.password == "test" + assert utils.prompt_user.called == {"Username: ": 2, "Password: ": 2} + + @pytest.mark.parametrize( + "registration_kwargs", + ( + { + "server_url": "http://localhost/", + "activation_key": "0xDEADBEEF", + "org": "Local Organization", + }, + { + "server_url": "http://localhost/", + "org": "Local Organization", + "username": "me_myself_and_i", + "password": "a password", + }, + { + "username": "me_myself_and_i", + "password": "a password", + }, + ), + ) + def test_args(self, registration_kwargs): + """Test that the argument list is generated correctly.""" + reg_cmd = subscription.RegistrationCommand(**registration_kwargs) + + args_list = reg_cmd.args + + # Assert that these are always added + assert args_list[0] == "register" + assert "--force" in args_list + + # Assert that password was not added to the args_list + assert len([arg for arg in args_list if "password" in arg]) == 0 + + # Assert the other args were added + if "server_url" in registration_kwargs: + assert "--serverurl={server_url}".format(**registration_kwargs) in args_list + + if "activation_key" in registration_kwargs: + assert "--activationkey={activation_key}".format(**registration_kwargs) in args_list + + if "org" in registration_kwargs: + assert "--org={org}".format(**registration_kwargs) in args_list + + if "username" in registration_kwargs: + assert "--username={username}".format(**registration_kwargs) in args_list + + expected_length = len(registration_kwargs) + 2 + if "password" in registration_kwargs: + expected_length -= 1 + + assert len(args_list) == expected_length + + def test_calling_registration_command_activation_key(self, monkeypatch): + monkeypatch.setattr(utils, "run_subprocess", mock.Mock(return_value=("", 0))) + monkeypatch.setattr(utils, "run_cmd_in_pty", mock.Mock(return_value=("", 0))) + + reg_cmd = subscription.RegistrationCommand(activation_key="0xDEADBEEF", org="Local Organization") + assert reg_cmd() == ("", 0) + + utils.run_subprocess.assert_called_once_with( + ["subscription-manager", "register", "--force", "--activationkey=0xDEADBEEF", "--org=Local Organization"], + print_cmd=False, + ) + assert utils.run_cmd_in_pty.call_count == 0 + + def test_calling_registration_command_password(self, monkeypatch): + monkeypatch.setattr(utils, "run_subprocess", mock.Mock(return_value=("", 0))) + monkeypatch.setattr(utils, "run_cmd_in_pty", mock.Mock(return_value=("", 0))) + + reg_cmd = subscription.RegistrationCommand(username="me_myself_and_i", password="a password") + reg_cmd() + + utils.run_cmd_in_pty.assert_called_once_with( + ["subscription-manager", "register", "--force", "--username=me_myself_and_i"], + expect_script=(("[Pp]assword: ", "a password\n"),), + print_cmd=False, + ) + assert utils.run_cmd_in_pty.call_count == 1 + + assert utils.run_subprocess.call_count == 0 + + +@pytest.mark.parametrize( + ("secret",), + ( + ("my favourite password",), + ("\\)(*&^%f %##@^%&*&^(",), + (" ",), + ("",), + ), +) +def test_hide_secrets(secret): + test_cmd = [ + "register", + "--force", + "--username=jdoe", + "--password", + secret, + "--org=0123", + "--activationkey=%s" % secret, + ] + sanitized_cmd = subscription.hide_secrets(test_cmd) + assert sanitized_cmd == [ + "register", + "--force", + "--username=jdoe", + "--password", + "*****", + "--org=0123", + "--activationkey=*****", + ] + + +def test_hide_secrets_no_secrets(): + """Test that a list with no secrets to hide is not modified.""" + test_cmd = [ + "register", + "--force", + "--username=jdoe", + "--org=0123", + ] + sanitized_cmd = subscription.hide_secrets(test_cmd) + assert sanitized_cmd == [ + "register", + "--force", + "--username=jdoe", + "--org=0123", + ] + + +def test_hide_secret_unexpected_input(caplog): + test_cmd = [ + "register", + "--force", + "--password=SECRETS", + "--username=jdoe", + "--org=0123", + "--activationkey", + # This is missing the activationkey as the second argument + ] + + sanitized_cmd = subscription.hide_secrets(test_cmd) + + assert sanitized_cmd == [ + "register", + "--force", + "--password=*****", + "--username=jdoe", + "--org=0123", + "--activationkey", + ] + assert len(caplog.records) == 1 + assert caplog.records[-1].levelname == "FILE" + assert "Passed arguments had unexpected secret argument," " '--activationkey', without a secret" in caplog.text + + class DownloadRHSMPkgsMocked(unit_tests.MockFunction): def __init__(self): self.called = 0 @@ -554,7 +937,7 @@ def test_download_rhsm_pkgs(version, pkgs_to_download, monkeypatch): assert frozenset(subscription._download_rhsm_pkgs.pkgs_to_download) == pkgs_to_download -class TestUnregisteringSystem: +class TestUnregisteringSystem(object): @pytest.mark.parametrize( ("output", "ret_code", "expected"), (("", 0, "System unregistered successfully."), ("Failed to unregister.", 1, "System unregistration failed")), @@ -583,25 +966,30 @@ def test_unregister_system(self, output, ret_code, expected, monkeypatch, caplog assert expected in caplog.records[-1].message def test_unregister_system_submgr_not_found(self, monkeypatch, caplog): - rpm_command = "rpm --quiet -q subscription-manager" + rpm_command = ["rpm", "--quiet", "-q", "subscription-manager"] + run_subprocess_mock = mock.Mock( side_effect=unit_tests.run_subprocess_side_effect( - ((rpm_command,), ("", 1)), + (rpm_command, ("", 1)), ) ) monkeypatch.setattr(utils, "run_subprocess", value=run_subprocess_mock) subscription.unregister_system() assert "The subscription-manager package is not installed." in caplog.records[-1].message - def test_unregister_system_keep_rhsm(self, monkeypatch, caplog): - monkeypatch.setattr(tool_opts, "keep_rhsm", value=True) + def test_unregister_system_keep_rhsm(self, monkeypatch, caplog, tool_opts): + tool_opts.keep_rhsm = True + subscription.unregister_system() + assert "Skipping due to the use of --keep-rhsm." in caplog.records[-1].message - @mock.patch("convert2rhel.toolopts.tool_opts.keep_rhsm", True) - def test_unregister_system_skipped(self, monkeypatch, caplog): + def test_unregister_system_skipped(self, monkeypatch, caplog, tool_opts): + tool_opts.keep_rhsm = True monkeypatch.setattr(pkghandler, "get_installed_pkg_objects", mock.Mock()) + subscription.unregister_system() + assert "Skipping due to the use of --keep-rhsm." in caplog.text pkghandler.get_installed_pkg_objects.assert_not_called() @@ -632,7 +1020,7 @@ def test_download_rhsm_pkgs_skipped(monkeypatch, caplog): ) def test_verify_rhsm_installed(submgr_installed, keep_rhsm, critical_string, monkeypatch, caplog): if keep_rhsm: - monkeypatch.setattr(tool_opts, "keep_rhsm", keep_rhsm) + monkeypatch.setattr(toolopts.tool_opts, "keep_rhsm", keep_rhsm) if submgr_installed: monkeypatch.setattr( diff --git a/convert2rhel/unit_tests/systeminfo_test.py b/convert2rhel/unit_tests/systeminfo_test.py index 74dc89991e..0d27e7d344 100644 --- a/convert2rhel/unit_tests/systeminfo_test.py +++ b/convert2rhel/unit_tests/systeminfo_test.py @@ -161,6 +161,8 @@ def test_modified_rpm_files_diff_with_differences_after_conversion(self): @unit_tests.mock(logger, "LOG_DIR", unit_tests.TMP_DIR) @unit_tests.mock(utils, "run_subprocess", RunSubprocessMocked(("rpmva\n", 0))) def test_generate_rpm_va(self): + # TODO: move class from unittest to pytest and use global tool_opts fixture + tool_opts.no_rpm_va = False # Check that rpm -Va is executed (default) and stored into the specific file. system_info.generate_rpm_va() diff --git a/convert2rhel/unit_tests/utils_test.py b/convert2rhel/unit_tests/utils_test.py index b9a86b9e05..ab2b7fa8ed 100644 --- a/convert2rhel/unit_tests/utils_test.py +++ b/convert2rhel/unit_tests/utils_test.py @@ -15,6 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import getpass +import logging import os import sys import unittest @@ -262,6 +263,67 @@ def test_is_rpm_based_os(self): assert is_rpm_based_os() in (True, False) +@pytest.mark.parametrize( + "command, expected_output, expected_code", + ( + (["echo", "foobar"], "foobar", 0), + (["sh", "-c", "exit 56"], "", 56), + ), +) +def test_run_cmd_in_pty_simple(command, expected_output, expected_code, monkeypatch): + output, code = utils.run_cmd_in_pty(command) + assert output.strip() == expected_output + assert code == expected_code + + +def test_run_cmd_in_pty_expect_script(): + if sys.version_info < (3,): + prompt_cmd = "raw_input" + else: + prompt_cmd = "input" + output, code = utils.run_cmd_in_pty( + [sys.executable, "-c", 'print(%s("Ask for password: "))' % prompt_cmd], + expect_script=(("password: *", "Foo bar\n"),), + ) + + assert output.strip().splitlines()[-1] == "Foo bar" + assert code == 0 + + +@pytest.mark.parametrize( + "print_cmd, print_output", + ( + (True, True), + (True, False), + (False, True), + (False, False), + ), +) +def test_run_cmd_in_pty_quiet_options(print_cmd, print_output, global_tool_opts, caplog): + global_tool_opts.debug = True + caplog.set_level(logging.DEBUG) + + output, code = utils.run_cmd_in_pty(["echo", "foo bar"], print_cmd=print_cmd, print_output=print_output) + + expected_count = 1 # There will always be one debug log stating the pty columns + if print_cmd: + assert caplog.records[0].levelname == "DEBUG" + assert caplog.records[0].message.strip() == "Calling command 'echo foo bar'" + expected_count += 1 + + if print_output: + assert caplog.records[-1].levelname == "INFO" + assert caplog.records[-1].message.strip() == "foo bar" + expected_count += 1 + + assert len(caplog.records) == expected_count + + +def test_run_cmd_in_pty_check_for_deprecated_string(): + with pytest.raises(TypeError, match="cmd should be a list, not a str"): + utils.run_cmd_in_pty("echo foobar") + + def test_get_package_name_from_rpm(monkeypatch): monkeypatch.setattr(utils, "rpm", get_rpm_mocked()) monkeypatch.setattr(utils, "get_rpm_header", lambda _: get_rpm_header_mocked()) diff --git a/convert2rhel/utils.py b/convert2rhel/utils.py index e0d43a3a70..fe0ee16720 100644 --- a/convert2rhel/utils.py +++ b/convert2rhel/utils.py @@ -153,15 +153,22 @@ def run_subprocess(cmd, print_cmd=True, print_output=True): return output, return_code -def run_cmd_in_pty(cmd, print_cmd=True, print_output=True, columns=120): +def run_cmd_in_pty(cmd, expect_script=(), print_cmd=True, print_output=True, columns=120): """Similar to run_subprocess(), but the command is executed in a pseudo-terminal. The pseudo-terminal can be useful when a command prints out a different output with or without an active terminal session. E.g. yumdownloader does not print the name of the downloaded rpm if not executed from a terminal. Switching off printing the command can be useful in case it contains a password in plain text. - :param cmd: The command to execute, including the options, e.g. "ls -al" - :type cmd: string + :param cmd: The command to execute, including the options as a list, e.g. ["ls", "-al"] + :type cmd: list + :param expect_script: An iterable of pairs of expected strings and response strings. By giving + these pairs, interactive programs can be scripted. Example: + run_cmd_in_pty(['sudo', 'whoami'], [('password: ', 'sudo_password\n')]) + Note1: The caller is responsible for adding newlines to the response strings where + needed. Note2: This function will await pexpect.EOF after all of the pairs in expect_script + have been exhausted. + :type expect_script: iterable of 2-tuples or strings: :param print_cmd: Log the command (to both logfile and stdout) :type print_cmd: bool :param print_output: Log the combined stdout and stderr of the executed command (to both logfile and stdout) @@ -180,28 +187,59 @@ def run_cmd_in_pty(cmd, print_cmd=True, print_output=True, columns=120): if print_cmd: loggerinst.debug("Calling command '%s'" % " ".join(cmd)) - class PexpectSizedWindowSpawn(pexpect.spawn): - # https://github.com/pexpect/pexpect/issues/134 - def setwinsize(self, rows, cols): - super(PexpectSizedWindowSpawn, self).setwinsize(0, columns) - - process = PexpectSizedWindowSpawn(cmd[0], cmd[1:], env={"LC_ALL": "C"}, timeout=None) - - # The setting of window size is super unreliable - process.setwinsize(0, columns) + process = PexpectSizedWindowSpawn(cmd[0], cmd[1:], env={"LC_ALL": "C", "LANG": "C"}, timeout=None) + # Needed on RHEL-8+ (see comments near PexpectSizedWindowSpawn definition) + process.setwinsize(1, columns) loggerinst.debug("Pseudo-PTY columns set to: %s" % (process.getwinsize(),)) + for expect, send in expect_script: + process.expect(expect) + process.send(send) + process.expect(pexpect.EOF) + try: + process.wait() + except pexpect.ExceptionPexpect: + # RHEL 7's pexpect throws an exception if the process has already exited + # We're just waiting to be sure that the process has finished so we can + # ignore the exception. + pass + + # Per the pexpect API, this is necessary in order to get the return code + process.close() + return_code = process.exitstatus + output = process.before.decode() if print_output: loggerinst.info(output.rstrip("\n")) - process.close() # Per the pexpect API, this is necessary in order to get the return code - return_code = process.exitstatus - return output, return_code +# For pexpect released prior to 2015 (RHEL7's pexpect-2.3), +# spawn.__init__() hardcodes a call to setwinsize(24, 80) to set the +# initial terminal size. There is no official way to set the terminal size +# to a custom value before the process starts. This can cause an issue with +# truncated lines for processes which read the terminal size when they +# start and never refresh that value (like yumdownloader) +# +# overriding setwinsize to set the columns to the size we want in this +# subclass is a kludge for the issue. On pexpect-2.3, it fixes the issue +# because of the setwinsize call in __init__() at the cost of never being +# able to change the column size later. On later pexpect (RHEL-8 has +# pexpect-4.3), this doesn't fix the issue of the terminal size being small +# when the subprocess starts but dnf download checks the terminal's size +# just before it prints the statusline we care about. So setting the +# terminal size via setwinsize() after the process is created works (note: +# there is a race condition there but it's unlikely to ever trigger as it +# would require downloading a package to happen quicker than the time +# between calling spawn.__init__() and spawn.setwinsize()) +class PexpectSizedWindowSpawn(pexpect.spawn): + # https://github.com/pexpect/pexpect/issues/134 + def setwinsize(self, rows, cols): + super(PexpectSizedWindowSpawn, self).setwinsize(rows, 120) + + def let_user_choose_item(num_of_options, item_to_choose): """Ask user to enter a number corresponding to the item they choose.""" while True: # Loop until user enters a valid number diff --git a/plans/tier0.fmf b/plans/tier0.fmf index bcf8e9cf6c..d1e3e5b727 100644 --- a/plans/tier0.fmf +++ b/plans/tier0.fmf @@ -5,6 +5,10 @@ discover+: discover+: test: basic-sanity-checks +/check_cve: + discover+: + test: check-cve-2022-1662 + /check_user_response: discover+: test: check-user-response diff --git a/tests/ansible_collections/roles/install-testing-deps/tasks/main.yml b/tests/ansible_collections/roles/install-testing-deps/tasks/main.yml index cefdd951a7..96e7fe6237 100644 --- a/tests/ansible_collections/roles/install-testing-deps/tasks/main.yml +++ b/tests/ansible_collections/roles/install-testing-deps/tasks/main.yml @@ -1,7 +1,11 @@ --- - name: Ensure python3 yum: - name: python3 + # gcc and python3-devel are needed for psutil + name: + - "python3" + - "gcc" + - "python3-devel" state: present - name: Install pip if not present @@ -11,6 +15,14 @@ - name: Install pytest framework dependencies pip: - name: ["pytest", "pytest-cov", "envparse", "click", "pexpect", "dataclasses", "jsonschema"] + name: + - "pytest" + - "pytest-cov" + - "envparse" + - "click" + - "pexpect" + - "dataclasses" + - "jsonschema" + - "psutil" # Use pip3 in case pip was installed via rpm package on this system executable: pip3 diff --git a/tests/integration/tier0/check-cve-2022-1662/main.fmf b/tests/integration/tier0/check-cve-2022-1662/main.fmf new file mode 100644 index 0000000000..7da180d8a2 --- /dev/null +++ b/tests/integration/tier0/check-cve-2022-1662/main.fmf @@ -0,0 +1,6 @@ +summary: check cve-2022-1662 is fixed + +tier: 0 + +test: | + pytest -svv diff --git a/tests/integration/tier0/check-cve-2022-1662/test_cve_fixes.py b/tests/integration/tier0/check-cve-2022-1662/test_cve_fixes.py new file mode 100644 index 0000000000..3072246154 --- /dev/null +++ b/tests/integration/tier0/check-cve-2022-1662/test_cve_fixes.py @@ -0,0 +1,29 @@ +from multiprocessing import Pool + +import psutil + + +def watchdog(): + while True: + for process in psutil.process_iter(): + # For some reason the psutil catches subscription-manager in process.name() + # as 'subscription-ma', thus using 'subscription' to catch it + if "subscription" in process.name(): + return process.cmdline() + + +def test_passing_password_to_submrg(convert2rhel): + username = "testname" + password = "EXAMPLE&hTYGHKPvU7Ewd" + with convert2rhel(f"-y --no-rpm-va -u {username} -p {password}") as c2r: + # Just to be sure, try to run through all three tries + # of the registration process in case the race condition applies + for subscription_try in range(2): + c2r.expect("Registering the system using subscription-manager ...") + # Run watchdog function using multiprocessing pool + # as soon as Convert2RHEL tries to call subscription-manager + with Pool(processes=1) as pool: + watcher = pool.apply_async(watchdog, ()) + # Check for the password not being passed to the subscription-manager + print(watcher.get()) + assert not [cmdline for cmdline in watcher.get() if password in cmdline] diff --git a/tests/integration/tier0/check-user-response/test_user_response.py b/tests/integration/tier0/check-user-response/test_user_response.py index ad468dea0c..2075edcf97 100644 --- a/tests/integration/tier0/check-user-response/test_user_response.py +++ b/tests/integration/tier0/check-user-response/test_user_response.py @@ -41,7 +41,7 @@ def test_check_user_response_organization(convert2rhel): env.str("RHSM_KEY"), ) ) as c2r: - c2r.expect_exact(" ... activation key detected: ") + c2r.expect_exact("activation key detected") c2r.expect_exact("Organization: ") c2r.sendline() assert c2r.expect_exact(["Organization", "Registering the system"]) == 0