From 759ecd10a4c83929a0b0ffb277416732952e3d0b Mon Sep 17 00:00:00 2001 From: Adrian Romberg Date: Mon, 20 Jan 2025 14:59:02 +0100 Subject: [PATCH] fix code style --- src/xspect/definitions.py | 1 + src/xspect/main.py | 16 ++-- src/xspect/mlst_feature/mlst_helper.py | 75 +++++++++++------- src/xspect/mlst_feature/pub_mlst_handler.py | 52 ++++++++----- .../models/probabilistic_filter_mlst_model.py | 76 ++++++++++++------- 5 files changed, 137 insertions(+), 83 deletions(-) diff --git a/src/xspect/definitions.py b/src/xspect/definitions.py index 75b1176..6035c19 100644 --- a/src/xspect/definitions.py +++ b/src/xspect/definitions.py @@ -41,6 +41,7 @@ def get_xspect_runs_path(): runs_path.mkdir(exist_ok=True, parents=True) return runs_path + def get_xspect_mlst_path(): """Return the path to the XspecT runs directory.""" mlst_path = get_xspect_root_path() / "mlst" diff --git a/src/xspect/main.py b/src/xspect/main.py index b0764a1..6f47a83 100644 --- a/src/xspect/main.py +++ b/src/xspect/main.py @@ -15,12 +15,14 @@ get_xspect_runs_path, fasta_endings, fastq_endings, - get_xspect_model_path + get_xspect_model_path, ) from xspect.pipeline import ModelExecution, Pipeline, PipelineStep from src.xspect.mlst_feature.mlst_helper import pick_scheme, pick_scheme_from_models_dir from src.xspect.mlst_feature.pub_mlst_handler import PubMLSTHandler -from src.xspect.models.probabilistic_filter_mlst_model import ProbabilisticFilterMlstSchemeModel +from src.xspect.models.probabilistic_filter_mlst_model import ( + ProbabilisticFilterMlstSchemeModel, +) @click.group() @@ -125,6 +127,7 @@ def train(genus, bf_assembly_path, svm_assembly_path, svm_step): except ValueError as e: raise click.ClickException(str(e)) from e + @cli.command() @click.option( "-c", @@ -143,21 +146,20 @@ def mlst_train(choose_schemes): species_name = str(scheme_path).split("/")[-2] scheme_name = str(scheme_path).split("/")[-1] model = ProbabilisticFilterMlstSchemeModel( - 31, - f"{species_name}:{scheme_name}", - get_xspect_model_path() + 31, f"{species_name}:{scheme_name}", get_xspect_model_path() ) click.echo("Creating mlst model") model.fit(scheme_path) model.save() click.echo(f"Saved at {model.cobs_path}") + @cli.command() @click.option( "-p", "--path", help="Path to FASTA-file for mlst identification.", - type=click.Path(exists=True, dir_okay=True, file_okay=True) + type=click.Path(exists=True, dir_okay=True, file_okay=True), ) def mlst_classify(path): """Download alleles and train bloom filters.""" @@ -168,10 +170,12 @@ def mlst_classify(path): model.predict(scheme_path, path).save(model.model_display_name, path) click.echo(f"Run saved at {get_xspect_runs_path()}.") + @cli.command() def api(): """Open the XspecT FastAPI.""" uvicorn.run(fastapi.app, host="0.0.0.0", port=8000) + if __name__ == "__main__": cli() diff --git a/src/xspect/mlst_feature/mlst_helper.py b/src/xspect/mlst_feature/mlst_helper.py index 2ba48b1..3b45013 100644 --- a/src/xspect/mlst_feature/mlst_helper.py +++ b/src/xspect/mlst_feature/mlst_helper.py @@ -9,17 +9,20 @@ from Bio import SeqIO from src.xspect.definitions import get_xspect_model_path, get_xspect_runs_path -def create_fasta_files(locus_path:Path, fasta_batch:str): + +def create_fasta_files(locus_path: Path, fasta_batch: str): """Create Fasta-Files for every allele of a locus.""" # fasta_batch = full string of a fasta file containing every allele sequence of a locus for record in SeqIO.parse(StringIO(fasta_batch), "fasta"): - number = record.id.split("_")[-1] # example id = Oxf_cpn60_263 + number = record.id.split("_")[-1] # example id = Oxf_cpn60_263 output_fasta_file = locus_path / f"Allele_ID_{number}.fasta" - if output_fasta_file.exists(): continue # Ignore existing ones - with (open(output_fasta_file, "w") as allele): + if output_fasta_file.exists(): + continue # Ignore existing ones + with open(output_fasta_file, "w") as allele: SeqIO.write(record, allele, "fasta") -def pick_species_number_from_db(available_species:dict) -> str: + +def pick_species_number_from_db(available_species: dict) -> str: """Returns the chosen species from all available ones in the database.""" # The "database" string can look like this: pubmlst_abaumannii_seqdef for counter, database in available_species.items(): @@ -32,11 +35,16 @@ def pick_species_number_from_db(available_species:dict) -> str: chosen_species = available_species.get(int(choice)) return chosen_species else: - print("Wrong input! Try again with a number that is available in the list above.") + print( + "Wrong input! Try again with a number that is available in the list above." + ) except ValueError: - print("Wrong input! Try again with a number that is available in the list above.") + print( + "Wrong input! Try again with a number that is available in the list above." + ) + -def pick_scheme_number_from_db(available_schemes:dict) -> str: +def pick_scheme_number_from_db(available_schemes: dict) -> str: """Returns the chosen schemes from all available ones of a species.""" # List all available schemes of a species database for counter, scheme in available_schemes.items(): @@ -49,23 +57,31 @@ def pick_scheme_number_from_db(available_schemes:dict) -> str: chosen_scheme = available_schemes.get(int(choice))[1] return chosen_scheme else: - print("Wrong input! Try again with a number that is available in the above list.") + print( + "Wrong input! Try again with a number that is available in the above list." + ) except ValueError: - print("Wrong input! Try again with a number that is available in the above list.") + print( + "Wrong input! Try again with a number that is available in the above list." + ) -def scheme_list_to_dict(scheme_list:list[str]): + +def scheme_list_to_dict(scheme_list: list[str]): """Converts the scheme list attribute into a dictionary with a number as the key.""" return dict(zip(range(1, len(scheme_list) + 1), scheme_list)) + def pick_scheme_from_models_dir() -> Path: """Returns the chosen scheme from models that have been fitted prior.""" - schemes = {}; counter = 1 + schemes = {} + counter = 1 for entry in sorted((get_xspect_model_path() / "MLST").iterdir()): schemes[counter] = entry counter += 1 return pick_scheme(schemes) -def pick_scheme(available_schemes:dict) -> Path: + +def pick_scheme(available_schemes: dict) -> Path: """Returns the chosen scheme from the scheme list.""" if not available_schemes: raise ValueError("No scheme has been chosen for download yet!") @@ -92,17 +108,23 @@ def pick_scheme(available_schemes:dict) -> Path: chosen_scheme = available_schemes.get(int(choice)) return chosen_scheme else: - print("Wrong input! Try again with a number that is available in the above list.") + print( + "Wrong input! Try again with a number that is available in the above list." + ) except ValueError: - print("Wrong input! Try again with a number that is available in the above list.") + print( + "Wrong input! Try again with a number that is available in the above list." + ) + class MlstResult: """Class for storing mlst results.""" + def __init__( - self, - scheme_model:str, - steps:int, - hits: dict[str,list[dict]], + self, + scheme_model: str, + steps: int, + hits: dict[str, list[dict]], ): self.scheme_model = scheme_model self.steps = steps @@ -110,22 +132,19 @@ def __init__( def get_results(self) -> dict: """Stores the result of a prediction in a dictionary.""" - results = { - seq_id: result - for seq_id, result in self.hits.items() - } + results = {seq_id: result for seq_id, result in self.hits.items()} return results def to_dict(self) -> dict: """Converts all attributes into one dictionary.""" result = { - "Scheme":self.scheme_model, - "Steps":self.steps, - "Results": self.get_results() + "Scheme": self.scheme_model, + "Steps": self.steps, + "Results": self.get_results(), } return result - def save(self, display:str, file_path:Path) -> None: + def save(self, display: str, file_path: Path) -> None: """Saves the result inside the "runs" directory""" file_name = str(file_path).split("/")[-1] json_path = get_xspect_runs_path() / "MLST" / f"{file_name}-{display}.json" @@ -133,4 +152,4 @@ def save(self, display:str, file_path:Path) -> None: json_object = json.dumps(self.to_dict(), indent=4) with open(json_path, "w", encoding="utf-8") as file: - file.write(json_object) \ No newline at end of file + file.write(json_object) diff --git a/src/xspect/mlst_feature/pub_mlst_handler.py b/src/xspect/mlst_feature/pub_mlst_handler.py index fef3ef8..e250aa0 100644 --- a/src/xspect/mlst_feature/pub_mlst_handler.py +++ b/src/xspect/mlst_feature/pub_mlst_handler.py @@ -9,22 +9,21 @@ pick_species_number_from_db, pick_scheme_number_from_db, pick_scheme, - scheme_list_to_dict -) -from src.xspect.definitions import ( - get_xspect_mlst_path, - get_xspect_upload_path + scheme_list_to_dict, ) +from src.xspect.definitions import get_xspect_mlst_path, get_xspect_upload_path + class PubMLSTHandler: """Class for communicating with PubMLST and downloading alleles (FASTA-Format) from all loci.""" + base_url = "http://rest.pubmlst.org/db" def __init__(self): # Default values: Oxford (1) and Pasteur (2) schemes of A.baumannii species self.scheme_list = [ self.base_url + "/pubmlst_abaumannii_seqdef/schemes/1", - self.base_url + "/pubmlst_abaumannii_seqdef/schemes/2" + self.base_url + "/pubmlst_abaumannii_seqdef/schemes/2", ] self.scheme_paths = [] @@ -34,7 +33,10 @@ def get_scheme_paths(self) -> dict: def choose_schemes(self) -> None: """Changes the scheme list attribute to feature other schemes from some species""" - available_species = {}; available_schemes = {}; chosen_schemes = []; counter = 1 + available_species = {} + available_schemes = {} + chosen_schemes = [] + counter = 1 # retrieve all available species species_url = PubMLSTHandler.base_url for species_databases in requests.get(species_url).json(): @@ -56,16 +58,22 @@ def choose_schemes(self) -> None: # Selection process of available scheme from a species for download (doubles are caught!) while True: chosen_scheme = pick_scheme_number_from_db(available_schemes) - chosen_schemes.append(chosen_scheme) if chosen_scheme not in chosen_schemes else None - choice = input("Do you want to pick another scheme to download? (y/n):").lower() + ( + chosen_schemes.append(chosen_scheme) + if chosen_scheme not in chosen_schemes + else None + ) + choice = input( + "Do you want to pick another scheme to download? (y/n):" + ).lower() if choice != "y": break self.scheme_list = chosen_schemes - def download_alleles(self, choice:False): + def download_alleles(self, choice: False): """Downloads every allele FASTA-file from all loci of the scheme list attribute""" - if choice: # pick an own scheme if not Oxford or Pasteur - self.choose_schemes() # changes the scheme_list attribute + if choice: # pick an own scheme if not Oxford or Pasteur + self.choose_schemes() # changes the scheme_list attribute for scheme in self.scheme_list: scheme_json = requests.get(scheme).json() @@ -73,28 +81,32 @@ def download_alleles(self, choice:False): scheme_name = scheme_json["description"] locus_list = scheme_json["loci"] - species_name = scheme.split("_")[1] # name = pubmlst_abaumannii_seqdef + species_name = scheme.split("_")[1] # name = pubmlst_abaumannii_seqdef scheme_path = get_xspect_mlst_path() / species_name / scheme_name self.scheme_paths.append(scheme_path) for locus_url in locus_list: # After using split the last part ([-1]) of the url is the locus name locus_name = locus_url.split("/")[-1] - locus_path = get_xspect_mlst_path() / species_name / scheme_name / locus_name + locus_path = ( + get_xspect_mlst_path() / species_name / scheme_name / locus_name + ) if not locus_path.exists(): locus_path.mkdir(exist_ok=True, parents=True) alleles = requests.get(f"{locus_url}/alleles_fasta").text - create_fasta_files(locus_path,alleles) + create_fasta_files(locus_path, alleles) def assign_strain_type_by_db(self): """Sends an API-POST-Request to the database for MLST without bloom filters""" - scheme_url = str(pick_scheme(scheme_list_to_dict(self.scheme_list))) + "/sequence" + scheme_url = ( + str(pick_scheme(scheme_list_to_dict(self.scheme_list))) + "/sequence" + ) fasta_file = get_xspect_upload_path() / "Test.fna" - with open(fasta_file, 'r') as file: + with open(fasta_file, "r") as file: data = file.read() - payload = { # Essential API-POST-Body + payload = { # Essential API-POST-Body "sequence": data, "filetype": "fasta", } @@ -103,5 +115,5 @@ def assign_strain_type_by_db(self): for locus, meta_data in response["exact_matches"].items(): # meta_data is a list containing a dictionary, therefore [0] and then key value. # Example: 'Pas_fusA': [{'href': some URL, 'allele_id': '2'}] - print(locus + ":" + meta_data[0]["allele_id"], end= "; ") - print("\nStrain Type:", response["fields"]) \ No newline at end of file + print(locus + ":" + meta_data[0]["allele_id"], end="; ") + print("\nStrain Type:", response["fields"]) diff --git a/src/xspect/models/probabilistic_filter_mlst_model.py b/src/xspect/models/probabilistic_filter_mlst_model.py index d8d8cdd..aea7a7a 100644 --- a/src/xspect/models/probabilistic_filter_mlst_model.py +++ b/src/xspect/models/probabilistic_filter_mlst_model.py @@ -1,4 +1,5 @@ """Probabilistic filter MLST model for sequence data""" + __author__ = "Cetin, Oemer" import cobs_index @@ -12,14 +13,16 @@ from src.xspect.file_io import get_record_iterator from src.xspect.mlst_feature.mlst_helper import MlstResult + class ProbabilisticFilterMlstSchemeModel: """Probabilistic filter MLST scheme model for sequence data""" + def __init__( - self, - k: int, - model_display_name: str, - base_path: Path, - fpr: float = 0.001, + self, + k: int, + model_display_name: str, + base_path: Path, + fpr: float = 0.001, ) -> None: if k < 1: raise ValueError("Invalid k value, must be greater than 0") @@ -50,17 +53,19 @@ def to_dict(self) -> dict: "loci": self.loci, } - def get_cobs_index_path(self, scheme:str, locus:str) -> Path: + def get_cobs_index_path(self, scheme: str, locus: str) -> Path: """Returns the path to the cobs index""" # To differentiate from genus and species models cobs_path = self.base_path / f"{scheme}" cobs_path.mkdir(exist_ok=True, parents=True) return cobs_path / f"{locus}.cobs_compact" - def fit(self, scheme_path:Path) -> None: + def fit(self, scheme_path: Path) -> None: """Trains a COBS structure for every locus with all its alleles""" if not scheme_path.exists(): - raise ValueError("Scheme not found. Please make sure to download the schemes prior!") + raise ValueError( + "Scheme not found. Please make sure to download the schemes prior!" + ) scheme = str(scheme_path).split("/")[-1] cobs_path = "" @@ -87,15 +92,18 @@ def fit(self, scheme_path:Path) -> None: # Creates COBS data structure for each locus cobs_path = self.get_cobs_index_path(scheme, locus) - cobs_index.compact_construct_list(doclist,str(cobs_path),index_params) + cobs_index.compact_construct_list(doclist, str(cobs_path), index_params) # Saves COBS-file inside the "indices" attribute self.indices.append(cobs_index.Search(str(cobs_path))) - self.scheme_path = scheme_path; self.cobs_path = cobs_path.parent + self.scheme_path = scheme_path + self.cobs_path = cobs_path.parent def save(self) -> None: """Saves the model to disk""" - scheme = str(self.scheme_path).split("/")[-1] # [-1] -> contains the scheme name + scheme = str(self.scheme_path).split("/")[ + -1 + ] # [-1] -> contains the scheme name json_path = self.base_path / scheme / f"{scheme}.json" json_object = json.dumps(self.to_dict(), indent=4) @@ -124,7 +132,7 @@ def load(scheme_path: Path) -> "ProbabilisticFilterMlstSchemeModel": for entry in sorted(json_path.parent.iterdir()): if not entry.exists(): raise FileNotFoundError(f"Index file not found at {entry}") - if str(entry).endswith(".json"): # only COBS-files + if str(entry).endswith(".json"): # only COBS-files continue model.indices.append(cobs_index.Search(str(entry), False)) return model @@ -132,9 +140,7 @@ def load(scheme_path: Path) -> "ProbabilisticFilterMlstSchemeModel": def calculate_hits(self, path: Path, sequence: Seq, step: int = 1) -> list[dict]: """Calculates the hits for a sequence""" if not isinstance(sequence, Seq): - raise ValueError( - "Invalid sequence, must be a Bio.Seq object" - ) + raise ValueError("Invalid sequence, must be a Bio.Seq object") if not len(sequence) > self.k: raise ValueError("Invalid sequence, must be longer than k") @@ -149,7 +155,9 @@ def calculate_hits(self, path: Path, sequence: Seq, step: int = 1) -> list[dict] file_name = str(entry).split("/")[-1] # file_name = locus scheme_path_list.append(file_name.split(".")[0]) # without the file ending - result_dict = {}; highest_results = {}; counter = 0 + result_dict = {} + highest_results = {} + counter = 0 # split the sequence in parts based on sequence length if len(sequence) >= 10000: for index in self.indices: @@ -159,7 +167,8 @@ def calculate_hits(self, path: Path, sequence: Seq, step: int = 1) -> list[dict] for split in split_sequence: res = index.search(split, step=step) split_result = self.get_cobs_result(res) - if not split_result: continue + if not split_result: + continue cobs_results.append(split_result) all_counts = defaultdict(int) @@ -167,7 +176,9 @@ def calculate_hits(self, path: Path, sequence: Seq, step: int = 1) -> list[dict] for name, value in result.items(): all_counts[name] += value - sorted_counts = dict(sorted(all_counts.items(), key=lambda item: -item[1])) + sorted_counts = dict( + sorted(all_counts.items(), key=lambda item: -item[1]) + ) first_key = next(iter(sorted_counts)) highest_result = sorted_counts[first_key] result_dict[scheme_path_list[counter]] = sorted_counts @@ -175,15 +186,19 @@ def calculate_hits(self, path: Path, sequence: Seq, step: int = 1) -> list[dict] counter += 1 else: for index in self.indices: - res = index.search(str(sequence), step=step) # COBS can't handle Seq-Objects + res = index.search( + str(sequence), step=step + ) # COBS can't handle Seq-Objects result_dict[scheme_path_list[counter]] = self.get_cobs_result(res) - highest_results[scheme_path_list[counter]] = self.get_highest_cobs_result(res) + highest_results[scheme_path_list[counter]] = ( + self.get_highest_cobs_result(res) + ) counter += 1 return [{"Strain type": highest_results}, {"All results": result_dict}] def predict( self, - cobs_path:Path, + cobs_path: Path, sequence_input: ( SeqRecord | list[SeqRecord] @@ -191,14 +206,16 @@ def predict( | SeqIO.QualityIO.FastqPhredIterator | Path ), - step:int = 1 + step: int = 1, ) -> MlstResult: """Returns scores for the sequence(s) based on the filters in the model""" if isinstance(sequence_input, SeqRecord): if sequence_input.id == "": sequence_input.id = "test" - hits = {sequence_input.id: self.calculate_hits(cobs_path, sequence_input.seq)} - return MlstResult(self.model_display_name,step,hits) + hits = { + sequence_input.id: self.calculate_hits(cobs_path, sequence_input.seq) + } + return MlstResult(self.model_display_name, step, hits) if isinstance(sequence_input, Path): return ProbabilisticFilterMlstSchemeModel.predict( @@ -207,12 +224,12 @@ def predict( if isinstance( sequence_input, - (SeqIO.FastaIO.FastaIterator, SeqIO.QualityIO.FastqPhredIterator) + (SeqIO.FastaIO.FastaIterator, SeqIO.QualityIO.FastqPhredIterator), ): hits = {} # individual_seq is a SeqRecord-Object for individual_seq in sequence_input: - individual_hits = self.calculate_hits(cobs_path,individual_seq.seq) + individual_hits = self.calculate_hits(cobs_path, individual_seq.seq) hits[individual_seq.id] = individual_hits return MlstResult(self.model_display_name, step, hits) @@ -252,10 +269,11 @@ def sequence_splitter(self, input_sequence: str, allele_len: int) -> list[str]: else: substring_length = allele_len * 100 - substring_list = []; start = 0 + substring_list = [] + start = 0 while start + substring_length <= sequence_len: - substring_list.append(input_sequence[start:start + substring_length]) + substring_list.append(input_sequence[start : start + substring_length]) start += substring_length - self.k + 1 # To not lose kmers when dividing # The remaining string is either appended to the list or added to the last entry. @@ -266,4 +284,4 @@ def sequence_splitter(self, input_sequence: str, allele_len: int) -> list[str]: substring_list[-1] += remaining_substring else: substring_list.append(remaining_substring) - return substring_list \ No newline at end of file + return substring_list