From 4f3b8dea7d8957fa3fe393b5f8ff7acedfeb0277 Mon Sep 17 00:00:00 2001 From: Matthias Zepper Date: Tue, 25 Apr 2023 14:18:58 +0200 Subject: [PATCH] Refactor the CLI commands for the Singularity Cache Dir --- nf_core/__main__.py | 36 +++++- nf_core/download.py | 287 +++++++++++++++++++++++++++++--------------- 2 files changed, 220 insertions(+), 103 deletions(-) diff --git a/nf_core/__main__.py b/nf_core/__main__.py index e6ec0cc164..071040a399 100644 --- a/nf_core/__main__.py +++ b/nf_core/__main__.py @@ -233,11 +233,30 @@ def launch(pipeline, id, revision, command_only, params_in, params_out, save_all "-c", "--container", type=click.Choice(["none", "singularity"]), help="Download software container images" ) @click.option( - "--singularity-cache-only/--singularity-cache-copy", - help="Don't / do copy images to the output directory and set 'singularity.cacheDir' in workflow", + "-s", + "--singularity-cache", + type=click.Choice(["amend", "copy", "remote"]), + help="Utilize the 'singularity.cacheDir' in the download process, if applicable.", +) +@click.option( + "-i", + "--singularity-cache-index", + type=str, + help="List of images already available in a remote 'singularity.cacheDir', imposes --singularity-cache=remote", ) @click.option("-p", "--parallel-downloads", type=int, default=4, help="Number of parallel image downloads") -def download(pipeline, revision, outdir, compress, force, tower, container, singularity_cache_only, parallel_downloads): +def download( + pipeline, + revision, + outdir, + compress, + force, + tower, + container, + singularity_cache, + singularity_cache_index, + parallel_downloads, +): """ Download a pipeline, nf-core/configs and pipeline singularity images. @@ -245,7 +264,16 @@ def download(pipeline, revision, outdir, compress, force, tower, container, sing workflow to use relative paths to the configs and singularity images. 
""" dl = nf_core.download.DownloadWorkflow( - pipeline, revision, outdir, compress, force, tower, container, singularity_cache_only, parallel_downloads + pipeline, + revision, + outdir, + compress, + force, + tower, + container, + singularity_cache, + singularity_cache_index, + parallel_downloads, ) dl.download_workflow() diff --git a/nf_core/download.py b/nf_core/download.py index ca5d64fc46..cf7e34eba2 100644 --- a/nf_core/download.py +++ b/nf_core/download.py @@ -77,7 +77,7 @@ class DownloadWorkflow: Args: pipeline (str): A nf-core pipeline name. revision (List[str]): The workflow revision to download, like `1.0`. Defaults to None. - singularity (bool): Flag, if the Singularity container should be downloaded as well. Defaults to False. + container (bool): Flag, if the Singularity container should be downloaded as well. Defaults to False. tower (bool): Flag, to customize the download for Nextflow Tower (convert to git bare repo). Defaults to False. outdir (str): Path to the local download directory. Defaults to None. """ @@ -91,7 +91,8 @@ def __init__( force=False, tower=False, container=None, - singularity_cache_only=False, + singularity_cache=None, + singularity_cache_index=None, parallel_downloads=4, ): self.pipeline = pipeline @@ -106,9 +107,12 @@ def __init__( self.compress_type = compress_type self.force = force self.tower = tower - self.include_configs = True + self.include_configs = None self.container = container - self.singularity_cache_only = singularity_cache_only + self.singularity_cache = ( + singularity_cache if not singularity_cache_index else "remote" + ) # if a singularity_cache_index is given, use the file and overrule choice. + self.singularity_cache_index = singularity_cache_index self.parallel_downloads = parallel_downloads self.wf_revisions = {} @@ -117,6 +121,7 @@ def __init__( self.wf_download_url = {} self.nf_config = {} self.containers = [] + self.containers_remote = [] # stores the remote images provided in the file. 
# Fetch remote workflows self.wfs = nf_core.list.Workflows() @@ -134,11 +139,16 @@ def download_workflow(self): self.prompt_revision() self.get_revision_hash() # Inclusion of configs is unnecessary for Tower. - if not self.tower: + if not self.tower and self.include_configs is None: self.prompt_config_inclusion() - self.prompt_container_download() - self.prompt_use_singularity_cachedir() - self.prompt_singularity_cachedir_only() + if not self.singularity_cache == "remote": + self.prompt_container_download() + self.prompt_singularity_cachedir_creation() + else: + self.container = "singularity" + self.prompt_singularity_cachedir_utilization() + self.prompt_singularity_cachedir_remote(retry=False) + self.read_remote_containers() # Nothing meaningful to compress here. if not self.tower: self.prompt_compression_type() @@ -220,9 +230,6 @@ def download_workflow_classic(self): if self.container == "singularity": self.find_container_images(revision_dirname) - # Download the singularity images - if self.container == "singularity": - log.info(f"Found {len(self.containers)} container{'s' if len(self.containers) > 1 else ''}") try: self.get_singularity_images() except OSError as e: @@ -260,8 +267,6 @@ def download_workflow_tower(self): # Collect all required singularity images self.find_container_images(self.workflow_repo.access()) - # Download the singularity images - log.info(f"Found {len(self.containers)} container{'s' if len(self.containers) > 1 else ''}") try: self.get_singularity_images() except OSError as e: @@ -280,24 +285,27 @@ def prompt_pipeline_name(self): self.pipeline = nf_core.utils.prompt_remote_pipeline_name(self.wfs) def prompt_revision(self): - """Prompt for pipeline revision / branch""" - # Prompt user for revision tag if '--revision' was not set - # If --tower is specified, allow to select multiple revisions - + """ + Prompt for pipeline revision / branch + Prompt user for revision tag if '--revision' was not set + If --tower is specified, allow to select 
multiple revisions + Also the classic download allows for multiple revisions. + """ if not bool(self.revision): (choice, tag_set) = nf_core.utils.prompt_pipeline_release_branch( self.wf_revisions, self.wf_branches, multiple=self.tower ) + """ + The checkbox() prompt unfortunately does not support passing a Validator, + so a user who keeps pressing Enter will flounder past the selection without choice. - # The checkbox() prompt unfortunately does not support passing a Validator, - # so a user who keeps pressing Enter will bump through the selection without choice. - - # bool(choice), bool(tag_set): + bool(choice), bool(tag_set): ############################# - # True, True: A choice was made and revisions were available. - # False, True: No selection was made, but revisions were available -> defaults to all available. - # False, False: No selection was made because no revisions were available -> raise AssertionError. - # True, False: Congratulations, you found a bug! That combo shouldn't happen. + True, True: A choice was made and revisions were available. + False, True: No selection was made, but revisions were available -> defaults to all available. + False, False: No selection was made because no revisions were available -> raise AssertionError. + True, False: Congratulations, you found a bug! That combo shouldn't happen. + """ if bool(choice): # have to make sure that self.revision is a list of strings, regardless if choice is str or list of strings. 
@@ -351,10 +359,14 @@ def get_revision_hash(self): def prompt_config_inclusion(self): """Prompt for inclusion of institutional configurations""" - self.include_configs = questionary.confirm( - "Include the nf-core's default institutional configuration files into the download?", - style=nf_core.utils.nfcore_question_style, - ).ask() + if stderr.is_interactive: # Use rich auto-detection of interactive shells + self.include_configs = questionary.confirm( + "Include the nf-core's default institutional configuration files into the download?", + style=nf_core.utils.nfcore_question_style, + ).ask() + else: + self.include_configs = False + # do not include by default. def prompt_container_download(self): """Prompt whether to download container images or not""" @@ -367,7 +379,7 @@ def prompt_container_download(self): style=nf_core.utils.nfcore_question_style, ).unsafe_ask() - def prompt_use_singularity_cachedir(self): + def prompt_singularity_cachedir_creation(self): """Prompt about using $NXF_SINGULARITY_CACHEDIR if not already set""" if ( self.container == "singularity" @@ -381,6 +393,7 @@ if rich.prompt.Confirm.ask( "[blue bold]?[/] [bold]Define [blue not bold]$NXF_SINGULARITY_CACHEDIR[/] for a shared Singularity image download folder?[/]" ): + self.singularity_cache = "amend" # Prompt user for a cache directory path cachedir_path = None while cachedir_path is None: @@ -425,25 +438,89 @@ "You will need reload your terminal after the download completes for this to take effect." ) - def prompt_singularity_cachedir_only(self): + def prompt_singularity_cachedir_utilization(self): """Ask if we should *only* use $NXF_SINGULARITY_CACHEDIR without copying into target""" if ( - self.singularity_cache_only is None + self.singularity_cache is None # no choice regarding singularity cache has been made. 
and self.container == "singularity" and os.environ.get("NXF_SINGULARITY_CACHEDIR") is not None ): stderr.print( - "\nIf you are working on the same system where you will run Nextflow, you can leave the downloaded images in the " - "[blue not bold]$NXF_SINGULARITY_CACHEDIR[/] folder, Nextflow will automatically find them. " + "\nIf you are working on the same system where you will run Nextflow, you can amend the downloaded images to the ones in the " + "[blue not bold]$NXF_SINGULARITY_CACHEDIR[/] folder, Nextflow will automatically find them. " "However if you will transfer the downloaded files to a different system then they should be copied to the target folder." ) - self.singularity_cache_only = rich.prompt.Confirm.ask( - "[blue bold]?[/] [bold]Copy singularity images from [blue not bold]$NXF_SINGULARITY_CACHEDIR[/] to the target folder?[/]" + self.singularity_cache = rich.prompt.Prompt.ask( + "[blue bold]?[/] [bold]Copy singularity images from [blue not bold]$NXF_SINGULARITY_CACHEDIR[/] to the target folder or amend new images to the collection?[/]", + choices=["amend", "copy"], ) - # Sanity check, for when passed as a cli flag - if self.singularity_cache_only and self.container != "singularity": - raise AssertionError("Command has '--singularity-cache-only' set, but '--container' is not 'singularity'") + def prompt_singularity_cachedir_remote(self, retry): + """Prompt about the index of a remote $NXF_SINGULARITY_CACHEDIR""" + if ( + self.container == "singularity" + and self.singularity_cache == "remote" + and self.singularity_cache_index is None + and stderr.is_interactive # Use rich auto-detection of interactive shells + ): + stderr.print( + "\nNextflow and nf-core can use an environment variable called [blue]$NXF_SINGULARITY_CACHEDIR[/] that is a path to a directory where remote Singularity images are stored. " + "This allows downloaded images to be cached in a central location." 
+ ) + # Prompt user for a file listing the contents of the remote cache directory + cachedir_index = None + while cachedir_index is None: + prompt_cachedir_index = questionary.path( + "Specify a list of the remote images already present in the remote system :", + file_filter="*.txt", + style=nf_core.utils.nfcore_question_style, + ).unsafe_ask() + cachedir_index = os.path.abspath(os.path.expanduser(prompt_cachedir_index)) + if prompt_cachedir_index == "": + log.error("Will disregard contents of a remote [blue]$NXF_SINGULARITY_CACHEDIR[/]") + self.singularity_cache_index = None + self.singularity_cache = "copy" + elif not os.access(cachedir_index, os.R_OK): + log.error(f"'{cachedir_index}' is not a valid, readable file.") + cachedir_index = None + if cachedir_index: + self.singularity_cache_index = cachedir_index + if retry: # invoke parsing the file again. + self.read_remote_containers() + + def read_remote_containers(self): + """Reads the file specified as index for the remote Singularity cache dir""" + if ( + self.container == "singularity" + and self.singularity_cache == "remote" + and self.singularity_cache_index is not None + ): + n_total_images = 0 + try: + with open(self.singularity_cache_index) as indexfile: + for line in indexfile.readlines(): + match = re.search(r"([^\/\\]+\.img)", line, re.S) + if match: + n_total_images += 1 + self.containers_remote.append(match.group(0)) + if n_total_images == 0: + raise LookupError("Could not find valid container names in the index file.") + else: + log.info( + f"Successfully read {n_total_images} containers from the remote $NXF_SINGULARITY_CACHE contents." 
+ ) + self.containers_remote = sorted(list(set(self.containers_remote))) + except (FileNotFoundError, LookupError) as e: + log.error(f"[red]Issue with reading the specified remote $NXF_SINGULARITY_CACHEDIR index:[/]\n{e}\n") + if rich.prompt.Confirm.ask("[blue]Specify a new index file and try again?"): + self.prompt_singularity_cachedir_remote(retry=True) + else: + log.info("Proceeding without consideration of the remote $NXF_SINGULARITY_CACHEDIR index.") + self.singularity_cache_index = None + if os.environ.get("NXF_SINGULARITY_CACHEDIR"): + self.singularity_cache = "copy" # default to copy if possible, otherwise skip. + else: + self.singularity_cache = None def prompt_compression_type(self): """Ask user if we should compress the downloaded files""" @@ -531,7 +608,7 @@ def wf_use_local_configs(self, revision_dirname): nfconfig = nfconfig.replace(find_str, repl_str) # Append the singularity.cacheDir to the end if we need it - if self.container == "singularity" and not self.singularity_cache_only: + if self.container == "singularity" and self.singularity_cache == "copy": nfconfig += ( f"\n\n// Added by `nf-core download` v{nf_core.__version__} //\n" + 'singularity.cacheDir = "${projectDir}/../singularity-images/"' @@ -674,8 +751,14 @@ def get_singularity_images(self): if len(self.containers) == 0: log.info("No container names found in workflow") else: + log.info( + f"Found {len(self.containers)} container image{'s' if len(self.containers) > 1 else ''} in workflow." 
+ ) + with DownloadProgress() as progress: - task = progress.add_task("all_containers", total=len(self.containers), progress_type="summary") + task = progress.add_task( + "Collecting container images", total=len(self.containers), progress_type="summary" + ) # Organise containers based on what we need to do with them containers_exist = [] @@ -697,8 +780,8 @@ def get_singularity_images(self): log.debug(f"Cache directory not found, creating: {cache_path_dir}") os.makedirs(cache_path_dir) - # We already have the target file in place, return - if os.path.exists(out_path): + # We already have the target file in place or in remote cache, return + if os.path.exists(out_path) or os.path.basename(out_path) in self.containers_remote: containers_exist.append(container) continue @@ -722,56 +805,62 @@ def get_singularity_images(self): "Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH" ) - # Go through each method of fetching containers in order - for container in containers_exist: - progress.update(task, description="Image file exists") - progress.update(task, advance=1) - - for container in containers_cache: - progress.update(task, description="Copying singularity images from cache") - self.singularity_copy_cache_image(*container) - progress.update(task, advance=1) - - with concurrent.futures.ThreadPoolExecutor(max_workers=self.parallel_downloads) as pool: - progress.update(task, description="Downloading singularity images") - - # Kick off concurrent downloads - future_downloads = [ - pool.submit(self.singularity_download_image, *container, progress) - for container in containers_download - ] - - # Make ctrl-c work with multi-threading - self.kill_with_fire = False - - try: - # Iterate over each threaded download, waiting for them to finish - for future in concurrent.futures.as_completed(future_downloads): - future.result() - try: - progress.update(task, advance=1) - except Exception as e: - log.error(f"Error updating progress bar: {e}") - 
- except KeyboardInterrupt: - # Cancel the future threads that haven't started yet - for future in future_downloads: - future.cancel() - # Set the variable that the threaded function looks for - # Will trigger an exception from each thread - self.kill_with_fire = True - # Re-raise exception on the main thread - raise - - for container in containers_pull: - progress.update(task, description="Pulling singularity images") - try: - self.singularity_pull_image(*container, progress) - except RuntimeWarning as r: - # Raise exception if this is not possible - log.error("Not able to pull image. Service might be down or internet connection is dead.") - raise r - progress.update(task, advance=1) + if containers_exist: + if self.singularity_cache_index is not None: + log.info(f"{len(containers_exist)} are already cached remotely and won't be retrieved.") + # Go through each method of fetching containers in order + for container in containers_exist: + progress.update(task, description="Image file exists at destination") + progress.update(task, advance=1) + + if containers_cache: + for container in containers_cache: + progress.update(task, description="Copying singularity images from cache") + self.singularity_copy_cache_image(*container) + progress.update(task, advance=1) + + if containers_download or containers_pull: + # if clause gives slightly better UX, because Download is no longer displayed if nothing is left to be downloaded. 
+ with concurrent.futures.ThreadPoolExecutor(max_workers=self.parallel_downloads) as pool: + progress.update(task, description="Downloading singularity images") + + # Kick off concurrent downloads + future_downloads = [ + pool.submit(self.singularity_download_image, *container, progress) + for container in containers_download + ] + + # Make ctrl-c work with multi-threading + self.kill_with_fire = False + + try: + # Iterate over each threaded download, waiting for them to finish + for future in concurrent.futures.as_completed(future_downloads): + future.result() + try: + progress.update(task, advance=1) + except Exception as e: + log.error(f"Error updating progress bar: {e}") + + except KeyboardInterrupt: + # Cancel the future threads that haven't started yet + for future in future_downloads: + future.cancel() + # Set the variable that the threaded function looks for + # Will trigger an exception from each thread + self.kill_with_fire = True + # Re-raise exception on the main thread + raise + + for container in containers_pull: + progress.update(task, description="Pulling singularity images") + try: + self.singularity_pull_image(*container, progress) + except RuntimeWarning as r: + # Raise exception if this is not possible + log.error("Not able to pull image. Service might be down or internet connection is dead.") + raise r + progress.update(task, advance=1) def singularity_image_filenames(self, container): """Check Singularity cache for image, copy to destination folder if found. 
@@ -810,11 +899,11 @@ def singularity_image_filenames(self, container): if os.environ.get("NXF_SINGULARITY_CACHEDIR"): cache_path = os.path.join(os.environ["NXF_SINGULARITY_CACHEDIR"], out_name) # Use only the cache - set this as the main output path - if self.singularity_cache_only: + if self.singularity_cache == "amend": out_path = cache_path cache_path = None - elif self.singularity_cache_only: - raise FileNotFoundError("'--singularity-cache' specified but no '$NXF_SINGULARITY_CACHEDIR' set!") + elif self.singularity_cache in ["amend", "copy"]: + raise FileNotFoundError("Singularity cache is required but no '$NXF_SINGULARITY_CACHEDIR' set!") return (out_path, cache_path) @@ -998,7 +1087,6 @@ def __init__( remote_url, revision, commit, - no_pull=False, hide_progress=False, in_cache=True, ): @@ -1028,6 +1116,7 @@ def __init__( self.commit = [] self.fullname = nf_core.modules.modules_utils.repo_full_name_from_remote(self.remote_url) self.retries = 0 # retries for setting up the locally cached repository + self.hide_progress = hide_progress self.setup_local_repo(remote_url, in_cache=in_cache) @@ -1089,7 +1178,7 @@ def setup_local_repo(self, remote, in_cache=True): rich.progress.BarColumn(bar_width=None), "[bold yellow]{task.fields[state]}", transient=True, - disable=os.environ.get("HIDE_PROGRESS", None) is not None, + disable=os.environ.get("HIDE_PROGRESS", None) is not None or self.hide_progress, ) with pbar: self.repo = git.Repo.clone_from( @@ -1112,7 +1201,7 @@ def setup_local_repo(self, remote, in_cache=True): rich.progress.BarColumn(bar_width=None), "[bold yellow]{task.fields[state]}", transient=True, - disable=os.environ.get("HIDE_PROGRESS", None) is not None, + disable=os.environ.get("HIDE_PROGRESS", None) is not None or self.hide_progress, ) with pbar: self.repo.remotes.origin.fetch(