Skip to content

Commit

Permalink
feat: add the match_reference process
Browse files Browse the repository at this point in the history
feat: add all the match_ref analysis scripts

feat: add the match_ref snakemake workflow

feat: only initiate the match_ref wrapping process when it is set on the command line or through the samplesheet

feat: add support for "NONE" input for primers and GFFs to the match_ref process
  • Loading branch information
florianzwagemaker committed Jul 12, 2023
1 parent 6449dd3 commit 1581c78
Show file tree
Hide file tree
Showing 9 changed files with 944 additions and 14 deletions.
32 changes: 21 additions & 11 deletions ViroConstrictor/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import ViroConstrictor.logging
from ViroConstrictor import __version__
from ViroConstrictor.logging import log
from ViroConstrictor.match_ref import process_match_ref
from ViroConstrictor.parser import CLIparser
from ViroConstrictor.runconfigs import GetSnakemakeRunDetails, WriteYaml
from ViroConstrictor.runreport import WriteReport
Expand Down Expand Up @@ -68,17 +69,21 @@ def get_preset_warning_list(

p_fallbackwarning_df = sample_info_df.loc[sample_info_df["PRESET_SCORE"] == 0.0]
targets, presets = (
list(x)
for x in (
zip(
*zip_longest(
list(set(p_fallbackwarning_df["VIRUS"].tolist())),
list(set(p_fallbackwarning_df["PRESET"].tolist())),
fillvalue="DEFAULT",
(
list(x)
for x in (
zip(
*zip_longest(
list(set(p_fallbackwarning_df["VIRUS"].tolist())),
list(set(p_fallbackwarning_df["PRESET"].tolist())),
fillvalue="DEFAULT",
)
)
)
)
) if p_fallbackwarning_df.shape[0] > 0 else ([], [])
if p_fallbackwarning_df.shape[0] > 0
else ([], [])
)
for _input, _preset in zip(targets, presets):
filtered_df = p_fallbackwarning_df.loc[
(p_fallbackwarning_df["VIRUS"] == _input)
Expand Down Expand Up @@ -138,11 +143,16 @@ def main() -> NoReturn:
if not parsed_input.flags.skip_updates:
update(sys.argv, parsed_input.user_config)

snakemake_run_details = GetSnakemakeRunDetails(inputs_obj=parsed_input)
# check if there's a value in the column 'MATCH-REF' set to True in the parsed_input.samples_df dataframe, if so, process the match-ref, else skip
if parsed_input.samples_df["MATCH-REF"].any():
parsed_input = process_match_ref(parsed_input)

log.info(f"{'='*20} [bold yellow] Starting Workflow [/bold yellow] {'='*20}")
status: bool = False
snakemake_run_details = GetSnakemakeRunDetails(
inputs_obj=parsed_input, samplesheetfilename="samples_main"
)

log.info(f"{'='*20} [bold yellow] Starting Main Workflow [/bold yellow] {'='*20}")
status: bool = False
if parsed_input.user_config["COMPUTING"]["compmode"] == "local":
status = snakemake.snakemake(
snakefile=parsed_input.snakefile,
Expand Down
237 changes: 237 additions & 0 deletions ViroConstrictor/match_ref.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import copy
from typing import Literal

import pandas as pd
from snakemake import snakemake

import ViroConstrictor.logging
from ViroConstrictor.logging import log
from ViroConstrictor.parser import CLIparser
from ViroConstrictor.runconfigs import GetSnakemakeRunDetails, WriteYaml
from ViroConstrictor.runreport import WriteReport


def run_snakemake(
    inputs_obj: CLIparser, snakemakedetails: GetSnakemakeRunDetails
) -> bool:
    """
    Run the match-ref Snakemake workflow, either locally or via DRMAA grid
    submission depending on the user's configured compute mode.

    Parameters
    ----------
    inputs_obj : CLIparser
        An instance of the CLIparser class containing the parsed command-line arguments.
    snakemakedetails : GetSnakemakeRunDetails
        An instance of the GetSnakemakeRunDetails class containing the details of the Snakemake run.

    Returns
    -------
    bool
        True if the Snakemake pipeline ran successfully, False otherwise.
    """
    # Keyword arguments shared by both execution modes. Grid mode only adds
    # the DRMAA scheduler settings on top of these.
    common_kwargs = {
        "snakefile": inputs_obj.match_ref_snakefile,
        "workdir": inputs_obj.workdir,
        "cores": snakemakedetails.snakemake_run_conf["cores"],
        "use_conda": snakemakedetails.snakemake_run_conf["use-conda"],
        "conda_frontend": "mamba",
        "jobname": snakemakedetails.snakemake_run_conf["jobname"],
        "latency_wait": snakemakedetails.snakemake_run_conf["latency-wait"],
        "dryrun": snakemakedetails.snakemake_run_conf["dryrun"],
        # WriteYaml persists the run parameters and returns the file path
        # that snakemake reads as its config.
        "configfiles": [
            WriteYaml(
                snakemakedetails.snakemake_run_parameters,
                f"{inputs_obj.workdir}/config/run_params_MR.yaml",
            )
        ],
        "restart_times": 3,
        "keepgoing": True,
        "quiet": ["all"],  # type: ignore
        "log_handler": [
            ViroConstrictor.logging.snakemake_logger(logfile=inputs_obj.logfile),
        ],
        "printshellcmds": False,
    }

    if inputs_obj.user_config["COMPUTING"]["compmode"] == "local":
        return snakemake(**common_kwargs)

    # Grid execution: hand job scheduling to DRMAA.
    return snakemake(
        drmaa=snakemakedetails.snakemake_run_conf["drmaa"],
        drmaa_log_dir=snakemakedetails.snakemake_run_conf["drmaa-log-dir"],
        **common_kwargs,
    )


def replace_sets_to_singular_values(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """
    Replace sets in specified columns with their singular values.

    The named columns are expected to contain single-element sets (produced by
    a ``groupby(...).agg(set)``); each set is unwrapped to its one element.
    If a set happens to hold more than one element, an arbitrary element is
    taken (set iteration order is not defined).

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing the columns to be processed. Modified in place.
    columns : list
        The list of column names to be processed.

    Returns
    -------
    pd.DataFrame
        The DataFrame with the specified columns' sets replaced with their singular values.
    """
    for col in columns:
        # next(iter(x)) extracts the single element without building a
        # throwaway list per cell.
        df[col] = df[col].apply(lambda x: next(iter(x)))
    return df


def get_snakemake_run_details(inputs_obj: CLIparser) -> GetSnakemakeRunDetails:
    """
    Build the Snakemake run details for the match-ref step.

    Keeps only the samples whose "MATCH-REF" flag is truthy, re-indexes the
    samples dataframe by "SAMPLE", mirrors it into ``samples_dict``, and wraps
    the updated inputs in a GetSnakemakeRunDetails object using the
    "samples_MatchRef" samplesheet name.

    Parameters
    ----------
    inputs_obj : CLIparser
        The parsed command line arguments. Mutated: ``samples_df`` and
        ``samples_dict`` are replaced with the filtered versions.

    Returns
    -------
    GetSnakemakeRunDetails
        The object containing the filtered samples dataframe and the samplesheet filename.
    """
    match_ref_samples = inputs_obj.samples_df[
        inputs_obj.samples_df["MATCH-REF"]
    ].set_index("SAMPLE")

    inputs_obj.samples_df = match_ref_samples
    inputs_obj.samples_dict = match_ref_samples.to_dict(orient="index")

    return GetSnakemakeRunDetails(inputs_obj, samplesheetfilename="samples_MatchRef")


def replacement_merge_dataframe_on_cols(
    original_df: pd.DataFrame,
    override_df: pd.DataFrame,
    cols_left: list,
    cols_right: list,
) -> pd.DataFrame:
    """
    Merge two dataframes on the sample name and replace values in the original
    dataframe with values from the override dataframe.

    For each (left, right) column pair, a cell in ``original_df`` is replaced
    by the matching ``override_df`` value when the row's "SAMPLE" appears in
    ``override_df["sample"]`` AND the original cell is not the "NONE"
    sentinel. "NONE" marks an explicitly absent input (e.g. no primers/GFF
    given) and is deliberately left untouched.

    Parameters
    ----------
    original_df : pd.DataFrame
        The original dataframe to be merged and updated (modified in place).
    override_df : pd.DataFrame
        The dataframe containing the updated values, keyed by its "sample" column.
    cols_left : list
        The column names in the original dataframe to be updated.
    cols_right : list
        The corresponding column names in the override dataframe.

    Returns
    -------
    pd.DataFrame
        The merged dataframe with updated values from the override dataframe.
    """
    # keep="first" mirrors the first-match (.values[0]) semantics in case of
    # duplicate sample rows in the override dataframe.
    deduped = override_df.drop_duplicates(subset="sample", keep="first")
    for col_orig, col_override in zip(cols_left, cols_right):
        # Build an O(1) sample -> value lookup once per column instead of a
        # boolean mask scan of override_df for every row.
        lookup = dict(zip(deduped["sample"], deduped[col_override]))
        original_df[col_orig] = original_df.apply(
            lambda row: lookup[row["SAMPLE"]]
            if row["SAMPLE"] in lookup and row[col_orig] != "NONE"
            else row[col_orig],
            axis=1,
        )
    return original_df


def process_match_ref(parsed_inputs: CLIparser) -> CLIparser:
    """
    Process the match_ref step of the ViroConstrictor pipeline.

    Runs the match-ref snakemake workflow on a deep copy of the parsed inputs.
    On failure, writes a run report and exits with status code 1. On success
    (non-dryrun), reads the resulting dataframe from
    ``{workdir}/data/match_ref_results.pkl``, collapses it to one row per
    sample, and overwrites the "REFERENCE", "PRIMERS" and "FEATURES" columns
    of ``parsed_inputs.samples_df`` with the matched "Reference_file",
    "Primer_file" and "Feat_file" values where the sample names match.

    Parameters
    ----------
    parsed_inputs : CLIparser
        The parsed inputs for the ViroConstrictor pipeline.

    Returns
    -------
    CLIparser
        The modified parsed inputs with updated values for the "REFERENCE",
        "PRIMERS" and "FEATURES" columns (unchanged on a dryrun).
    """
    # Work on a deep copy so the match-ref run cannot mutate the inputs that
    # the main workflow will use afterwards.
    inputs_obj_match_ref = copy.deepcopy(parsed_inputs)
    snakemakedetails = get_snakemake_run_details(inputs_obj_match_ref)
    log.info(
        f"{'='*20} [bold orange_red1] Starting Match-reference process [/bold orange_red1] {'='*20}"
    )
    status = run_snakemake(inputs_obj_match_ref, snakemakedetails)

    if status is False:
        WriteReport(
            inputs_obj_match_ref.workdir,
            inputs_obj_match_ref.input_path,
            inputs_obj_match_ref.exec_start_path,
            inputs_obj_match_ref.user_config,
            snakemakedetails.snakemake_run_parameters,
            snakemakedetails.snakemake_run_conf,
            "Failed",
        )
        # raise SystemExit instead of the site-provided exit() builtin, which
        # is not guaranteed to be available outside interactive sessions.
        raise SystemExit(1)

    if snakemakedetails.snakemake_run_conf["dryrun"] is True:
        # A dryrun produces no results file; hand back the inputs untouched.
        return parsed_inputs

    # status is True, this means that a file should exist at {workdir}/data/match_ref_results.pkl
    df = pd.read_pickle(f"{inputs_obj_match_ref.workdir}/data/match_ref_results.pkl")

    # Guarantee the optional columns exist; "NONE" marks "not provided"
    # (e.g. the user gave no primers or GFF for these samples).
    for optional_col in ("Primer_file", "Feat_file"):
        if optional_col not in df.columns:
            df[optional_col] = "NONE"

    # keep only columns "sample", "Reference", "Reference_file", "Primer_file" and "Feat_file"
    filt_df = df[["sample", "Reference", "Reference_file", "Primer_file", "Feat_file"]]
    # Collapse to one row per sample; every other column becomes a set.
    imploded_df = filt_df.groupby("sample").agg(set).reset_index()

    # Unwrap the single-value sets. The Reference column is left as a set
    # since a sample may legitimately match several references.
    imploded_df = replace_sets_to_singular_values(
        imploded_df, ["Reference_file", "Primer_file", "Feat_file"]
    )

    # Overwrite the user-supplied reference/primer/feature paths with the
    # matched versions wherever sample names line up; rows without a match
    # (or with an explicit "NONE") keep their original values.
    parsed_inputs.samples_df = replacement_merge_dataframe_on_cols(
        parsed_inputs.samples_df,
        imploded_df,
        ["REFERENCE", "PRIMERS", "FEATURES"],
        ["Reference_file", "Primer_file", "Feat_file"],
    )

    # Written relative to the current directory, which the parser has already
    # chdir'd to the working directory at this point.
    parsed_inputs.samples_df.to_csv("samples_MatchRef.csv", index=False)
    parsed_inputs.samples_df.set_index("SAMPLE", inplace=True)
    parsed_inputs.samples_dict = parsed_inputs.samples_df.to_dict(orient="index")

    return parsed_inputs
16 changes: 13 additions & 3 deletions ViroConstrictor/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, input_args: list[str]) -> None:
self.workdir,
self.exec_start_path,
self.snakefile,
self.match_ref_snakefile,
) = self._get_paths_for_workflow(self.flags)
if not self.samples_dict:
sys.exit(1)
Expand Down Expand Up @@ -259,7 +260,7 @@ def _get_args(self, givenargs: list[str]) -> argparse.Namespace:
action="store_true",
help="Match your data to the best reference available in the given reference fasta file.",
)

optional_args.add_argument(
"--segmented",
"-seg",
Expand Down Expand Up @@ -502,7 +503,7 @@ def _print_missing_asset_warning(

def _get_paths_for_workflow(
self, flags: argparse.Namespace
) -> tuple[str, str, str, str]:
) -> tuple[str, str, str, str, str]:
"""Takes the input and output paths from the command line, and then creates the working directory if
it doesn't exist. It then changes the current working directory to the working directory
Expand All @@ -522,13 +523,22 @@ def _get_paths_for_workflow(
snakefile: str = os.path.join(
os.path.abspath(os.path.dirname(__file__)), "workflow", "workflow.smk"
)
match_ref_snakefile: str = os.path.join(
os.path.abspath(os.path.dirname(__file__)), "workflow", "match-ref.smk"
)

if not os.path.exists(working_directory):
os.makedirs(working_directory)
if os.getcwd() != working_directory:
os.chdir(working_directory)

return input_path, working_directory, exec_start_path, snakefile
return (
input_path,
working_directory,
exec_start_path,
snakefile,
match_ref_snakefile,
)


def file_exists(path: str) -> bool:
Expand Down
Loading

0 comments on commit 1581c78

Please sign in to comment.