Skip to content

Commit

Permalink
Do not put the subscription-manager password onto the command line. (#…
Browse files Browse the repository at this point in the history
…492)

* Do not put the subscription-manager password onto the command line.

Fixes CVE-2022-0852

Passing values on the command line is insecure.  With this change,
the rhsm password is passed interactively to subscription-manager
instead of being passed on the commandline when we shell out to it.

The structure of this change deserves a bit of description.  Previously,
we called one function to assemble all of the information needed to
invoke subscription-manager and then returned that as a string that
could be run as a command line.  We called a second function with that
string to actually run the command.

To send the password interactively, we need to stop adding the password
to the string of command line arguments but it still makes sense to keep
the code that figures out the password together with the code which
finds the other command line args.  So it makes sense to keep a single
function to do that but return the password and other args separately.

We could use a dict, a class, or a tuple as the return value from the
function.  That doesn't feel too ugly.  But then we need to pass that
structure into the function which takes care of invoking
subscription-manager on the command line and that *does* feel ugly.
That function would have to care about the structure of the data we pass
in (If a tuple, what is the order?  If a dict, what are the field
names?, etc).  To take care of this, we can make the data structure that
we return from assembling the data a class and the function which calls
subscription-manager a method of that class because it's quite natural
for a method to have knowledge of what attributes the class contains.

Hmm... but now that we have a class with behaviours (methods), it starts
to feel like we could do some more things.  A function that fills in the
values of a class, validates that the data is proper, and then returns
an instance of that class is really a constructor, right?  So it makes
sense to move the function which assembles the data and returns the
class a constructor.  But that particular function isn't very generic:
it uses knowledge of our global toolopts.tool_opts to populate the
class.  So let's write a simple __init__() that takes all of the values
needed as separate parameters and then create an alternative constructor
(an @classmethod which returns an instance of the class) which gets the
data from a ToolOpt, and then calls __init__() with those values and
returns the resulting class.

Okay, things are much cleaner now, but there's one more thing that we
can do.  We now have a class that has a constructor to read in data and
a single method that we can call to return some results.  What do we
call an object that can be called to return a result?  A function or
more generically, in python, a Callable.  We can turn this class into a
callable by renaming the method which actually invokes
subscription-manager __call__().

What we have at the end of all this is a way to create a function which
knows about the settings in tool_opts which we can then call to perform
our subscription-manager needs::

    registration_command = RegistrationCommand.from_tool_opts()
    return_code = registration_command()

OAMG-6551 #done  convert2rhel now passes the rhsm password to subscription-manager securely.

* Modify the hiding of secret to hide both --password SECRET and
  --password=SECRET.  Currently we only use it with passwords that we
  are passing in the second form (to subscription-manager) but catching
  both will future proof us in case we use this function for more things
  in the future. (Eric Gustavsson)
  * Note: using generator expressions was tried here but found that they
    only bind the variable being iterated over at definition time, the
    rest of the variables are bound when the generator is used.  This
    means that constructing a set of generators in a loop doesn't really
    work as the loop variables that you use in the generator will have a
    different value by the time you're done.

    So a nested for loop and if-else is the way to implement this.

* Add global_tool_opts fixture to conftest.py which monkeypatches a
  fresh ToolOpts into convert2rhel.toolopts.tool_opts.  That way tests
  can modify that without worrying about polluting other tests.

  * How toolopts is imported in the code makes a difference whether this
    fixture is sufficient or if you need to do a little more work. If
    the import is::

      from convert2rhel import toolopts
      do_something(toolopts.tool_opts.username)

   then your test can just do::

      def test_thing_that_uses_toolopts(global_tool_opts):
          global_tool_opts.username = 'badger'

   Most of our code, though, imports like this::

      # In subscription.py, for instance
      from convert2rhel.toolopts import tool_opts
      do_something(tool_opts.username)

   so the tests should do something like this::

      def test_toolopts_differently(global_test_opts, monkeypatch):
          monkeypatch.setattr(subscription, 'tool_opts', global_tool_opts)

* Sometimes a process will close stdout before it is done processing.
  When that happens, we need to wait() for the process to end before
  closing the pty.  If we don't wait, the process will receive a HUP
  signal which may end it before it is done.
  * But on RHEL7, pexpect.wait() will throw an exception if you wait on
    an already finished process.  So we need to ignore that exception
    there.

lgtm is flagging some cases where it thinks we are logging sensitive
data.  Here's why we're ignoring lgtm:

* One case logs the username to the terminal as a hint for the user as
  to what account is being used.  We consider username to not be
  sensitive.
* One case logs the subscription-manager invocation.  We run the command
  line through hide_secrets() to remove private information when we do
  this.
* The last case logs which argument was passed to subscription-manager
  without a secret attached to it.  ie: "--password" is passed to
  subscription-manager without a password being added afterwards.  In
  this case, the string "--password" will be logged.

Testing farm doesn't have python-devel installed.
We need that to install psutil needed as a testing dependency.

Co-authored-by: Daniel Diblik <[email protected]>
  • Loading branch information
abadger and danmyway authored May 27, 2022
1 parent 8658831 commit 8d72fb0
Show file tree
Hide file tree
Showing 13 changed files with 983 additions and 287 deletions.
4 changes: 4 additions & 0 deletions .gitleaks.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[allowlist]
paths = [
'''tests/integration/tier0/check-cve/test_cve_fixes.py''',
]
246 changes: 195 additions & 51 deletions convert2rhel/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,21 @@ def register_system():

# Loop the registration process until successful registration
attempt = 0
while True and attempt < MAX_NUM_OF_ATTEMPTS_TO_SUBSCRIBE:
registration_cmd = get_registration_cmd()

while attempt < MAX_NUM_OF_ATTEMPTS_TO_SUBSCRIBE:
registration_cmd = RegistrationCommand.from_tool_opts(tool_opts)
attempt_msg = ""
if attempt > 0:
attempt_msg = "Attempt %d of %d: " % (attempt + 1, MAX_NUM_OF_ATTEMPTS_TO_SUBSCRIBE)
loggerinst.info("%sRegistering the system using subscription-manager ...", attempt_msg)

output, ret_code = call_registration_cmd(registration_cmd)
output, ret_code = registration_cmd()
if ret_code == 0:
# Handling a signal interrupt that was previously handled by
# subscription-manager.
if "user interrupted process" in output.lower():
raise KeyboardInterrupt
return

loggerinst.info("System registration failed with return code = %s" % str(ret_code))
if tool_opts.credentials_thru_cli:
loggerinst.warning(
Expand All @@ -138,65 +138,209 @@ def register_system():
loggerinst.critical("Unable to register the system through subscription-manager.")


def get_registration_cmd():
"""Build a command for subscription-manager for registering the system."""
loggerinst.info("Building subscription-manager command ... ")
registration_cmd = ["subscription-manager", "register", "--force"]

loggerinst.info("Checking for activation key ...")
if tool_opts.activation_key:
# Activation key has been passed
# -> username/password not required
# -> organization required
loggerinst.info(" ... activation key detected: %s" % tool_opts.activation_key)

# TODO: Parse the output of 'subscription-manager orgs' and let the
# user choose from the available organizations. If there's just one,
# pick it automatically.
# Organization is required when activation key is used
class RegistrationCommand(object):
def __init__(self, activation_key=None, org=None, username=None, password=None, server_url=None):
"""
A callable that can register a system with subscription-manager.
:kwarg server_url: Optional URL to the subscription-manager backend.
Useful when the customer has an on-prem subscription-manager instance.
:kwarg activation_key: subscription-manager activation_key that can be
used to register the system. Org must be specified if this was given.
:kwarg org: The organization that the activation_key is associated with.
It is required if activation_key is specified.
:kwarg username: Username to authenticate with subscription-manager.
Required if password is specified.
:kwarg password: Password to authenticate with subscription-manager.
It is required if username is specified.
.. note:: Either activation_key and org or username and password must
be specified.
"""
self.cmd = "subscription-manager"
self.server_url = server_url

if activation_key and not org:
raise ValueError("org must be specified if activation_key is used")

self.activation_key = activation_key
self.org = org

self.password = password
self.username = username

if (password and not username) or (username and not password):
raise ValueError("username and password must be used together")

elif not password:
# No password set
if not self.activation_key:
raise ValueError("activation_key and org or username and password must be specified")

@classmethod
def from_tool_opts(cls, tool_opts):
"""
Alternate constructor that gets subscription-manager args from ToolOpts.
convert2rhel's command-line contains the information needed to register
with subscription-manager. Get the information from the passed in
ToolOpts structure to create the RegistrationCommand.
:arg tool_opts: The :class:`convert2rhel.toolopts.ToolOpts` structure to
retrieve the subscription-manager information from.
"""
loggerinst.info("Gathering subscription-manager registration info ... ")

registration_attributes = {}
if tool_opts.org:
loggerinst.info(" ... org detected")

org = tool_opts.org
while not org:
org = utils.prompt_user("Organization: ")

registration_cmd.extend(("--activationkey=%s" % tool_opts.activation_key, "--org=%s" % org))
else:
loggerinst.info(" ... activation key not found, username and password required")
loggerinst.info(" ... organization detected")
registration_attributes["org"] = tool_opts.org

if tool_opts.activation_key:
# Activation key has been passed
# -> username/password not required
# -> organization required
loggerinst.info(" ... activation key detected")
registration_attributes["activation_key"] = tool_opts.activation_key

while "org" not in registration_attributes:
loggerinst.info(" ... activation key requires organization")
# Organization is required when activation key is used
# TODO: Parse the output of 'subscription-manager orgs' and let the
# user choose from the available organizations. If there's just one,
# pick it automatically.
org = utils.prompt_user("Organization: ").strip()
# In case the user entered the empty string
if org:
registration_attributes["org"] = org
else:
# No activation key -> username/password required
if tool_opts.username and tool_opts.password:
loggerinst.info(" ... activation key not found, using given username and password")
else:
loggerinst.info(" ... activation key not found, username and password required")

if tool_opts.username:
loggerinst.info(" ... username detected")
username = tool_opts.username
else:
username = ""
while not username:
username = utils.prompt_user("Username: ")

registration_attributes["username"] = username

if tool_opts.password:
loggerinst.info(" ... password detected")
password = tool_opts.password
else:
if tool_opts.username:
# Hint user for which username they need to enter pswd
loggerinst.info("Username: %s", username) # lgtm[py/clear-text-logging-sensitive-data]
password = ""
while not password:
password = utils.prompt_user("Password: ", password=True)

registration_attributes["password"] = password

if tool_opts.serverurl:
loggerinst.debug(" ... using custom RHSM URL")
registration_attributes["server_url"] = tool_opts.serverurl

return cls(**registration_attributes)

@property
def args(self):
"""
This property is a list of the command-line arguments that will be passed to
subscription-manager to register the system. Set the individual attributes for
:attr:`server_url`, :attr:`activation_key`, etc to affect the values here.
.. note:: :attr:`password` is not passed on the command line. Instead,
it is sent to the running subscription-manager process via pexpect.
"""
args = ["register", "--force"]

if self.server_url:
args.append("--serverurl=%s" % self.server_url)

if self.activation_key:
args.append("--activationkey=%s" % self.activation_key)

if self.org:
args.append("--org=%s" % self.org)

if self.username:
args.append("--username=%s" % self.username)

return args

def __call__(self):
"""
Run the subscription-manager command.
Wrapper for running the subscription-manager command that keeps
secrets secure.
"""
if self.password:
loggerinst.debug(
"Calling command '%s %s'" % (self.cmd, " ".join(hide_secrets(self.args)))
) # lgtm[py/clear-text-logging-sensitive-data]
output, ret_code = utils.run_cmd_in_pty(
[self.cmd] + self.args, expect_script=(("[Pp]assword: ", self.password + "\n"),), print_cmd=False
)
else:
# Warning: Currently activation_key can only be specified on the CLI. This is insecure
# but there's nothing we can do about it for now. Once subscription-manager issue:
# https://issues.redhat.com/browse/ENT-4724 is implemented, we can change both password
# and activation_key to use a file-based approach to passing the secrets.
output, ret_code = utils.run_subprocess([self.cmd] + self.args, print_cmd=False)

if tool_opts.username:
loggerinst.info(" ... username detected")
return output, ret_code

username = tool_opts.username
while not username:
username = utils.prompt_user("Username: ")

if tool_opts.password:
loggerinst.info(" ... password detected")
def hide_secrets(args):
"""
Replace secret values with asterisks.
password = tool_opts.password
while not password:
password = utils.prompt_user("Password: ", password=True)
This function takes a list of arguments which will be passed to
subscription-manager on the command line and returns a new list
that has any secret values obscured with asterisks.
registration_cmd.extend(("--username=%s" % username, "--password=%s" % password))
:arg args: An argument list for subscription-manager which may contain
secret values.
:returns: A new list of arguments with secret values hidden.
"""
obfuscation_string = "*" * 5
secret_args = frozenset(("--password", "--activationkey", "--token"))

if tool_opts.serverurl:
loggerinst.debug(" ... using custom RHSM URL")
registration_cmd.append("--serverurl=%s" % tool_opts.serverurl)
sanitized_list = []
hide_next = False
for arg in args:
if hide_next:
# Second part of a two part secret argument (like --password *SECRET*)
arg = obfuscation_string
hide_next = False

return registration_cmd
elif arg in secret_args:
# First part of a two part secret argument (like *--password* SECRET)
hide_next = True

else:
# A secret argument in one part (like --password=SECRET)
for problem_arg in secret_args:
if arg.startswith(problem_arg + "="):
arg = "{0}={1}".format(problem_arg, obfuscation_string)

def call_registration_cmd(registration_cmd):
"""Wrapper for run_subprocess that avoids leaking password in the log."""
loggerinst.debug("Calling command '%s'" % hide_password(" ".join(registration_cmd)))
return utils.run_subprocess(registration_cmd, print_cmd=False)
sanitized_list.append(arg)

if hide_next:
loggerinst.debug(
"Passed arguments had unexpected secret argument,"
" '{0}', without a secret".format(sanitized_list[-1]) # lgtm[py/clear-text-logging-sensitive-data]
)

def hide_password(cmd):
"""Replace plaintext password with asterisks."""
return re.sub('--password=".*?"', '--password="*****"', cmd)
return sanitized_list


def replace_subscription_manager():
Expand Down
2 changes: 1 addition & 1 deletion convert2rhel/unit_tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_check_for_yum_updates(monkeypatch):
monkeypatch.setattr(package_handler, "get_packages_to_update", value=packages_to_update_mock)

assert package_handler.get_packages_to_update() is not None
assert packages_to_update_mock.assert_called_once_with(["package-1", "package-2"])
packages_to_update_mock.assert_called_once_with(["package-1", "package-2"])
```

And this other example, of a test that don't need any mocks for external
Expand Down
9 changes: 8 additions & 1 deletion convert2rhel/unit_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from convert2rhel import redhatrelease, utils
from convert2rhel import redhatrelease, toolopts, utils
from convert2rhel.logger import setup_logger_handler
from convert2rhel.systeminfo import system_info
from convert2rhel.toolopts import tool_opts
Expand Down Expand Up @@ -60,6 +60,13 @@ def setup_logger(tmpdir):
setup_logger_handler(log_name="convert2rhel", log_dir=str(tmpdir))


@pytest.fixture
def global_tool_opts(monkeypatch):
local_tool_opts = toolopts.ToolOpts()
monkeypatch.setattr(toolopts, "tool_opts", local_tool_opts)
return local_tool_opts


@pytest.fixture()
def pretend_os(request, pkg_root, monkeypatch):
"""Parametric fixture to pretend to be one of available OS for convertion.
Expand Down
Loading

0 comments on commit 8d72fb0

Please sign in to comment.