diff --git a/lsr_role2collection.py b/lsr_role2collection.py index 6f07287c..bfa62e90 100644 --- a/lsr_role2collection.py +++ b/lsr_role2collection.py @@ -25,16 +25,282 @@ import argparse import errno +import fnmatch import logging import os import re -import fnmatch import sys import textwrap -from shutil import copytree, copy2, ignore_patterns, rmtree -from ansible_role_parser import LSRFileTransformerBase, LSRTransformer, get_role_modules from pathlib import Path +from ruamel.yaml import YAML +from shutil import copytree, copy2, ignore_patterns, rmtree + +from ansible.errors import AnsibleParserError +from ansible.parsing.dataloader import DataLoader +from ansible.parsing.mod_args import ModuleArgsParser +from ansible.parsing.yaml.objects import AnsibleMapping, AnsibleSequence + +ALL_ROLE_DIRS = [ + "defaults", + "examples", + "files", + "handlers", + "library", + "meta", + "module_utils", + "tasks", + "templates", + "tests", + "vars", +] + +PLAY_KEYS = { + "gather_facts", + "handlers", + "hosts", + "import_playbook", + "post_tasks", + "pre_tasks", + "roles", + "tasks", +} + +TASK_LIST_KWS = [ + "always", + "block", + "handlers", + "post_tasks", + "pre_tasks", + "rescue", + "tasks", +] + + +class LSRException(Exception): + pass + + +def get_role_dir(role_path, dirpath): + dir_pth = Path(dirpath) + if role_path == dir_pth: + return None, None + relpath = dir_pth.relative_to(role_path) + base_dir = relpath.parts[0] + if base_dir in ALL_ROLE_DIRS: + return base_dir, relpath + return None, None + + +def get_file_type(item): + if isinstance(item, AnsibleMapping): + if "galaxy_info" in item or "dependencies" in item: + return "meta" + return "vars" + elif isinstance(item, AnsibleSequence): + return "tasks" + else: + raise LSRException(f"Error: unknown type of file: {item}") + + +def get_item_type(item): + if isinstance(item, AnsibleMapping): + for key in PLAY_KEYS: + if key in item: + return "play" + if "block" in item: + return "block" + return "task" + else: + raise LSRException(f"Error: unknown type of item: {item}") + + +class LSRFileTransformerBase(object): + + # we used to try to not deindent comment lines in the Ansible yaml, + # but this changed the indentation when comments were used in + # literal strings, which caused test failures - so for now, we + # have to live with poorly indented Ansible comments . . . + # INDENT_RE = re.compile(r'^ (?! *#)', flags=re.MULTILINE) + INDENT_RE = re.compile(r"^ ", flags=re.MULTILINE) + HEADER_RE = re.compile(r"^(---\n|.*\n---\n)", flags=re.DOTALL) + FOOTER_RE = re.compile(r"\n([.][.][.]|[.][.][.]\n.*)$", flags=re.DOTALL) + + def __init__(self, filepath, rolename, args): + self.filepath = filepath + self.prefix = args["prefix"] + self.subrole_prefix = args["subrole_prefix"] + self.replace_dot = args["replace_dot"] + self.role_modules = args["role_modules"] + dl = DataLoader() + self.ans_data = dl.load_from_file(filepath) + if self.ans_data is None: + raise LSRException(f"file is empty {filepath}") + self.file_type = get_file_type(self.ans_data) + self.rolename = rolename + buf = open(filepath).read() + self.ruamel_yaml = YAML(typ="rt") + match = re.search(LSRFileTransformerBase.HEADER_RE, buf) + if match: + self.header = match.group(1) + else: + self.header = "" + match = re.search(LSRFileTransformerBase.FOOTER_RE, buf) + if match: + self.footer = match.group(1) + "\n" + else: + self.footer = "" + self.ruamel_yaml.default_flow_style = False + self.ruamel_yaml.preserve_quotes = True + self.ruamel_yaml.width = 1024 + self.ruamel_data = self.ruamel_yaml.load(buf) + self.ruamel_yaml.indent(mapping=2, sequence=4, offset=2) + self.outputfile = None + self.outputstream = sys.stdout + + def run(self): + if self.file_type == "vars": + self.handle_vars(self.ans_data, self.ruamel_data) + elif self.file_type == "meta": + self.handle_meta(self.ans_data, self.ruamel_data) + else: + for a_item, ru_item in zip(self.ans_data, self.ruamel_data): + self.handle_item(a_item, ru_item) + + def write(self): + def xform(thing): + logging.debug(f"xform thing {thing}") + if self.file_type == "tasks": + thing = re.sub(LSRFileTransformerBase.INDENT_RE, "", thing) + thing = self.header + thing + if not thing.endswith("\n"): + thing = thing + "\n" + thing = thing + self.footer + return thing + + if self.outputfile: + outstrm = open(self.outputfile, "w") + else: + outstrm = self.outputstream + self.ruamel_yaml.dump(self.ruamel_data, outstrm, transform=xform) + + def task_cb(self, a_task, ru_task, module_name, module_args, delegate_to): + """subclass will override""" + pass + + def other_cb(self, a_item, ru_item): + """subclass will override""" + pass + + def vars_cb(self, a_item, ru_item): + """subclass will override""" + pass + + def meta_cb(self, a_item, ru_item): + """subclass will override""" + pass + + def handle_item(self, a_item, ru_item): + """handle any type of item - call the appropriate handlers""" + ans_type = get_item_type(a_item) + self.handle_vars(a_item, ru_item) + self.handle_other(a_item, ru_item) + if ans_type == "task": + self.handle_task(a_item, ru_item) + self.handle_task_list(a_item, ru_item) + + def handle_other(self, a_item, ru_item): + """handle properties of Ansible item other than vars and tasks""" + self.other_cb(a_item, ru_item) + + def handle_vars(self, a_item, ru_item): + """handle vars of Ansible item""" + self.vars_cb(a_item, ru_item) + + def handle_meta(self, a_item, ru_item): + """handle meta/main.yml file""" + self.meta_cb(a_item, ru_item) + + def handle_task(self, a_task, ru_task): + """handle a single task""" + mod_arg_parser = ModuleArgsParser(a_task) + try: + action, args, delegate_to = mod_arg_parser.parse( + skip_action_validation=True + ) + except AnsibleParserError as e: + raise LSRException( + "Couldn't parse task at %s (%s)\n%s" + % (a_task.ansible_pos, e.message, a_task) + ) + self.task_cb(a_task, ru_task, action, args, delegate_to) + + def handle_task_list(self, a_item, ru_item): + """item has one or more fields which hold a list of Task objects""" + for kw in TASK_LIST_KWS: + if kw in a_item: + for a_task, ru_task in zip(a_item[kw], ru_item[kw]): + self.handle_item(a_task, ru_task) + + +def get_role_modules(role_path): + """get the modules from the role + returns a set() of module names""" + role_modules = set() + library_path = Path(os.path.join(role_path, "library")) + if library_path.is_dir(): + for mod_file in library_path.iterdir(): + if mod_file.is_file() and mod_file.stem != "__init__": + role_modules.add(mod_file.stem) + return role_modules + + +class LSRTransformer(object): + """Transform all of the .yml files in a role or role subdir""" + + def __init__( + self, + role_path, + transformer_args, + is_role_dir=True, + role_name=None, + file_xfrm_cls=LSRFileTransformerBase, + ): + """Create a role transformer. The user can specify the specific class + to use for transforming each file, and the extra arguments to pass to the + constructor of that class + is_role_dir - if True, role_path is the role directory (with all of the usual role subdirs) + if False, just operate on the .yml files found in role_path""" + self.role_name = role_name + self.role_path = role_path + self.is_role_dir = is_role_dir + self.transformer_args = transformer_args + self.file_xfrm_cls = file_xfrm_cls + if self.is_role_dir and not self.role_name: + self.role_name = os.path.basename(self.role_path) + + def run(self): + for (dirpath, _, filenames) in os.walk(self.role_path): + if self.is_role_dir: + role_dir, _ = get_role_dir(self.role_path, dirpath) + if not role_dir: + continue + for filename in filenames: + if not filename.endswith(".yml"): + continue + filepath = os.path.join(dirpath, filename) + logging.debug(f"filepath {filepath}") + try: + lsrft = self.file_xfrm_cls( + filepath, self.role_name, self.transformer_args + ) + lsrft.run() + lsrft.write() + except LSRException as lsrex: + logging.debug(f"Could not transform {filepath}: {lsrex}") + + +# ============================================================================== ROLE_DIRS = ( "defaults", @@ -136,25 +402,25 @@ def task_cb(self, a_task, ru_task, module_name, module_args, delegate_to): lsr_rolename = "linux-system-roles." + self.rolename logging.debug(f"\ttask role {rolename}") if rolename == self.rolename or rolename == lsr_rolename: - ru_task[module_name]["name"] = prefix + self.rolename + ru_task[module_name]["name"] = self.prefix + self.rolename elif rolename.startswith("{{ role_path }}"): match = re.match(r"{{ role_path }}/roles/([\w\d\.]+)", rolename) - if match.group(1).startswith(subrole_prefix): - ru_task[module_name]["name"] = prefix + match.group(1).replace( - ".", replace_dot + if match.group(1).startswith(self.subrole_prefix): + ru_task[module_name]["name"] = self.prefix + match.group(1).replace( + ".", self.replace_dot ) else: ru_task[module_name]["name"] = ( - prefix - + subrole_prefix - + match.group(1).replace(".", replace_dot) + self.prefix + + self.subrole_prefix + + match.group(1).replace(".", self.replace_dot) ) - elif module_name in role_modules: + elif module_name in self.role_modules: logging.debug(f"\ttask role module {module_name}") # assumes ru_task is an orderreddict idx = tuple(ru_task).index(module_name) val = ru_task.pop(module_name) - ru_task.insert(idx, prefix + module_name, val) + ru_task.insert(idx, self.prefix + module_name, val) def other_cb(self, a_item, ru_item): """do something with the other non-task information in an item @@ -197,10 +463,10 @@ def change_roles(self, ru_item, roles_kw): if role[key] == lsr_rolename or self.comp_rolenames( role[key], self.rolename ): - role[key] = prefix + self.rolename + role[key] = self.prefix + self.rolename changed = True elif role == lsr_rolename or self.comp_rolenames(role, self.rolename): - role = prefix + self.rolename + role = self.prefix + self.rolename changed = True if changed: ru_item[roles_kw][idx] = role @@ -215,7 +481,6 @@ def write(self): # Once python 3.8 is available in Travis CI, # replace lsr_copytree with shutil.copytree with dirs_exist_ok=True. def lsr_copytree(src, dest, symlinks=False, dirs_exist_ok=False, ignore=None): - ipatterns = ignore_patterns(ignore) if dest.exists(): if dest.is_dir(): for sr in src.iterdir(): @@ -226,28 +491,41 @@ def lsr_copytree(src, dest, symlinks=False, dirs_exist_ok=False, ignore=None): if subsrc.is_dir(): if subdest.exists() and dirs_exist_ok: rmtree(subdest) - copytree( - subsrc, subdest, symlinks=symlinks, ignore=ipatterns + lsr_copytree( + subsrc, + subdest, + symlinks=symlinks, + ignore=ignore, + dirs_exist_ok=True, ) else: - copy2(subsrc, subdest, follow_symlinks=symlinks) + if ( + subdest.exists() or subdest.is_symlink() + ) and dirs_exist_ok: + subdest.unlink() + copy2(subsrc, subdest, follow_symlinks=(not symlinks)) else: if subsrc.is_dir(): if subdest.exists() and dirs_exist_ok: rmtree(subdest) - copytree(subsrc, subdest, symlinks=symlinks) + lsr_copytree( + subsrc, + subdest, + symlinks=symlinks, + dirs_exist_ok=dirs_exist_ok, + ) else: - copy2(subsrc, subdest, follow_symlinks=symlinks) - elif ignore: - dest.unlink() - copytree(src, dest, ignore=ipatterns, symlinks=symlinks) + if (subdest.exists() or subdest.is_symlink()) and dirs_exist_ok: + subdest.unlink() + copy2(subsrc, subdest, follow_symlinks=(not symlinks)) else: - dest.unlink() - copytree(src, dest, symlinks=symlinks) - elif ignore: - copytree(src, dest, ignore=ipatterns, symlinks=symlinks) + if (dest.exists() or dest.is_symlink()) and dirs_exist_ok: + dest.unlink() + copytree(src, dest, symlinks=symlinks, ignore=ignore) else: - copytree(src, dest, symlinks=symlinks) + if dest.is_symlink() and dirs_exist_ok: + dest.unlink() + copytree(src, dest, symlinks=symlinks, ignore=ignore) def dir_to_plugin(v): @@ -274,86 +552,15 @@ def file_replace(path, find, replace, file_patterns): f.write(s) -HOME = os.environ.get("HOME") -parser = argparse.ArgumentParser() -parser.add_argument( - "--namespace", - type=str, - default=os.environ.get("COLLECTION_NAMESPACE", "fedora"), - help="Collection namespace; default to fedora", -) -parser.add_argument( - "--collection", - type=str, - default=os.environ.get("COLLECTION_NAME", "system_roles"), - help="Collection name; default to system_roles", -) -parser.add_argument( - "--dest-path", - type=Path, - default=os.environ.get("COLLECTION_DEST_PATH", HOME + "/.ansible/collections"), - help="Path to parent of collection where role should be migrated", -) -parser.add_argument( - "--src-path", - type=Path, - default=os.environ.get("COLLECTION_SRC_PATH", HOME + "/linux-system-roles"), - help="Path to the parent directory of the source role; default to ${HOME}/linux-system-roles", -) -parser.add_argument( - "--role", - type=str, - default=os.environ.get("COLLECTION_ROLE"), - help="Role to convert to collection", -) -parser.add_argument( - "--replace-dot", - type=str, - default=os.environ.get("COLLECTION_REPLACE_DOT", "_"), - help=( - "If sub-role name contains dots, replace them with the specified value; " - "default to '_'" - ), -) -parser.add_argument( - "--subrole-prefix", - type=str, - default=os.environ.get("COLLECTION_SUBROLE_PREFIX", ""), - help=( - "If sub-role name does not start with the specified value, " - "change the name to start with the value; default to an empty string" - ), -) -args, unknown = parser.parse_known_args() - -role = args.role -if not role: - parser.print_help() - print("Message: role is not specified.") - os._exit(errno.EINVAL) - -namespace = args.namespace -collection = args.collection -prefix = namespace + "." + collection + "." -top_dest_path = args.dest_path.resolve() -replace_dot = args.replace_dot -subrole_prefix = args.subrole_prefix - -dest_path = Path.joinpath( - top_dest_path, "ansible_collections/" + namespace + "/" + collection -) -os.makedirs(dest_path, exist_ok=True) - -roles_dir = dest_path / "roles" -tests_dir = dest_path / "tests" -plugin_dir = dest_path / "plugins" -modules_dir = plugin_dir / "modules" -module_utils_dir = plugin_dir / "module_utils" -docs_dir = dest_path / "docs" - - def copy_tree_with_replace( - src_path, dest_path, prefix, role, TUPLE, isrole=True, ignoreme=None, symlinks=True + src_path, + dest_path, + role, + TUPLE, + transformer_args, + isrole=True, + ignoreme=None, + symlinks=False, ): """ 1. Copy files and dirs in the dir to @@ -367,45 +574,26 @@ def copy_tree_with_replace( src = src_path / dirname if src.is_dir(): if isrole: - dest = roles_dir / role / dirname + dest = dest_path / "roles" / role / dirname else: dest = dest_path / dirname / role print(f"Copying role {src} to {dest}") if ignoreme: - copytree( + lsr_copytree( src, dest, ignore=ignore_patterns(ignoreme), symlinks=symlinks, + dirs_exist_ok=True, ) else: - copytree(src, dest, symlinks=symlinks) - lsrxfrm = LSRTransformer(dest, False, role, LSRFileTransformer) + lsr_copytree(src, dest, symlinks=symlinks, dirs_exist_ok=True) + lsrxfrm = LSRTransformer( + dest, transformer_args, False, role, LSRFileTransformer + ) lsrxfrm.run() -# Run with --role ROLE -src_path = args.src_path.resolve() / role -if not src_path.exists(): - print(f"Error: {src_path} does not exists.") - sys.exit(errno.ENOENT) -_extras = set(os.listdir(src_path)).difference(ALL_DIRS) -try: - _extras.remove(".git") -except KeyError: - pass -extras = [src_path / e for e in _extras] - -# get role modules - will need to find and convert these to use FQCN -role_modules = get_role_modules(src_path) - -# Role - copy subdirectories, tasks, defaults, vars, etc., in the system role to -# DEST_PATH/ansible_collections/NAMESPACE/COLLECTION/roles/ROLE. -copy_tree_with_replace(src_path, dest_path, prefix, role, ROLE_DIRS) - -# ============================================================================== - - def cleanup_symlinks(path, role): """ Clean up symlinks in tests/roles @@ -428,153 +616,8 @@ def cleanup_symlinks(path, role): roles_dir.rmdir() -copy_tree_with_replace( - src_path, dest_path, prefix, role, TESTS, isrole=False, ignoreme="artifacts" -) - -# remove symlinks in the tests/role, then updating the rolename to the collection format -cleanup_symlinks(tests_dir / role, role) - -# ============================================================================== - - -def update_readme(src_path, filename, rolename, comment, issubrole=False): - if not filename.startswith("README"): - return - if filename == "README.md": - title = rolename - else: - m = re.match(r"README(.*)(\.md)", filename) - title = rolename + m.group(1) - main_doc = dest_path / "README.md" - if not main_doc.exists(): - s = textwrap.dedent( - """\ - # {0} {1} collections - - {2} - - * [{3}](roles/{4}) - - """ - ).format(namespace, collection, comment, title, rolename + "/" + filename) - with open(main_doc, "w") as f: - f.write(s) - else: - with open(main_doc) as f: - s = f.read() - if comment not in s: - text = ( - s - + textwrap.dedent( - """\ - - {2} - - * [{3}](roles/{4}) - - """ - ).format( - namespace, collection, comment, title, rolename + "/" + filename - ) - ) - else: - find = r"({0}\n\n)(( |\*|\w|\[|\]|\(|\)|\.|/|-|\n|\r)+)".format( - comment - ) - replace = r"\1\2 * [{0}](roles/{1})\n".format( - title, rolename + "/" + filename - ) - text = re.sub(find, replace, s, flags=re.M) - with open(main_doc, "w") as f: - f.write(text) - - -# Copy docs, design_docs, and examples to -# DEST_PATH/ansible_collections/NAMESPACE/COLLECTION/docs/ROLE. -# Copy README.md to DEST_PATH/ansible_collections/NAMESPACE/COLLECTION/roles/ROLE. -# Generate a top level README.md which contains links to roles/ROLE/README.md. -def process_readme(src_path, filename, rolename, original=None, issubrole=False): - """ - Copy src_path/filename to dest_path/docs/rolename. - filename could be README.md, README-something.md, or something.md. - Create a primary README.md in dest_path, which points to dest_path/docs/rolename/filename - with the title rolename or rolename-something. - """ - src = src_path / filename - dest = roles_dir / rolename / filename - # copy - print(f"Copying doc {filename} to {dest}") - copy2(src, dest, follow_symlinks=False) - dest = roles_dir / rolename - file_patterns = ["*.md"] - file_replace( - dest, "linux-system-roles." + rolename, prefix + rolename, file_patterns - ) - if original: - file_replace(dest, original, prefix + rolename, file_patterns) - if issubrole: - comment = "## Private Roles" - else: - comment = "## Supported Linux System Roles" - update_readme(src_path, filename, rolename, comment, issubrole) - - -dest = dest_path / "docs" / role -for doc in DOCS: - src = src_path / doc - if src.is_dir(): - print(f"Copying docs {src} to {dest}") - lsr_copytree( - src, - dest, - symlinks=False, - ignore="roles", - dirs_exist_ok=True, - ) - if doc == "examples": - lsrxfrm = LSRTransformer(dest, False, role, LSRFileTransformer) - lsrxfrm.run() - elif src.is_file(): - process_readme(src_path, doc, role) - -# Remove symlinks in the docs/role (e.g., in the examples). -# Update the rolename to the collection format as done in the tests. -cleanup_symlinks(dest, role) - -# ============================================================================== - -# Copy library, module_utils, plugins -# Library and plugins are copied to dest_path/plugins -# If plugin is in SUBDIR (currently, just module_utils), -# module_utils/*.py are to dest_path/plugins/module_utils/ROLE/*.py -# module_utils/subdir/*.py are to dest_path/plugins/module_utils/subdir/*.py -SUBDIR = ("module_utils",) -for plugin in PLUGINS: - src = src_path / plugin - plugin_name = dir_to_plugin(plugin) - if not src.is_dir(): - continue - if plugin in SUBDIR: - for sr in src.iterdir(): - if sr.is_dir(): - # If src/sr is a directory, copy it to the dest - dest = plugin_dir / plugin_name / sr.name - print(f"Copying plugin {sr} to {dest}") - lsr_copytree(sr, dest) - else: - # Otherwise, copy it to the plugins/plugin_name/ROLE - dest = plugin_dir / plugin_name / role - dest.mkdir(parents=True, exist_ok=True) - print(f"Copying plugin {sr} to {dest}") - copy2(sr, dest, follow_symlinks=False) - else: - dest = plugin_dir / plugin_name - print(f"Copying plugin {src} to {dest}") - lsr_copytree(src, dest) - - def gather_module_utils_parts(module_utils_dir): + module_utils = [] if module_utils_dir.is_dir(): for root, dirs, files in os.walk(module_utils_dir): for filename in files: @@ -585,6 +628,7 @@ def gather_module_utils_parts(module_utils_dir): if parts[-1] == b"__init__": del parts[-1] module_utils.append(parts) + return module_utils def import_replace(match): @@ -593,40 +637,47 @@ def import_replace(match): 'import ansible_collections.NAMESPACE.COLLECTION.plugins.module_utils.something ...' is returned to replace. """ + _src_path = config["src_path"] + _namespace = config["namespace"] + _collection = config["collection"] + _role = config["role"] + _module_utils = config["module_utils"] + _additional_rewrites = config["additional_rewrites"] + _module_utils_dir = config["module_utils_dir"] parts = match.group(3).split(b".") match_group3 = match.group(3) - src_module_path = src_path / "module_utils" / match.group(3).decode("utf-8") - dest_module_path0 = module_utils_dir / match.group(3).decode("utf-8") - dest_module_path1 = module_utils_dir / role + src_module_path = _src_path / "module_utils" / match.group(3).decode("utf-8") + dest_module_path0 = _module_utils_dir / match.group(3).decode("utf-8") + dest_module_path1 = _module_utils_dir / _role if len(parts) == 1: if not src_module_path.is_dir() and ( dest_module_path0.is_dir() or dest_module_path1.is_dir() ): - match_group3 = (role + "." + match.group(3).decode("utf-8")).encode() + match_group3 = (_role + "." + match.group(3).decode("utf-8")).encode() parts = match_group3.split(b".") - if parts in module_utils: + if parts in _module_utils: if match.group(1) == b"import" and match.group(4) == b"": - additional_rewrites.append(parts) - if src_module_path.exists() or Path(str(src_module_path + ".py")).exists(): + _additional_rewrites.append(parts) + if src_module_path.exists() or Path(str(src_module_path) + ".py").exists(): return b"import ansible_collections.%s.%s.plugins.module_utils.%s" % ( - bytes(namespace, "utf-8"), - bytes(collection, "utf-8"), + bytes(_namespace, "utf-8"), + bytes(_collection, "utf-8"), match_group3, ) else: return ( b"import ansible_collections.%s.%s.plugins.module_utils.%s as %s" % ( - bytes(namespace, "utf-8"), - bytes(collection, "utf-8"), + bytes(_namespace, "utf-8"), + bytes(_collection, "utf-8"), match_group3, parts[-1], ) ) return b"%s ansible_collections.%s.%s.plugins.module_utils.%s%s" % ( match.group(1), - bytes(namespace, "utf-8"), - bytes(collection, "utf-8"), + bytes(_namespace, "utf-8"), + bytes(_collection, "utf-8"), match_group3, match.group(4), ) @@ -634,7 +685,7 @@ def import_replace(match): def get_candidates(parts3, parts5): - from_file0 = module_utils_dir + from_file0 = config["dest_path"] / "plugins" / "module_utils" for p3 in parts3: from_file0 = from_file0 / p3.decode("utf-8") from_file1 = from_file0 @@ -672,6 +723,12 @@ def from_replace(match): - group4 - ( if any - group5 - identifier """ + _src_path = config["src_path"] + _namespace = config["namespace"] + _collection = config["collection"] + _role = config["role"] + _module_utils = config["module_utils"] + _module_utils_dir = config["module_utils_dir"] try: parts3 = match.group(3).split(b".") except AttributeError: @@ -686,15 +743,15 @@ def from_replace(match): # If latter, match.group(3) has to be converted to b'ROLE.module'. match_group3 = match.group(3) if len(parts3) == 1: - src_module_path = src_path / "module_utils" / match.group(3).decode("utf-8") - dest_module_path0 = module_utils_dir / match.group(3).decode("utf-8") - dest_module_path1 = module_utils_dir / role + src_module_path = _src_path / "module_utils" / match.group(3).decode("utf-8") + dest_module_path0 = _module_utils_dir / match.group(3).decode("utf-8") + dest_module_path1 = _module_utils_dir / _role if not src_module_path.is_dir() and ( dest_module_path0.is_dir() or dest_module_path1.is_dir() ): - match_group3 = (role + "." + match.group(3).decode("utf-8")).encode() + match_group3 = (_role + "." + match.group(3).decode("utf-8")).encode() parts3 = match_group3.split(b".") - if parts3 in module_utils: + if parts3 in _module_utils: from_file0, lfrom_file0, from_file1, lfrom_file1 = get_candidates( parts3, parts5 ) @@ -708,8 +765,8 @@ def from_replace(match): b"%s ansible_collections.%s.%s.plugins.module_utils.%s import %s%s" % ( match.group(1), - bytes(namespace, "utf-8"), - bytes(collection, "utf-8"), + bytes(_namespace, "utf-8"), + bytes(_collection, "utf-8"), match_group3, match.group(4), match.group(5), @@ -718,13 +775,13 @@ def from_replace(match): else: return b"%s ansible_collections.%s.%s.plugins.module_utils.%s.__init__ import %s%s" % ( match.group(1), - bytes(namespace, "utf-8"), - bytes(collection, "utf-8"), + bytes(_namespace, "utf-8"), + bytes(_collection, "utf-8"), match_group3, match.group(4), match.group(5), ) - if parts5 in module_utils: + if parts5 in _module_utils: from_file0, lfrom_file0, from_file1, lfrom_file1 = get_candidates( parts3, parts5 ) @@ -739,8 +796,8 @@ def from_replace(match): b"%s ansible_collections.%s.%s.plugins.module_utils.%s import %s%s" % ( match.group(1), - bytes(namespace, "utf-8"), - bytes(collection, "utf-8"), + bytes(_namespace, "utf-8"), + bytes(_collection, "utf-8"), match.group(3), match.group(4), match.group(5), @@ -749,8 +806,8 @@ def from_replace(match): else: return b"%s ansible_collections.%s.%s.plugins.module_utils.%s.__init__ import %s%s" % ( match.group(1), - bytes(namespace, "utf-8"), - bytes(collection, "utf-8"), + bytes(_namespace, "utf-8"), + bytes(_collection, "utf-8"), match.group(3), match.group(4), match.group(5), @@ -763,50 +820,22 @@ def from_replace(match): ): return b"%s ansible_collections.%s.%s.plugins.module_utils import %s%s" % ( match.group(1), - bytes(namespace, "utf-8"), - bytes(collection, "utf-8"), + bytes(_namespace, "utf-8"), + bytes(_collection, "utf-8"), match.group(4), match.group(5), ) else: return b"%s ansible_collections.%s.%s.plugins.module_utils.__init__ import %s%s" % ( match.group(1), - bytes(namespace, "utf-8"), - bytes(collection, "utf-8"), + bytes(_namespace, "utf-8"), + bytes(_collection, "utf-8"), match.group(4), match.group(5), ) return match.group(0) -# Update the python codes which import modules in plugins/{modules,modules_dir}. -additional_rewrites = [] -module_utils = [] -gather_module_utils_parts(module_utils_dir) -for rewrite_dir in (module_utils_dir, modules_dir): - if rewrite_dir.is_dir(): - for root, dirs, files in os.walk(rewrite_dir): - for filename in files: - if os.path.splitext(filename)[1] != ".py": - continue - full_path = Path(root) / filename - text = full_path.read_bytes() - new_text = IMPORT_RE.sub(import_replace, text) - new_text = FROM_RE.sub(from_replace, new_text) - for rewrite in additional_rewrites: - pattern = re.compile( - re.escape(br"ansible.module_utils.%s" % b".".join(rewrite)) - ) - new_text = pattern.sub(rewrite[-1], new_text) - - if text != new_text: - print("Rewriting imports for {}".format(full_path)) - full_path.write_bytes(new_text) - additional_rewrites[:] = [] - -# ============================================================================== - - def add_rolename(filename, rolename): """ A file with an extension, e.g., README.md is converted to README-rolename.md @@ -823,88 +852,403 @@ def add_rolename(filename, rolename): return with_rolename -# Before handling extra files, clean up tox/travis files. -for tox in TOX: - tox_obj = dest_path / tox - if tox_obj.is_dir(): - rmtree(tox_obj) - elif tox_obj.exists(): - tox_obj.unlink() -# Extra files and directories including the sub-roles -for extra in extras: - if extra.name.endswith(".md"): - # E.g., contributing.md, README-devel.md and README-testing.md - process_readme(extra.parent, extra.name, role) - elif extra.is_dir(): - # Copying sub-roles to the roles dir and its tests and README are also - # handled in the same way as the parent role's are. - if extra.name == "roles": - for sr in extra.iterdir(): - # If a role name contains '.', replace it with replace_dot - # convert nested subroles to prefix name with subrole_prefix. - dr = sr.name.replace(".", replace_dot) - if subrole_prefix and not dr.startswith(subrole_prefix): - dr = subrole_prefix + dr - copy_tree_with_replace(sr, dest_path, prefix, dr, ROLE_DIRS) - # copy tests dir to dest_path/"tests" - copy_tree_with_replace( - sr, dest_path, prefix, dr, TESTS, isrole=False, ignoreme="artifacts" +config = {} + + +def main(): + HOME = os.environ.get("HOME") + parser = argparse.ArgumentParser() + parser.add_argument( + "--namespace", + type=str, + default=os.environ.get("COLLECTION_NAMESPACE", "fedora"), + help="Collection namespace; default to fedora", + ) + parser.add_argument( + "--collection", + type=str, + default=os.environ.get("COLLECTION_NAME", "system_roles"), + help="Collection name; default to system_roles", + ) + parser.add_argument( + "--dest-path", + type=Path, + default=os.environ.get("COLLECTION_DEST_PATH", HOME + "/.ansible/collections"), + help="Path to parent of collection where role should be migrated", + ) + parser.add_argument( + "--src-path", + type=Path, + default=os.environ.get("COLLECTION_SRC_PATH", HOME + "/linux-system-roles"), + help="Path to the parent directory of the source role; default to ${HOME}/linux-system-roles", + ) + parser.add_argument( + "--role", + type=str, + default=os.environ.get("COLLECTION_ROLE"), + help="Role to convert to collection", + ) + parser.add_argument( + "--replace-dot", + type=str, + default=os.environ.get("COLLECTION_REPLACE_DOT", "_"), + help=( + "If sub-role name contains dots, replace them with the specified value; " + "default to '_'" + ), + ) + parser.add_argument( + "--subrole-prefix", + type=str, + default=os.environ.get("COLLECTION_SUBROLE_PREFIX", ""), + help=( + "If sub-role name does not start with the specified value, " + "change the name to start with the value; default to an empty string" + ), + ) + args, unknown = parser.parse_known_args() + + role = args.role + if not role: + parser.print_help() + print("Message: role is not specified.") + os._exit(errno.EINVAL) + + namespace = args.namespace + collection = args.collection + prefix = namespace + "." + collection + "." + top_dest_path = args.dest_path.resolve() + replace_dot = args.replace_dot + subrole_prefix = args.subrole_prefix + + dest_path = Path.joinpath( + top_dest_path, "ansible_collections/" + namespace + "/" + collection + ) + os.makedirs(dest_path, exist_ok=True) + + roles_dir = dest_path / "roles" + tests_dir = dest_path / "tests" + plugin_dir = dest_path / "plugins" + modules_dir = plugin_dir / "modules" + module_utils_dir = plugin_dir / "module_utils" + docs_dir = dest_path / "docs" + + # Run with --role ROLE + src_path = args.src_path.resolve() / role + if not src_path.exists(): + print(f"Error: {src_path} does not exists.") + sys.exit(errno.ENOENT) + _extras = set(os.listdir(src_path)).difference(ALL_DIRS) + try: + _extras.remove(".git") + except KeyError: + pass + extras = [src_path / e for e in _extras] + + global config + config = { + "namespace": namespace, + "collection": collection, + "role": role, + "src_path": src_path, + "dest_path": dest_path, + "module_utils_dir": module_utils_dir, + } + + transformer_args = { + "prefix": prefix, + "subrole_prefix": subrole_prefix, + "replace_dot": replace_dot, + # get role modules - will need to find and convert these to use FQCN + "role_modules": get_role_modules(src_path), + } + + # Role - copy subdirectories, tasks, defaults, vars, etc., in the system role to + # DEST_PATH/ansible_collections/NAMESPACE/COLLECTION/roles/ROLE. + copy_tree_with_replace(src_path, dest_path, role, ROLE_DIRS, transformer_args) + + # ============================================================================== + + copy_tree_with_replace( + src_path, + dest_path, + role, + TESTS, + transformer_args, + isrole=False, + ignoreme="artifacts", + symlinks=True, + ) + + # remove symlinks in the tests/role, then updating the rolename to the collection format + cleanup_symlinks(tests_dir / role, role) + + # ============================================================================== + + def update_readme(src_path, filename, rolename, comment, issubrole=False): + if not filename.startswith("README"): + return + if filename == "README.md": + title = rolename + else: + m = re.match(r"README(.*)(\.md)", filename) + title = rolename + m.group(1) + main_doc = dest_path / "README.md" + if not main_doc.exists(): + s = textwrap.dedent( + """\ + # {0} {1} collections + + {2} + + * [{3}](roles/{4}) + + """ + ).format(namespace, collection, comment, title, rolename + "/" + filename) + with open(main_doc, "w") as f: + f.write(s) + else: + with open(main_doc) as f: + s = f.read() + if comment not in s: + text = ( + s + + textwrap.dedent( + """\ + + {2} + + * [{3}](roles/{4}) + + """ + ).format( + namespace, collection, comment, title, rolename + "/" + filename + ) + ) + else: + find = ( + r"({0}\n\n)(( |\*|\w|\[|\]|\(|\)|\.|/|-|\n|\r)+)".format( + comment + ) + ) + replace = r"\1\2 * [{0}](roles/{1})\n".format( + title, rolename + "/" + filename + ) + text = re.sub(find, replace, s, flags=re.M) + with open(main_doc, "w") as f: + f.write(text) + + # Copy docs, design_docs, and examples to + # DEST_PATH/ansible_collections/NAMESPACE/COLLECTION/docs/ROLE. + # Copy README.md to DEST_PATH/ansible_collections/NAMESPACE/COLLECTION/roles/ROLE. + # Generate a top level README.md which contains links to roles/ROLE/README.md. + def process_readme(src_path, filename, rolename, original=None, issubrole=False): + """ + Copy src_path/filename to dest_path/docs/rolename. + filename could be README.md, README-something.md, or something.md. + Create a primary README.md in dest_path, which points to dest_path/docs/rolename/filename + with the title rolename or rolename-something. + """ + src = src_path / filename + dest = roles_dir / rolename / filename + # copy + print(f"Copying doc {filename} to {dest}") + copy2(src, dest, follow_symlinks=False) + dest = roles_dir / rolename + file_patterns = ["*.md"] + file_replace( + dest, "linux-system-roles." + rolename, prefix + rolename, file_patterns + ) + if original: + file_replace(dest, original, prefix + rolename, file_patterns) + if issubrole: + comment = "## Private Roles" + else: + comment = "## Supported Linux System Roles" + update_readme(src_path, filename, rolename, comment, issubrole) + + dest = docs_dir / role + for doc in DOCS: + src = src_path / doc + if src.is_dir(): + print(f"Copying docs {src} to {dest}") + lsr_copytree( + src, + dest, + symlinks=False, + ignore=ignore_patterns("roles"), + dirs_exist_ok=True, + ) + if doc == "examples": + lsrxfrm = LSRTransformer( + dest, + transformer_args, + False, + role, + LSRFileTransformer, ) - # remove symlinks in the tests/role, then updating the rolename to the collection format - cleanup_symlinks(tests_dir / dr, dr) - # copy README.md to dest_path/roles/sr.name - readme = sr / "README.md" - if readme.is_file(): - process_readme( - sr, "README.md", dr, original=sr.name, issubrole=True + lsrxfrm.run() + elif src.is_file(): + process_readme(src_path, doc, role) + + # Remove symlinks in the docs/role (e.g., in the examples). + # Update the rolename to the collection format as done in the tests. + cleanup_symlinks(dest, role) + + # ============================================================================== + + # Copy library, module_utils, plugins + # Library and plugins are copied to dest_path/plugins + # If plugin is in SUBDIR (currently, just module_utils), + # module_utils/*.py are to dest_path/plugins/module_utils/ROLE/*.py + # module_utils/subdir/*.py are to dest_path/plugins/module_utils/subdir/*.py + SUBDIR = ("module_utils",) + for plugin in PLUGINS: + src = src_path / plugin + plugin_name = dir_to_plugin(plugin) + if not src.is_dir(): + continue + if plugin in SUBDIR: + for sr in src.iterdir(): + if sr.is_dir(): + # If src/sr is a directory, copy it to the dest + dest = plugin_dir / plugin_name / sr.name + print(f"Copying plugin {sr} to {dest}") + lsr_copytree(sr, dest) + else: + # Otherwise, copy it to the plugins/plugin_name/ROLE + dest = plugin_dir / plugin_name / role + dest.mkdir(parents=True, exist_ok=True) + print(f"Copying plugin {sr} to {dest}") + copy2(sr, dest, follow_symlinks=False) + else: + dest = plugin_dir / plugin_name + print(f"Copying plugin {src} to {dest}") + lsr_copytree(src, dest) + + # Update the python codes which import modules in plugins/{modules,modules_dir}. + config["module_utils"] = gather_module_utils_parts(module_utils_dir) + additional_rewrites = [] + config["additional_rewrites"] = additional_rewrites + for rewrite_dir in (module_utils_dir, modules_dir): + if rewrite_dir.is_dir(): + for root, dirs, files in os.walk(rewrite_dir): + for filename in files: + if os.path.splitext(filename)[1] != ".py": + continue + full_path = Path(root) / filename + text = full_path.read_bytes() + new_text = IMPORT_RE.sub(import_replace, text) + new_text = FROM_RE.sub(from_replace, new_text) + for rewrite in additional_rewrites: + pattern = re.compile( + re.escape(br"ansible.module_utils.%s" % b".".join(rewrite)) + ) + new_text = pattern.sub(rewrite[-1], new_text) + + if text != new_text: + print("Rewriting imports for {}".format(full_path)) + full_path.write_bytes(new_text) + additional_rewrites[:] = [] + + # ============================================================================== + + # Before handling extra files, clean up tox/travis files. + for tox in TOX: + tox_obj = dest_path / tox + if tox_obj.is_dir(): + rmtree(tox_obj) + elif tox_obj.exists(): + tox_obj.unlink() + # Extra files and directories including the sub-roles + for extra in extras: + if extra.name.endswith(".md"): + # E.g., contributing.md, README-devel.md and README-testing.md + process_readme(extra.parent, extra.name, role) + elif extra.is_dir(): + # Copying sub-roles to the roles dir and its tests and README are also + # handled in the same way as the parent role's are. + if extra.name == "roles": + for sr in extra.iterdir(): + # If a role name contains '.', replace it with replace_dot + # convert nested subroles to prefix name with subrole_prefix. + dr = sr.name.replace(".", replace_dot) + if subrole_prefix and not dr.startswith(subrole_prefix): + dr = subrole_prefix + dr + copy_tree_with_replace( + sr, dest_path, dr, ROLE_DIRS, transformer_args + ) + # copy tests dir to dest_path/"tests" + copy_tree_with_replace( + sr, + dest_path, + dr, + TESTS, + transformer_args, + isrole=False, + ignoreme="artifacts", ) - if sr.name != dr: - # replace "sr.name" with "dr" in role_dir - dirs = ["roles", "docs", "tests"] - for dir in dirs: - role_dir = dest_path / dir - file_patterns = ["*.yml", "*.md"] - file_replace( - role_dir, - re.escape("\b" + sr.name + "\b"), - dr, - file_patterns, + # remove symlinks in the tests/role, then updating the rolename to the collection format + cleanup_symlinks(tests_dir / dr, dr) + # copy README.md to dest_path/roles/sr.name + readme = sr / "README.md" + if readme.is_file(): + process_readme( + sr, "README.md", dr, original=sr.name, issubrole=True ) - # Other extra directories are copied to the collection dir as they are. + if sr.name != dr: + # replace "sr.name" with "dr" in role_dir + dirs = ["roles", "docs", "tests"] + for dir in dirs: + role_dir = dest_path / dir + file_patterns = ["*.yml", "*.md"] + file_replace( + role_dir, + re.escape("\b" + sr.name + "\b"), + dr, + file_patterns, + ) + # Other extra directories are copied to the collection dir as they are. + else: + dest = dest_path / extra.name + print(f"Copying extra {extra} to {dest}") + copytree(extra, dest) + # Other extra files. else: - dest = dest_path / extra.name + if extra.name.endswith(".yml") and "playbook" in extra.name: + # some-playbook.yml is copied to playbooks/role dir. + dest = dest_path / "playbooks" / role + dest.mkdir(parents=True, exist_ok=True) + elif extra.name in TOX: + # If the file in the TOX tuple, it is copied to the collection dir as it is. + dest = dest_path / extra.name + else: + # If the extra file 'filename' has no extension, it is copied to the collection dir as + # 'filename-ROLE'. If the extra file is 'filename.ext', it is copied to 'filename-ROLE.ext'. + dest = dest_path / add_rolename(extra.name, role) print(f"Copying extra {extra} to {dest}") - copytree(extra, dest) - # Other extra files. - else: - if extra.name.endswith(".yml") and "playbook" in extra.name: - # some-playbook.yml is copied to playbooks/role dir. - dest = dest_path / "playbooks" / role - dest.mkdir(parents=True, exist_ok=True) - elif extra.name in TOX: - # If the file in the TOX tuple, it is copied to the collection dir as it is. - dest = dest_path / extra.name - else: - # If the extra file 'filename' has no extension, it is copied to the collection dir as - # 'filename-ROLE'. If the extra file is 'filename.ext', it is copied to 'filename-ROLE.ext'. - dest = dest_path / add_rolename(extra.name, role) - print(f"Copying extra {extra} to {dest}") - copy2(extra, dest, follow_symlinks=False) - -dest = dest_path / "playbooks" / role -if dest.is_dir(): - lsrxfrm = LSRTransformer(dest, False, role, LSRFileTransformer) - lsrxfrm.run() - -default_collections_paths = "~/.ansible/collections:/usr/share/ansible/collections" -default_collections_paths_list = list( - map(os.path.expanduser, default_collections_paths.split(":")) -) -current_dest = os.path.expanduser(str(top_dest_path)) -# top_dest_path is not in the default collections path. -# suggest to run ansible-playbook with ANSIBLE_COLLECTIONS_PATHS env var. -if current_dest not in default_collections_paths_list: - ansible_collections_paths = current_dest + ":" + default_collections_paths - print( - f"Run ansible-playbook with environment variable ANSIBLE_COLLECTIONS_PATHS={ansible_collections_paths}" + copy2(extra, dest, follow_symlinks=False) + + dest = dest_path / "playbooks" / role + if dest.is_dir(): + lsrxfrm = LSRTransformer( + dest, transformer_args, False, role, LSRFileTransformer + ) + lsrxfrm.run() + + default_collections_paths = "~/.ansible/collections:/usr/share/ansible/collections" + default_collections_paths_list = list( + map(os.path.expanduser, default_collections_paths.split(":")) ) + current_dest = os.path.expanduser(str(top_dest_path)) + # top_dest_path is not in the default collections path. + # suggest to run ansible-playbook with ANSIBLE_COLLECTIONS_PATHS env var. + if current_dest not in default_collections_paths_list: + ansible_collections_paths = current_dest + ":" + default_collections_paths + print( + f"Run ansible-playbook with environment variable ANSIBLE_COLLECTIONS_PATHS={ansible_collections_paths}" + ) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/sync-template.sh b/sync-template.sh index e71e5cf3..57924c5c 100755 --- a/sync-template.sh +++ b/sync-template.sh @@ -41,6 +41,7 @@ FILES=( '--copy-if-missing=.travis/custom.sh' '--copy=.travis/preinstall' '--copy=.travis/runblack.sh' + '--copy=.travis/runcollection.sh' '--copy=.travis/runcoveralls.sh' '--copy=.travis/runflake8.sh' '--copy=.travis/runpylint.sh' diff --git a/tests/unit/test_lsr_role2collection.py b/tests/unit/test_lsr_role2collection.py index 0b4cd94d..362bcbd5 100644 --- a/tests/unit/test_lsr_role2collection.py +++ b/tests/unit/test_lsr_role2collection.py @@ -17,6 +17,7 @@ from_replace, gather_module_utils_parts, add_rolename, + config, ) src_path = os.environ.get("COLLECTION_SRC_PATH", "/var/tmp/linux-system-roles") @@ -24,6 +25,8 @@ namespace = os.environ.get("COLLECTION_NAMESPACE", "fedora") collection_name = os.environ.get("COLLECTION_NAME", "system_roles") rolename = "systemrole" +prefix = namespace + "." + collection_name +prefixdot = prefix + "." test_yaml_str = textwrap.dedent( """\ @@ -37,8 +40,6 @@ {1} {2}{3}{4} """ ) -prefix = namespace + "." + collection_name -prefixdot = prefix + "." class LSRRole2Collection(unittest.TestCase): @@ -260,8 +261,14 @@ def test_copy_tree_with_replace(self): self.create_test_tree( role_path / "tasks", test_yaml_str, pre_params, ".yml", is_vertical=False ) + transformer_args = { + "prefix": prefixdot, + "subrole_prefix": "", + "replace_dot": "_", + "role_modules": set(), + } copy_tree_with_replace( - role_path, coll_path, prefixdot, rolename, MYTUPLE, isrole=True + role_path, coll_path, rolename, MYTUPLE, transformer_args, isrole=True ) test_path = coll_path / "roles" / rolename / "tasks" self.check_test_tree( @@ -270,7 +277,7 @@ def test_copy_tree_with_replace(self): shutil.rmtree(coll_path) def test_cleanup_symlinks(self): - """test copy_tree_with_replace""" + """test cleanup_symlinks""" params = [ { @@ -324,14 +331,10 @@ def test_import_replace(self): module_name = "util0" src_module_dir = Path(src_path) / rolename / "module_utils" / module_name src_module_dir.mkdir(parents=True, exist_ok=True) - dest_module_dir_core = ( - Path(dest_path) - / "ansible_collections" - / namespace - / collection_name - / "plugins" - / "module_utils" + dest_base_dir = ( + Path(dest_path) / "ansible_collections" / namespace / collection_name ) + dest_module_dir_core = dest_base_dir / "plugins" / "module_utils" dest_module_dir = dest_module_dir_core / module_name dest_module_dir.mkdir(parents=True, exist_ok=True) input = bytes( @@ -349,6 +352,20 @@ def test_import_replace(self): IMPORT_RE = re.compile( br"(\bimport) (ansible\.module_utils\.)(\S+)(.*)$", flags=re.M ) + config["namespace"] = namespace + config["collection"] = collection_name + config["role"] = rolename + config["src_path"] = Path(src_path) / rolename + config["dest_path"] = dest_base_dir + config["module_utils_dir"] = dest_module_dir_core + config["module_utils"] = [ + [b"util0"], + [b"util0", b"test3"], + [b"util0", b"test2"], + [b"util0", b"test1"], + [b"util0", b"test0"], + ] + config["additional_rewrites"] = [] output = IMPORT_RE.sub(import_replace, input) self.assertEqual(output, expected) shutil.rmtree(src_module_dir) @@ -358,14 +375,10 @@ def test_from_replace(self): module_name = "util0" src_module_dir = Path(src_path) / rolename / "module_utils" / module_name src_module_dir.mkdir(parents=True, exist_ok=True) - dest_module_dir_core = ( - Path(dest_path) - / "ansible_collections" - / namespace - / collection_name - / "plugins" - / "module_utils" + dest_base_dir = ( + Path(dest_path) / "ansible_collections" / namespace / collection_name ) + dest_module_dir_core = dest_base_dir / "plugins" / "module_utils" dest_module_dir = dest_module_dir_core / module_name dest_module_dir.mkdir(parents=True, exist_ok=True) @@ -429,6 +442,20 @@ def test_from_replace(self): br"(\bfrom) (ansible\.module_utils\.?)(\S+)? import (\(*(?:\n|\r\n)?)(.+)$", flags=re.M, ) + config["namespace"] = namespace + config["collection"] = collection_name + config["role"] = rolename + config["src_path"] = Path(src_path) / rolename + config["dest_path"] = dest_base_dir + config["module_utils_dir"] = dest_module_dir_core + config["module_utils"] = [ + [b"util0"], + [b"util0", b"test3"], + [b"util0", b"test2"], + [b"util0", b"test1"], + [b"util0", b"test0"], + ] + config["additional_rewrites"] = [] output = FROM_RE.sub(from_replace, input) self.assertEqual(output, expected) shutil.rmtree(src_module_dir)