-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #13 from Oemercetin06/mlst
Added MLST feature
- Loading branch information
Showing 8 changed files with 708 additions and 2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
""" Module for utility functions used in other modules regarding MLST. """ | ||
|
||
__author__ = "Cetin, Oemer" | ||
|
||
import requests | ||
import json | ||
from io import StringIO | ||
from pathlib import Path | ||
from Bio import SeqIO | ||
from src.xspect.definitions import get_xspect_model_path, get_xspect_runs_path | ||
|
||
def create_fasta_files(locus_path: Path, fasta_batch: str) -> None:
    """Write one FASTA file per allele of a locus.

    Args:
        locus_path: Directory that receives the allele files. It is not
            created here; the caller must make sure it exists.
        fasta_batch: Full text of a multi-record FASTA file containing every
            allele sequence of the locus.

    Each record is written to ``Allele_ID_<n>.fasta`` where ``<n>`` is the
    part of the record id after the last underscore (for example the id
    ``Oxf_cpn60_263`` yields ``Allele_ID_263.fasta``). Files that already
    exist are left untouched.
    """
    for record in SeqIO.parse(StringIO(fasta_batch), "fasta"):
        allele_number = record.id.split("_")[-1]
        output_fasta_file = locus_path / f"Allele_ID_{allele_number}.fasta"
        if output_fasta_file.exists():
            # Keep previously downloaded alleles instead of rewriting them.
            continue
        with open(output_fasta_file, "w", encoding="utf-8") as allele:
            SeqIO.write(record, allele, "fasta")
|
||
def pick_species_number_from_db(available_species: dict) -> str:
    """Prompt the user to pick a species database and return its name.

    Args:
        available_species: Mapping of menu number (int) to database name.
            A name is expected to look like ``pubmlst_abaumannii_seqdef``;
            only the part between the first and second underscore is shown
            in the menu.

    Returns:
        The database name stored under the number the user entered.

    The prompt repeats until the input is an integer that is a key of
    ``available_species``.
    """
    for counter, database in available_species.items():
        print(f"{counter}:{database.split('_')[1]}")
    print("\nPick one of the above databases")

    retry_message = (
        "Wrong input! Try again with a number that is available in the list above."
    )
    while True:
        choice = input("Choose a species by selecting the corresponding number:")
        try:
            number = int(choice)
        except ValueError:
            # Non-numeric input: ask again.
            print(retry_message)
            continue
        if number in available_species:
            return available_species[number]
        print(retry_message)
|
||
def pick_scheme_number_from_db(available_schemes: dict) -> str:
    """Prompt the user to pick one scheme of a species and return its second entry.

    Args:
        available_schemes: Mapping of menu number (int) to a two-item
            sequence. Item 0 is printed in the menu and item 1 is returned
            (the caller in this project passes ``[description, url]``).

    Returns:
        Item 1 of the entry stored under the number the user entered.

    The prompt repeats until the input is an integer that is a key of
    ``available_schemes``.
    """
    for counter, scheme in available_schemes.items():
        print(f"{counter}:{scheme[0]}")
    print("\nPick any available scheme that is listed for download")

    retry_message = (
        "Wrong input! Try again with a number that is available in the above list."
    )
    while True:
        choice = input("Choose a scheme by selecting the corresponding number:")
        try:
            number = int(choice)
        except ValueError:
            # Non-numeric input: ask again.
            print(retry_message)
            continue
        if number in available_schemes:
            return available_schemes[number][1]
        print(retry_message)
|
||
def scheme_list_to_dict(scheme_list: list[str]):
    """Number the given schemes starting at 1.

    Args:
        scheme_list: Schemes in the order they should be numbered.

    Returns:
        A dict mapping 1, 2, 3, ... to the items of ``scheme_list`` in order.
        An empty input yields an empty dict.
    """
    return dict(enumerate(scheme_list, start=1))
|
||
def pick_scheme_from_models_dir() -> Path:
    """Let the user pick a scheme among the entries of the MLST models directory.

    The entries of ``get_xspect_model_path() / "MLST"`` are sorted, numbered
    from 1 and handed to ``pick_scheme``, whose result is returned.
    """
    mlst_models_dir = get_xspect_model_path() / "MLST"
    schemes = dict(enumerate(sorted(mlst_models_dir.iterdir()), start=1))
    return pick_scheme(schemes)
|
||
def pick_scheme(available_schemes: dict) -> Path:
    """Return one scheme from ``available_schemes``, asking the user if needed.

    Args:
        available_schemes: Mapping of menu number (int) to a scheme. A scheme
            is either a URL (anything whose string form starts with ``http``)
            or a filesystem path.

    Returns:
        The selected value exactly as stored in the mapping. Despite the
        annotation this is a URL string when the mapping holds URLs.

    Raises:
        ValueError: If ``available_schemes`` is empty.

    With exactly one entry that entry is returned without prompting.
    Otherwise a numbered menu is printed and the prompt repeats until the
    input is an integer that is a key of the mapping.
    """
    if not available_schemes:
        raise ValueError("No scheme has been chosen for download yet!")

    if len(available_schemes) == 1:
        return next(iter(available_schemes.values()))

    for counter, scheme in available_schemes.items():
        if str(scheme).startswith("http"):
            # Remote scheme: its display name has to be fetched from the API.
            # A timeout keeps the menu from hanging on an unresponsive server.
            scheme_json = requests.get(scheme, timeout=60).json()
            print(f"{counter}:{scheme_json['description']}")
        else:
            # Local scheme: show the last path component. Path.name is used
            # instead of splitting on "/" so Windows separators work too.
            print(f"{counter}:{Path(scheme).name}")

    print("\nPick a scheme for strain type prediction")
    retry_message = (
        "Wrong input! Try again with a number that is available in the above list."
    )
    while True:
        choice = input("Choose a scheme by selecting the corresponding number:")
        try:
            number = int(choice)
        except ValueError:
            # Non-numeric input: ask again.
            print(retry_message)
            continue
        if number in available_schemes:
            return available_schemes[number]
        print(retry_message)
|
||
class MlstResult:
    """Container for the outcome of one MLST run.

    Attributes are stored exactly as passed to the constructor:
    ``scheme_model``, ``steps`` and ``hits`` (a mapping of sequence id to a
    list of result dictionaries).
    """

    def __init__(
        self,
        scheme_model: str,
        steps: int,
        hits: dict[str, list[dict]],
    ):
        self.scheme_model = scheme_model
        self.steps = steps
        self.hits = hits

    def get_results(self) -> dict:
        """Return a new dict mapping each sequence id to its result list.

        The dict is a shallow copy of ``hits``; the result lists themselves
        are shared with the instance.
        """
        return dict(self.hits)

    def to_dict(self) -> dict:
        """Return all attributes as one dict with keys Scheme, Steps and Results."""
        return {
            "Scheme": self.scheme_model,
            "Steps": self.steps,
            "Results": self.get_results(),
        }

    def save(self, display: str, file_path: Path) -> None:
        """Write the result as indented JSON below the runs directory.

        Args:
            display: Suffix appended to the output file name.
            file_path: Path whose last component is used as the stem of the
                output name. Only the name is used; the file is not read.

        The file is written to
        ``get_xspect_runs_path() / "MLST" / "<name>-<display>.json"``;
        missing parent directories are created.
        """
        # Path.name instead of splitting on "/" so Windows separators work too.
        file_name = Path(file_path).name
        json_path = get_xspect_runs_path() / "MLST" / f"{file_name}-{display}.json"
        json_path.parent.mkdir(exist_ok=True, parents=True)
        json_object = json.dumps(self.to_dict(), indent=4)

        with open(json_path, "w", encoding="utf-8") as file:
            file.write(json_object)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
"""Module for connecting with the PubMLST database via API requests and downloading allele files.""" | ||
|
||
__author__ = "Cetin, Oemer" | ||
|
||
import requests | ||
import json | ||
from src.xspect.mlst_feature.mlst_helper import ( | ||
create_fasta_files, | ||
pick_species_number_from_db, | ||
pick_scheme_number_from_db, | ||
pick_scheme, | ||
scheme_list_to_dict | ||
) | ||
from src.xspect.definitions import ( | ||
get_xspect_mlst_path, | ||
get_xspect_upload_path | ||
) | ||
|
||
class PubMLSTHandler:
    """Talks to the PubMLST REST API and downloads allele FASTA files per locus.

    Attributes:
        scheme_list: URLs of the schemes to download or query. Defaults to
            schemes 1 and 2 of ``pubmlst_abaumannii_seqdef`` (Oxford and
            Pasteur according to the original author's note).
        scheme_paths: Local directories of the schemes downloaded so far.
    """

    base_url = "http://rest.pubmlst.org/db"
    # Seconds to wait on the server before a request is aborted. Kept
    # generous because sequence queries may take a while to answer.
    request_timeout = 300

    def __init__(self):
        self.scheme_list = [
            self.base_url + "/pubmlst_abaumannii_seqdef/schemes/1",
            self.base_url + "/pubmlst_abaumannii_seqdef/schemes/2",
        ]
        self.scheme_paths = []

    def get_scheme_paths(self) -> dict:
        """Return the downloaded scheme paths as a dict numbered from 1."""
        return scheme_list_to_dict(self.scheme_paths)

    def choose_schemes(self) -> None:
        """Interactively replace ``scheme_list`` with schemes of a chosen species.

        Fetches every database whose name ends with ``seqdef``, lets the user
        pick one, then lets the user pick one or more of its schemes. A scheme
        picked twice is stored only once.
        """
        species_url = PubMLSTHandler.base_url

        # Collect all sequence-definition databases, numbered from 1.
        available_species = {}
        counter = 1
        for species_databases in requests.get(
            species_url, timeout=self.request_timeout
        ).json():
            for database in species_databases["databases"]:
                if database["name"].endswith("seqdef"):
                    available_species[counter] = database["name"]
                    counter += 1
        chosen_species = pick_species_number_from_db(available_species)

        # scheme["description"] is the scheme name shown to the user;
        # scheme["scheme"] is the URL needed for downloading all loci.
        scheme_url = f"{species_url}/{chosen_species}/schemes"
        schemes = requests.get(scheme_url, timeout=self.request_timeout).json()[
            "schemes"
        ]
        available_schemes = {
            number: [scheme["description"], scheme["scheme"]]
            for number, scheme in enumerate(schemes, start=1)
        }

        chosen_schemes = []
        while True:
            chosen_scheme = pick_scheme_number_from_db(available_schemes)
            if chosen_scheme not in chosen_schemes:
                chosen_schemes.append(chosen_scheme)
            choice = input(
                "Do you want to pick another scheme to download? (y/n):"
            ).lower()
            if choice != "y":
                break
        self.scheme_list = chosen_schemes

    def download_alleles(self, choice: bool = False):
        """Download every allele FASTA file of all loci of the schemes in ``scheme_list``.

        Args:
            choice: When true, ``choose_schemes`` is run first so the user can
                replace the default schemes. Defaults to False.

        Files are stored below
        ``get_xspect_mlst_path() / <species> / <scheme name> / <locus name>``.
        Each scheme directory is recorded once in ``scheme_paths``.
        """
        if choice:
            self.choose_schemes()

        for scheme in self.scheme_list:
            scheme_json = requests.get(scheme, timeout=self.request_timeout).json()
            scheme_name = scheme_json["description"]
            locus_list = scheme_json["loci"]

            # The URL contains e.g. "pubmlst_abaumannii_seqdef"; the part
            # between the first two underscores is used as the species name.
            species_name = scheme.split("_")[1]
            scheme_path = get_xspect_mlst_path() / species_name / scheme_name
            if scheme_path not in self.scheme_paths:
                # Avoid duplicate entries when the same scheme is downloaded again.
                self.scheme_paths.append(scheme_path)

            for locus_url in locus_list:
                # The last URL segment is the locus name.
                locus_name = locus_url.split("/")[-1]
                locus_path = scheme_path / locus_name
                locus_path.mkdir(exist_ok=True, parents=True)

                alleles = requests.get(
                    f"{locus_url}/alleles_fasta", timeout=self.request_timeout
                ).text
                create_fasta_files(locus_path, alleles)

    def assign_strain_type_by_db(self):
        """Send the uploaded ``Test.fna`` to a scheme's ``/sequence`` endpoint and print the answer.

        The scheme is selected with ``pick_scheme`` from ``scheme_list``. The
        exact-match allele id of every locus is printed, followed by the
        ``fields`` entry of the response as the strain type.
        """
        scheme_url = (
            str(pick_scheme(scheme_list_to_dict(self.scheme_list))) + "/sequence"
        )
        fasta_file = get_xspect_upload_path() / "Test.fna"
        with open(fasta_file, "r", encoding="utf-8") as file:
            data = file.read()
        payload = {  # Essential API-POST-Body
            "sequence": data,
            "filetype": "fasta",
        }
        response = requests.post(
            scheme_url, data=json.dumps(payload), timeout=self.request_timeout
        ).json()

        # meta_data is a list containing a dictionary, e.g.
        # 'Pas_fusA': [{'href': some URL, 'allele_id': '2'}]
        # NOTE(review): .get is used because the server presumably omits
        # these keys when nothing matches - confirm against the API docs.
        for locus, meta_data in response.get("exact_matches", {}).items():
            print(locus + ":" + meta_data[0]["allele_id"], end="; ")
        print("\nStrain Type:", response.get("fields"))
Oops, something went wrong.