diff --git a/pynteny/app/callbacks.py b/pynteny/app/callbacks.py new file mode 100644 index 0000000..eec210a --- /dev/null +++ b/pynteny/app/callbacks.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Callback definitions to be used by streamlit app""" + +import shutil +from pathlib import Path +import streamlit as st + +import pynteny.app.filemanager as filemanager +from pynteny.subcommands import synteny_search, build_database, download_hmms +from pynteny.utils import ConfigParser + + +parent_dir = Path(__file__).parent + + +def select_log_path(): + if st.session_state.log == "Yes": + logfile = Path(st.session_state.outdir) / "pynteny.log" + else: + logfile = None + st.session_state.pynteny_log = logfile + + +def update_log(): + """ + Update Pynteny log with Streamlit log info + """ + config = ConfigParser.get_default_config() + streamlit_log = config.get_field("streamlit_log") + select_log_path() + if st.session_state.pynteny_log is not None: + shutil.copy(streamlit_log, st.session_state.pynteny_log) + + +def search(): + config = ConfigParser.get_default_config() + if (st.session_state.search_state.hmm_dir is None) and ( + not config.get_field("data_downloaded") + ): + with st.spinner("Downloading HMM database, please wait..."): + download_hmms(st.session_state.download_state) + st.success("HMM database downloaded!") + st.session_state.search_state.outdir = st.session_state.outdir + if ( + st.session_state.search_state.data is not None + and st.session_state.search_state.data.exists() + ): + synhits = synteny_search(st.session_state.search_state).getSyntenyHits() + st.session_state.search_state.synteny_hits = synhits[ + [c for c in synhits.columns if c != "full_label"] + ] + update_log() + else: + st.warning("Please, first upload a sequence database file") + + +def build(): + if not st.session_state.sequence_data_uploaded: + st.warning("Please, first upload assembly data file") + else: + st.session_state.build_state.data = st.session_state.search_state.data + st.session_state.build_state.outdir = st.session_state.search_state.outdir + st.session_state.build_state.outfile = ( + Path(st.session_state.search_state.data.parent) + / f"{st.session_state.search_state.data.stem}_labelled.faa" + ) + st.session_state.search_state.data = st.session_state.build_state.outfile + build_database(st.session_state.build_state) + update_log() + + +def upload_data(): + selected_path = filemanager.open_file_explorer() + st.session_state.search_state.data = selected_path + st.session_state.sequence_data_uploaded = True + + +def update_output_dir(): + selected_outdir = filemanager.open_directory_explorer() + if selected_outdir is not None: + st.session_state.outdir = selected_outdir + + +def update_output_prefix(): + st.session_state.search_state.prefix = st.session_state.prefix + + +def update_output_subdirectory(): + subdir = Path(st.session_state.outdir) / st.session_state.subdirectory + if not subdir.exists(): + subdir.mkdir(parents=True, exist_ok=False) + st.session_state.outdir = subdir + + +def select_HMM_dir(): + selected_dir = filemanager.open_directory_explorer() + st.session_state.search_state.hmm_dir = selected_dir + st.success(f"Selected HMM database: {selected_dir}") + + +def select_HMM_meta(): + selected_file = filemanager.open_file_explorer() + st.session_state.search_state.hmm_meta = selected_file + st.success(f"Selected HMM metadata: {selected_file}") + + +def set_number_of_processes(): + st.session_state.build_state.processes = st.session_state.processes + st.session_state.search_state.processes = st.session_state.processes + + +def close_session(): + st.text(" ") + st.text(" ") + st.markdown("# Thanks for using") + st.text(" ") + st.image(st.session_state.sidebar_icon) + st.text(" ") + st.markdown("### Please stop the server by pressing control + c in terminal") + st.stop() diff --git a/pynteny/app/components.py b/pynteny/app/components.py index 027b17e..085682f 100644 --- a/pynteny/app/components.py +++ b/pynteny/app/components.py @@ -3,7 +3,9 @@ import streamlit as st -from pynteny.app.helpers import Callbacks, FileManager, Plotter +from pynteny.app.helpers import plot_dataframe +import pynteny.app.callbacks as callbacks +import pynteny.app.filemanager as filemanager meta = metadata.metadata("pynteny") @@ -11,155 +13,147 @@ __author__ = meta["Author"] -class Sidebar: - @staticmethod - def show(): +def show_sidebar(): - st.sidebar.image( - st.session_state.sidebar_icon, - use_column_width=True, - caption=f"Synteny-aware HMM searches made easy (v{__version__})", - ) + st.sidebar.image( + st.session_state.sidebar_icon, + use_column_width=True, + caption=f"Synteny-aware HMM searches made easy (v{__version__})", + ) + + st.sidebar.text(" ") + st.sidebar.text(" ") - st.sidebar.text(" ") - st.sidebar.text(" ") - - with st.sidebar.expander("Current output directory:", expanded=True): - if (st.session_state.outdir is not None) and ( - st.session_state.outdir.exists() - ): - files_div = st.empty() - with files_div.container(): - FileManager.show_files_in_dir( - st.session_state.outdir, sidebar=False - ) - - st.text(" ") - st.button("Select directory", on_click=Callbacks.update_output_dir) - col1, col2 = st.columns([0.5, 0.5]) - with col1: - st.text_input( - "", - value="", - max_chars=None, - key="subdirectory", - on_change=Callbacks.update_output_subdirectory, - placeholder="Create subdirectory", - ) - with col2: - st.text_input( - "", - value="", - max_chars=None, - key="prefix", - on_change=Callbacks.update_output_prefix, - placeholder="Enter output prefix", - ) - - with st.sidebar.expander("Advanced parameters:", expanded=False): - - st.markdown("Select custom HMM database") - col1, col2 = st.columns([1, 1]) - with col1: - st.button("HMM directory", on_click=Callbacks.select_HMM_dir) - with col2: - st.button("HMM metadata", on_click=Callbacks.select_HMM_meta) - - col1, col2, col3 = st.columns([0.4, 0.3, 0.3]) - with col1: - st.markdown(("Processes:")) - st.slider( - "Processes", - min_value=1, - max_value=os.cpu_count(), - value=os.cpu_count() - 1, - step=1, - on_change=Callbacks.set_number_of_processes, - key="processes", - ) - with col2: - st.markdown(("Output log:")) - output_log = st.select_slider( - "", - options=["No", "Yes"], - value="Yes", - on_change=Callbacks.select_log_path, - key="log", - ) - st.sidebar.text(" ") - col1, col2 = st.sidebar.columns([1, 1]) + with st.sidebar.expander("Current output directory:", expanded=True): + if (st.session_state.outdir is not None) and (st.session_state.outdir.exists()): + files_div = st.empty() + with files_div.container(): + filemanager.show_files_in_dir(st.session_state.outdir, sidebar=False) + + st.text(" ") + st.button("Select directory", on_click=callbacks.update_output_dir) + col1, col2 = st.columns([0.5, 0.5]) with col1: - st.button("Restart session") + st.text_input( + "", + value="", + max_chars=None, + key="subdirectory", + on_change=callbacks.update_output_subdirectory, + placeholder="Create subdirectory", + ) with col2: - st.button("Close session", key="close", on_click=Callbacks.close_session) + st.text_input( + "", + value="", + max_chars=None, + key="prefix", + on_change=callbacks.update_output_prefix, + placeholder="Enter output prefix", + ) + with st.sidebar.expander("Advanced parameters:", expanded=False): -class Mainpage: - @staticmethod - def show(): + st.markdown("Select custom HMM database") + col1, col2 = st.columns([1, 1]) + with col1: + st.button("HMM directory", on_click=callbacks.select_HMM_dir) + with col2: + st.button("HMM metadata", on_click=callbacks.select_HMM_meta) - st.title("Pynteny — Synteny-aware HMM searches made easy") + col1, col2, col3 = st.columns([0.4, 0.3, 0.3]) + with col1: + st.markdown(("Processes:")) + st.slider( + "Processes", + min_value=1, + max_value=os.cpu_count(), + value=os.cpu_count() - 1, + step=1, + on_change=callbacks.set_number_of_processes, + key="processes", + ) + with col2: + st.markdown(("Output log:")) + output_log = st.select_slider( + "", + options=["No", "Yes"], + value="Yes", + on_change=callbacks.select_log_path, + key="log", + ) + st.sidebar.text(" ") + col1, col2 = st.sidebar.columns([1, 1]) + with col1: + st.button("Restart session") + with col2: + st.button("Close session", key="close", on_click=callbacks.close_session) - with st.expander("Select sequence data", expanded=False): - st.info( - """ - Sequence data can be either: - - nucleotide assembly data in FASTA format or - - a GenBank file containing sequence annotations. +def show_mainpage(): - __Note__: This Pynteny instance is run locally, thus files are always kept in your machine. - """ - ) + st.title("Pynteny — Synteny-aware HMM searches made easy") - with st.expander("", expanded=True): - col1, col2 = st.columns([1, 1]) - with col1: - st.button("Upload file", on_click=Callbacks.upload_data) - with col2: - st.button("Build database", on_click=Callbacks.build) + with st.expander("Select sequence data", expanded=False): + st.info( + """ + Sequence data can be either: - if st.session_state.sequence_data_uploaded: - st.info(f"Uploaded file: {st.session_state.search_state.data.name}") + - nucleotide assembly data in FASTA format or + - a GenBank file containing sequence annotations. - with st.expander("Enter synteny structure:", expanded=False): - st.info( - """ - Synteny blocks are specified by strings of ordered HMM names or gene IDs with the following format: + __Note__: This Pynteny instance is run locally, thus files are always kept in your machine. + """ + ) - $$\lt HMM_a \space n_{ab} \space \lt HMM_b \space n_{bc} \space \lt(HMM_{c1}|HMM_{c2}|HMM_{c3}),$$ + with st.expander("", expanded=True): + col1, col2 = st.columns([1, 1]) + with col1: + st.button("Upload file", on_click=callbacks.upload_data) + with col2: + st.button("Build database", on_click=callbacks.build) - where $n_{ab}$ corresponds to the maximum number of genes between $HMM_a$ and $HMM_b$. Additionally: + if st.session_state.sequence_data_uploaded: + st.info(f"Uploaded file: {st.session_state.search_state.data.name}") - - Results can be strand-specific, in that case $>$ preceding a HMM name indicates that the corresponding ORF must be located in the positive (or sense) strand. Likewise, a $<$ symbol indicates that the ORF must be located in the negative (antisense) strand. + with st.expander("Enter synteny structure:", expanded=False): + st.info( + """ + Synteny blocks are specified by strings of ordered HMM names or gene IDs with the following format: - - Searches can be made strand-insensitive by omitting the $>$ or $<$ symbol. + $$\lt HMM_a \space n_{ab} \space \lt HMM_b \space n_{bc} \space \lt(HMM_{c1}|HMM_{c2}|HMM_{c3}),$$ - - Several HMMs can be assigned to the same ORF, in which case the search is performed for all of them. In this case, HMM names must be separated by "|" and grouped within parentheses, as shown above. - """ - ) + where $n_{ab}$ corresponds to the maximum number of genes between $HMM_a$ and $HMM_b$. Additionally: + + - Results can be strand-specific, in that case $>$ preceding a HMM name indicates that the corresponding ORF must be located in the positive (or sense) strand. Likewise, a $<$ symbol indicates that the ORF must be located in the negative (antisense) strand. + + - Searches can be made strand-insensitive by omitting the $>$ or $<$ symbol. + + - Several HMMs can be assigned to the same ORF, in which case the search is performed for all of them. In this case, HMM names must be separated by "|" and grouped within parentheses, as shown above. + """ + ) - with st.expander("", expanded=True): - col1, col2, col3 = st.columns([0.7, 0.15, 0.15]) - with col1: - st.session_state.search_state.synteny_struc = st.text_input( - "", " None: + """ + Show a list of files in directory + """ + filelist = [ + file.name + for file in directory.iterdir() + if (not file.name.startswith(".") and file.is_file()) + ] + filelist.sort(key=lambda x: x.lower()) + dirlist = [ + file.name + for file in directory.iterdir() + if (not file.name.startswith(".") and file.is_dir()) + ] + dirlist.sort(key=lambda x: x.lower()) + itemlist = filelist + dirlist + iconlist = [":page_facing_up:" for _ in filelist] + [ + ":open_file_folder:" for _ in dirlist + ] + + markdown_table = ( + f"\n| :diamond_shape_with_a_dot_inside: | {directory} |\n| --- | --- | " + ) + for icon, object in zip(iconlist, itemlist): + markdown_table += f"\n| {icon} | {object} | " + markdown_table += "\n\n" + + if sidebar: + st.sidebar.markdown(markdown_table) + else: + st.markdown(markdown_table) + + +def open_file_explorer() -> Path: + """ + Open a file explorer and return selected path + """ + root = tk.Tk() + root.geometry("700x350") + root.withdraw() + try: + selected_path = Path(filedialog.askopenfilename()) + except: + selected_path = None + root.destroy() + return selected_path + + +def open_directory_explorer() -> Path: + """ + Open a explorer to select directory and return path + """ + root = tk.Tk() + root.geometry("700x350") + root.withdraw() + try: + selected_dir = Path(filedialog.askdirectory(master=root)) + except: + selected_dir = None + root.destroy() + return selected_dir diff --git a/pynteny/app/helpers.py b/pynteny/app/helpers.py index a080a9e..ba14215 100644 --- a/pynteny/app/helpers.py +++ b/pynteny/app/helpers.py @@ -1,235 +1,48 @@ -import shutil +#!/usr/bin/env python +# -*- coding: utf-8 -*- + from pathlib import Path import pandas as pd -import tkinter as tk -from tkinter import filedialog import streamlit as st from st_aggrid import AgGrid, GridOptionsBuilder -from pynteny.subcommands import synteny_search, build_database, download_hmms -from pynteny.utils import ConfigParser - - parent_dir = Path(__file__).parent -class FileManager: - @staticmethod - def show_files_in_dir(directory: Path, sidebar: bool = False) -> None: - """ - Show a list of files in directory - """ - filelist = [ - file.name - for file in directory.iterdir() - if (not file.name.startswith(".") and file.is_file()) - ] - filelist.sort(key=lambda x: x.lower()) - dirlist = [ - file.name - for file in directory.iterdir() - if (not file.name.startswith(".") and file.is_dir()) - ] - dirlist.sort(key=lambda x: x.lower()) - itemlist = filelist + dirlist - iconlist = [":page_facing_up:" for _ in filelist] + [ - ":open_file_folder:" for _ in dirlist - ] - - markdown_table = ( - f"\n| :diamond_shape_with_a_dot_inside: | {directory} |\n| --- | --- | " - ) - for icon, object in zip(iconlist, itemlist): - markdown_table += f"\n| {icon} | {object} | " - markdown_table += "\n\n" - - if sidebar: - st.sidebar.markdown(markdown_table) - else: - st.markdown(markdown_table) - - @staticmethod - def open_file_explorer() -> Path: - """ - Open a file explorer and return selected path - """ - root = tk.Tk() - root.geometry("700x350") - root.withdraw() - try: - selected_path = Path(filedialog.askopenfilename()) - except: - selected_path = None - root.destroy() - return selected_path - - @staticmethod - def open_directory_explorer() -> Path: - """ - Open a explorer to select directory and return path - """ - root = tk.Tk() - root.geometry("700x350") - root.withdraw() - try: - selected_dir = Path(filedialog.askdirectory(master=root)) - except: - selected_dir = None - root.destroy() - return selected_dir - - -class Callbacks: - @staticmethod - def search(): - config = ConfigParser.get_default_config() - if (st.session_state.search_state.hmm_dir is None) and ( - not config.get_field("data_downloaded") - ): - with st.spinner("Downloading HMM database, please wait..."): - download_hmms(st.session_state.download_state) - st.success("HMM database downloaded!") - st.session_state.search_state.outdir = st.session_state.outdir - if ( - st.session_state.search_state.data is not None - and st.session_state.search_state.data.exists() - ): - synhits = synteny_search(st.session_state.search_state).getSyntenyHits() - st.session_state.search_state.synteny_hits = synhits[ - [c for c in synhits.columns if c != "full_label"] - ] - Logger.update_log() - else: - st.warning("Please, first upload a sequence database file") - - @staticmethod - def build(): - if not st.session_state.sequence_data_uploaded: - st.warning("Please, first upload assembly data file") - else: - st.session_state.build_state.data = st.session_state.search_state.data - st.session_state.build_state.outdir = st.session_state.search_state.outdir - st.session_state.build_state.outfile = ( - Path(st.session_state.search_state.data.parent) - / f"{st.session_state.search_state.data.stem}_labelled.faa" - ) - st.session_state.search_state.data = st.session_state.build_state.outfile - build_database(st.session_state.build_state) - Logger.update_log() - - @staticmethod - def upload_data(): - selected_path = FileManager.open_file_explorer() - st.session_state.search_state.data = selected_path - st.session_state.sequence_data_uploaded = True - - @staticmethod - def update_output_dir(): - selected_outdir = FileManager.open_directory_explorer() - if selected_outdir is not None: - st.session_state.outdir = selected_outdir - - @staticmethod - def update_output_prefix(): - st.session_state.search_state.prefix = st.session_state.prefix - - @staticmethod - def update_output_subdirectory(): - subdir = Path(st.session_state.outdir) / st.session_state.subdirectory - if not subdir.exists(): - subdir.mkdir(parents=True, exist_ok=False) - st.session_state.outdir = subdir - - @staticmethod - def select_HMM_dir(): - selected_dir = FileManager.open_directory_explorer() - st.session_state.search_state.hmm_dir = selected_dir - st.success(f"Selected HMM database: {selected_dir}") - - @staticmethod - def select_HMM_meta(): - selected_file = FileManager.open_file_explorer() - st.session_state.search_state.hmm_meta = selected_file - st.success(f"Selected HMM metadata: {selected_file}") - - @staticmethod - def set_number_of_processes(): - st.session_state.build_state.processes = st.session_state.processes - st.session_state.search_state.processes = st.session_state.processes - - @staticmethod - def select_log_path(): - if st.session_state.log == "Yes": - logfile = Path(st.session_state.outdir) / "pynteny.log" - else: - logfile = None - st.session_state.pynteny_log = logfile - - @staticmethod - def close_session(): - st.text(" ") - st.text(" ") - st.markdown("# Thanks for using") - st.text(" ") - st.image(st.session_state.sidebar_icon) - st.text(" ") - st.markdown("### Please stop the server by pressing control + c in terminal") - st.stop() - - -class Plotter: - @staticmethod - def plot_dataframe(data: pd.DataFrame) -> AgGrid: - """ - Plot dataframe in webpage - themes: streamlit, balham, alpine, material - """ - gb = GridOptionsBuilder.from_dataframe(data) - gb.configure_pagination(paginationAutoPageSize=True) - gb.configure_side_bar() - gridOptions = gb.build() - grid_response = AgGrid( - data, - gridOptions=gridOptions, - data_return_mode="AS_INPUT", - update_mode="MODEL_CHANGED", - fit_columns_on_grid_load=False, - theme="alpine", - enable_enterprise_modules=True, - height=350, - width="100%", - reload_data=True, - ) - return grid_response - - -class Logger: - @staticmethod - def update_log(): - """ - Update Pynteny log with Streamlit log info - """ - config = ConfigParser.get_default_config() - streamlit_log = config.get_field("streamlit_log") - Callbacks.select_log_path() - if st.session_state.pynteny_log is not None: - shutil.copy(streamlit_log, st.session_state.pynteny_log) - - -class ExampleSearch: - @staticmethod - def set_example(): - example_data_dir = Path(Path(parent_dir.parent).parent) / "tests" - st.session_state.sequence_data_uploaded = True - st.session_state.search_state.prefix = "example_" - st.session_state.search_state.data = example_data_dir / "test_data/MG1655.fasta" - st.session_state.search_state.hmm_dir = example_data_dir / "test_data/hmms" - st.session_state.search_state.hmm_meta = ( - example_data_dir / "test_data/hmm_meta.tsv" - ) - search_outdir = Path(st.session_state.outdir) / "pynteny_example" - if not st.session_state.outdir.exists(): - search_outdir.mkdir(parents=True, exist_ok=True) - st.session_state.outdir = search_outdir +def plot_dataframe(data: pd.DataFrame) -> AgGrid: + """ + Plot dataframe in webpage + themes: streamlit, balham, alpine, material + """ + gb = GridOptionsBuilder.from_dataframe(data) + gb.configure_pagination(paginationAutoPageSize=True) + gb.configure_side_bar() + gridOptions = gb.build() + grid_response = AgGrid( + data, + gridOptions=gridOptions, + data_return_mode="AS_INPUT", + update_mode="MODEL_CHANGED", + fit_columns_on_grid_load=False, + theme="alpine", + enable_enterprise_modules=True, + height=350, + width="100%", + reload_data=True, + ) + return grid_response + + +def set_example(): + example_data_dir = Path(Path(parent_dir.parent).parent) / "tests" + st.session_state.sequence_data_uploaded = True + st.session_state.search_state.prefix = "example_" + st.session_state.search_state.data = example_data_dir / "test_data/MG1655.fasta" + st.session_state.search_state.hmm_dir = example_data_dir / "test_data/hmms" + st.session_state.search_state.hmm_meta = example_data_dir / "test_data/hmm_meta.tsv" + search_outdir = Path(st.session_state.outdir) / "pynteny_example" + if not st.session_state.outdir.exists(): + search_outdir.mkdir(parents=True, exist_ok=True) + st.session_state.outdir = search_outdir diff --git a/pynteny/app/main_page.py b/pynteny/app/main_page.py index d44960e..9de34d3 100644 --- a/pynteny/app/main_page.py +++ b/pynteny/app/main_page.py @@ -5,8 +5,8 @@ from PIL import Image from pynteny.utils import CommandArgs -from pynteny.app.helpers import ExampleSearch -from pynteny.app.components import Sidebar, Mainpage +from pynteny.app.helpers import set_example +from pynteny.app.components import show_sidebar, show_mainpage parent_dir = Path(__file__).parent @@ -83,6 +83,6 @@ st.session_state.pynteny_log = None -Sidebar.show() -Mainpage.show() -ExampleSearch.set_example() +show_sidebar() +show_mainpage() +set_example() diff --git a/pynteny/filter.py b/pynteny/filter.py index 09a515e..f028dca 100644 --- a/pynteny/filter.py +++ b/pynteny/filter.py @@ -15,7 +15,8 @@ from pynteny.preprocessing import FASTA from pynteny.hmm import HMMER, PGAP -from pynteny.parser import SyntenyParser, LabelParser +import pynteny.parsers.labelparser as labelparser +import pynteny.parsers.syntenyparser as syntenyparser logger = logging.getLogger(__name__) @@ -44,7 +45,7 @@ def __init__(self, synteny_structure: str, unordered: bool = False) -> None: any order. If ordered, the filters would filter collinear rather than syntenic structures. Defaults to False. """ - parsed_structure = SyntenyParser.parse_synteny_structure(synteny_structure) + parsed_structure = syntenyparser.parse_synteny_structure(synteny_structure) hmm_codes = list(range(len(parsed_structure["hmm_groups"]))) self.hmm_code_order_pattern = hmm_codes @@ -155,10 +156,10 @@ def __init__( self._hmm_hits = hmm_hits self._hmms = list(hmm_hits.keys()) self._synteny_structure = synteny_structure - self._contains_hmm_groups = SyntenyParser.contains_HMM_groups( + self._contains_hmm_groups = syntenyparser.contains_HMM_groups( self._synteny_structure ) - self._parsed_structure = SyntenyParser.parse_synteny_structure( + self._parsed_structure = syntenyparser.parse_synteny_structure( self._synteny_structure ) if self._unordered: @@ -235,7 +236,6 @@ def get_all_HMM_hits(self) -> pd.DataFrame: pd.DataFrame: HMMER3 hit labels matching provided HMMs. """ hit_labels = {} - labelparser = LabelParser() for hmm, hits in self._hmm_hits.items(): labels = hits.id.values.tolist() if not labels: @@ -345,7 +345,7 @@ def _hits_to_dataframe(hits_by_contig: dict) -> pd.DataFrame: for contig, matched_hits in hits_by_contig.items(): for hmm, labels in matched_hits.items(): for label in labels: - parsed_label = LabelParser.parse(label) + parsed_label = labelparser.parse(label) data.append( [ parsed_label["contig"], diff --git a/pynteny/parsers/__init__.py b/pynteny/parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pynteny/parsers/labelparser.py b/pynteny/parsers/labelparser.py new file mode 100644 index 0000000..ac84479 --- /dev/null +++ b/pynteny/parsers/labelparser.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Tools to parse record labels to extract coded info +""" + +from __future__ import annotations +import logging + +import pandas as pd + + +logger = logging.getLogger(__name__) + + +def parse(label: str) -> dict: + """Parse sequence labels to obtain contig and locus info + + Args: + label (str): sequence label + + Returns: + dict: dictionary with parsed label info. + """ + parsed_dict = { + "full_label": label, + "gene_id": "", + "contig": "", + "gene_pos": None, + "locus_pos": None, + "strand": "", + } + try: + entry = label.split("__")[0] + meta = label.split("__")[1] + strand = meta.split("_")[-1] + locus_pos = tuple([int(pos) for pos in meta.split("_")[-3:-1]]) + gene_pos = int(meta.split("_")[-4]) + contig = "_".join(meta.split("_")[:-4]) + + parsed_dict["gene_id"] = entry + parsed_dict["contig"] = contig + parsed_dict["gene_pos"] = gene_pos + parsed_dict["locus_pos"] = locus_pos + parsed_dict["strand"] = strand + except Exception: + pass + return parsed_dict + + +def parse_from_list(labels: list[str]) -> pd.DataFrame: + """Parse labels in list of labels and return DataFrame. + + Args: + labels (list, optional): list of labels as stringgs. + + Returns: + pd.DataFrame: Dataframe containing parsed information from labels. + """ + return pd.DataFrame([parse(label) for label in labels]) diff --git a/pynteny/parsers/syntenyparser.py b/pynteny/parsers/syntenyparser.py new file mode 100644 index 0000000..2f9c985 --- /dev/null +++ b/pynteny/parsers/syntenyparser.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Tools to parse synteny structure strings +""" + +from __future__ import annotations +import sys +import logging +from pathlib import Path + +import pandas as pd + +from pynteny.hmm import PGAP +from pynteny.utils import is_right_list_nested_type + +logger = logging.getLogger(__name__) + + +def reformat_synteny_structure(synteny_structure: str) -> str: + """Remove illegal symbols and extra white space.""" + synteny_structure = synteny_structure.replace(" |", "|").replace("| ", "|").strip() + return synteny_structure + + +def contains_HMM_groups(synteny_structure: str) -> bool: + """Check whether structure contains groups of gene-equivalent HMMs.""" + return "|" in synteny_structure + + +def is_valid_structure(synteny_structure: str) -> bool: + """Validate synteny structure format.""" + synteny_structure = synteny_structure.replace(" |", "|").replace("| ", "|").strip() + parsed_struc = parse_synteny_structure(synteny_structure) + right_type = all( + ( + is_right_list_nested_type(parsed_struc["hmm_groups"], str), + is_right_list_nested_type(parsed_struc["strands"], str), + is_right_list_nested_type(parsed_struc["distances"], int), + ) + ) + right_format = len(parsed_struc["hmm_groups"]) == len( + parsed_struc["strands"] + ) and len(parsed_struc["hmm_groups"]) == (len(parsed_struc["distances"]) + 1) + return False if (not right_type or not right_format) else True + + +def split_strand_from_locus(locus_str: str, parsed_symbol: bool = True) -> tuple[str]: + """Split strand info from locus tag / HMM model. + + Args: + locus_str (str): a substring of a synteny structure containing + a gene symbol / HMM name and strand info. + parsed_symbol (bool, optional): if True, strand info '>' is parsed + as 'pos' and '<' as 'neg'. Defaults to True. + + Returns: + tuple[str]: tuple with parsed strand info and gene symbol / HMM name. + """ + locus_str = locus_str.strip() + if locus_str[0] == "<" or locus_str[0] == ">": + sense = locus_str[0] + locus_str = locus_str[1:] + if parsed_symbol: + strand = "pos" if sense == ">" else "neg" + else: + strand = sense + else: + strand = "" + return (strand, locus_str) + + +def get_HMM_groups_in_structure(synteny_structure: str) -> list[str]: + """Get hmm names employed in synteny structure, + if more than one hmm for the same gene, return + a list with all of them. + """ + links = synteny_structure.replace("(", "").replace(")", "").strip().split() + if not links: + logger.error("Invalid format for synteny structure") + sys.exit(1) + hmm_groups = [split_strand_from_locus(h)[1] for h in links if not h.isdigit()] + return hmm_groups + + +def get_gene_symbols_in_structure(synteny_structure: str) -> list[str]: + """Retrieve gene symbols contained in synteny structure.""" + links = synteny_structure.strip().split() + if not links: + logger.error("Invalid format for synteny structure") + sys.exit(1) + gene_symbols = [split_strand_from_locus(h)[1] for h in links if not h.isdigit()] + return gene_symbols + + +def get_all_HMMs_in_structure(synteny_structure: str) -> list[str]: + """Get hmm names employed in synteny structure, + if more than one hmm for the same gene, return + a list with all of them. + """ + hmm_groups = get_HMM_groups_in_structure(synteny_structure) + hmm_names = [hmm for hmm_group in hmm_groups for hmm in hmm_group.split("|")] + return hmm_names + + +def get_strands_in_structure( + synteny_structure: str, parsed_symbol: bool = True +) -> list[str]: + """Get strand sense list in structure. + + Args: + synteny_structure (str): a synteny structure. + parsed_symbol (bool, optional): if True, strand info '>' is parsed + as 'pos' and '<' as 'neg'. Defaults to True. + + Returns: + list[str]: parsed synteny structure as a list of tuples containing + HMM name and strand info for each HMM group. + """ + links = synteny_structure.strip().split() + if not links: + logger.error("Invalid format for synteny structure") + sys.exit(1) + return [ + split_strand_from_locus(h, parsed_symbol)[0] for h in links if not h.isdigit() + ] + + +def get_maximum_distances_in_structure(synteny_structure: str) -> list[int]: + """Get maximum gene distances in synteny structure.""" + links = synteny_structure.strip().split() + if not links: + logger.error("Invalid format for synteny structure") + sys.exit(1) + return [int(dist) for dist in links if dist.isdigit()] + + +def parse_synteny_structure(synteny_structure: str) -> dict: + """Parse synteny structure string. + + Args: + synteny_structure (str): a string like the following: + >hmm_a n_ab ' indicates a hmm target located on the positive strand, + '<' a target located on the negative strand, and n_ab cooresponds + to the maximum number of genes separating matched gene a and b. + Multiple hmms may be employed (limited by computational capabilities). + No order symbol in a hmm indicates that results should be independent + of strand location. + + Returns: + dict: parsed synteny structure. + """ + max_dists = get_maximum_distances_in_structure(synteny_structure) + hmm_groups = get_HMM_groups_in_structure(synteny_structure) + strands = get_strands_in_structure(synteny_structure) + return {"hmm_groups": hmm_groups, "strands": strands, "distances": max_dists} + + +def parse_genes_in_synteny_structure( + synteny_structure: str, hmm_meta: Path +) -> tuple[str, dict]: + """Convert gene-based synteny structure into a HMM-based one. + If a gene symbol matches more than one HMM, return a HMM group + like: (HMM1 | HMM2 | ...). + + Args: + synteny_structure (str): a string like the following: + >hmm_a n_ab ' indicates a hmm target located on the positive strand, + '<' a target located on the negative strand, and n_ab cooresponds + to the maximum number of genes separating matched gene a and b. + Multiple hmms may be employed (limited by computational capabilities). + No order symbol in a hmm indicates that results should be independent + of strand location. + hmm_meta (Path): path to PGAP's metadata file. + + Returns: + tuple[str,dict]: parsed synteny structure where gene symbols are replaced + by HMM names. + """ + pgap = PGAP(hmm_meta) + gene_symbols = get_gene_symbols_in_structure(synteny_structure) + strand_locs = get_strands_in_structure(synteny_structure, parsed_symbol=False) + gene_dists = get_maximum_distances_in_structure(synteny_structure) + hmm_groups = { + gene_symbol: pgap.get_HMM_group_for_gene_symbol(gene_symbol) + for gene_symbol in gene_symbols + } + unmatched_genes = [ + gene_id for gene_id, hmm_group in hmm_groups.items() if not hmm_group + ] + if unmatched_genes: + logger.error( + f"These genes did not get a HMM match in database: {unmatched_genes}" + ) + sys.exit(1) + + hmm_synteny_struc = "" + + for strand, dist, hmm_group in zip( + strand_locs, [""] + gene_dists, hmm_groups.values() + ): + if "|" in hmm_group: + hmm_group = f"({hmm_group})" + hmm_synteny_struc += f"{dist} {strand}{hmm_group} " + + return hmm_synteny_struc.strip(), hmm_groups diff --git a/pynteny/preprocessing.py b/pynteny/preprocessing.py index daf2a63..21a6122 100644 --- a/pynteny/preprocessing.py +++ b/pynteny/preprocessing.py @@ -26,70 +26,66 @@ logger = logging.getLogger(__name__) -class RecordSequence: - """Tools to process nucleotide or peptide sequences""" - - @staticmethod - def remove_stop_sodon_signals(record_seq: str) -> str: - """Remove stop codon signals from peptide sequence - - Args: - record_seq (str): peptide sequence. - - Returns: - str: a peptide sequence without stop codon symbols. - """ - return record_seq.replace("*", "") - - @staticmethod - def is_legit_peptide_sequence(record_seq: str) -> bool: - """Assert that peptide sequence only contains valid symbols. - - Args: - record_seq (str): peptide sequence. - - Returns: - bool: whether peptide sequence only contains legit symbols. - """ - aas = { - "A", - "C", - "D", - "E", - "F", - "G", - "H", - "I", - "K", - "L", - "M", - "N", - "P", - "Q", - "R", - "S", - "T", - "V", - "W", - "Y", - "*", - } - seq_symbols = {s.upper() for s in record_seq} - return seq_symbols.issubset(aas) - - @staticmethod - def is_legit_DNA_sequence(record_seq: str) -> bool: - """Assert that DNA sequence only contains valid symbols. - - Args: - record_seq (str): nucleotide sequence. - - Returns: - bool: whether nucleotide sequence only contains legit symbols. - """ - nts = {"A", "G", "T", "C", "N"} - seq_symbols = {s.upper() for s in record_seq} - return seq_symbols.issubset(nts) +def remove_stop_sodon_signals(record_seq: str) -> str: + """Remove stop codon signals from peptide sequence + + Args: + record_seq (str): peptide sequence. + + Returns: + str: a peptide sequence without stop codon symbols. + """ + return record_seq.replace("*", "") + + +def is_legit_peptide_sequence(record_seq: str) -> bool: + """Assert that peptide sequence only contains valid symbols. + + Args: + record_seq (str): peptide sequence. + + Returns: + bool: whether peptide sequence only contains legit symbols. + """ + aas = { + "A", + "C", + "D", + "E", + "F", + "G", + "H", + "I", + "K", + "L", + "M", + "N", + "P", + "Q", + "R", + "S", + "T", + "V", + "W", + "Y", + "*", + } + seq_symbols = {s.upper() for s in record_seq} + return seq_symbols.issubset(aas) + + +def is_legit_DNA_sequence(record_seq: str) -> bool: + """Assert that DNA sequence only contains valid symbols. + + Args: + record_seq (str): nucleotide sequence. + + Returns: + bool: whether nucleotide sequence only contains legit symbols. + """ + nts = {"A", "G", "T", "C", "N"} + seq_symbols = {s.upper() for s in record_seq} + return seq_symbols.issubset(nts) class FASTA: @@ -200,9 +196,9 @@ def remove_corrupted_sequences( if output_file is None: output_file = Path(dirname) / f"{fname}_modified{ext}" if is_peptide: - isLegitSequence = RecordSequence.is_legit_peptide_sequence + isLegitSequence = is_legit_peptide_sequence else: - isLegitSequence = RecordSequence.is_legit_DNA_sequence + isLegitSequence = is_legit_DNA_sequence fasta = pyfastx.Fasta( self.file_path.as_posix(), build_index=False, full_name=True @@ -210,7 +206,7 @@ def remove_corrupted_sequences( with open(output_file, "w+") as outfile: for record_name, record_seq in fasta: if is_peptide and (not keep_stop_codon): - record_seq = RecordSequence.remove_stop_sodon_signals(record_seq) + record_seq = remove_stop_sodon_signals(record_seq) if isLegitSequence(record_seq): outfile.write(f">{record_name}\n{record_seq}\n") if point_to_new_file: diff --git a/pynteny/subcommands.py b/pynteny/subcommands.py index c28b142..8133a01 100644 --- a/pynteny/subcommands.py +++ b/pynteny/subcommands.py @@ -12,8 +12,9 @@ from pathlib import Path import wget -from pynteny.filter import SyntenyHits, SyntenyParser, filter_FASTA_by_synteny_structure +from pynteny.filter import SyntenyHits, filter_FASTA_by_synteny_structure from pynteny.hmm import PGAP +import pynteny.parsers.syntenyparser as syntenyparser from pynteny.utils import CommandArgs, ConfigParser, is_tar_file, terminal_execute from pynteny.preprocessing import Database @@ -55,8 +56,8 @@ def synteny_search(args) -> SyntenyHits: sys.exit(1) config = ConfigParser.get_default_config() - args.synteny_struc = SyntenyParser.reformat_synteny_structure(args.synteny_struc) - if not SyntenyParser.is_valid_structure(args.synteny_struc): + args.synteny_struc = syntenyparser.reformat_synteny_structure(args.synteny_struc) + if not syntenyparser.is_valid_structure(args.synteny_struc): logger.error( ( f"Invalid synteny structure format: {args.synteny_struc}. " @@ -86,7 +87,7 @@ def synteny_search(args) -> SyntenyHits: ( gene_synteny_struc, gene_to_hmm_group, - ) = SyntenyParser.parse_genes_in_synteny_structure( + ) = syntenyparser.parse_genes_in_synteny_structure( synteny_structure=args.synteny_struc, hmm_meta=args.hmm_meta ) args.synteny_struc = gene_synteny_struc @@ -101,7 +102,7 @@ def synteny_search(args) -> SyntenyHits: else: hmm_dir = args.hmm_dir - hmm_names = SyntenyParser.get_all_HMMs_in_structure(args.synteny_struc) + hmm_names = syntenyparser.get_all_HMMs_in_structure(args.synteny_struc) input_hmms = [ file for file in hmm_dir.iterdir() @@ -192,7 +193,7 @@ def parse_gene_ids(args) -> str: ( gene_synteny_struc, gene_to_hmm_group, - ) = SyntenyParser.parse_genes_in_synteny_structure( + ) = syntenyparser.parse_genes_in_synteny_structure( synteny_structure=args.synteny_struc, hmm_meta=args.hmm_meta ) logger.info( diff --git a/tests/test_parser.py b/tests/test_parser.py index 7d247d3..0a7e43f 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -7,7 +7,9 @@ import unittest from pathlib import Path -from pynteny.filter import LabelParser, SyntenyParser + +import pynteny.parsers.labelparser as labelparser +import pynteny.parsers.syntenyparser as syntenyparser this_file_dir = Path(__file__).parent @@ -18,7 +20,7 @@ def test_parse(self): label = ( "Afifella_marina_BN_126_MMP03080610__FMVW01000002.1_552_0584494_0585393_pos" ) - parsed_dict = LabelParser.parse(label) + parsed_dict = labelparser.parse(label) self.assertDictEqual( parsed_dict, { @@ -38,19 +40,19 @@ class TestSyntenyParser(unittest.TestCase): def test_is_valid_structure(self): self.assertTrue( - SyntenyParser.is_valid_structure(self.syn_struct), + syntenyparser.is_valid_structure(self.syn_struct), "Failed to assess correct synteny structure format", ) def test_split_strand_from_locus(self): locus_str = "