diff --git a/nanomotif/_version.py b/nanomotif/_version.py index a0dfd87..e3446b1 100644 --- a/nanomotif/_version.py +++ b/nanomotif/_version.py @@ -1 +1 @@ -__version__ = "0.4.12" \ No newline at end of file +__version__ = "0.4.13" \ No newline at end of file diff --git a/nanomotif/argparser.py b/nanomotif/argparser.py index a68befa..6d46947 100644 --- a/nanomotif/argparser.py +++ b/nanomotif/argparser.py @@ -30,6 +30,8 @@ def create_parser(): parser_shared_find_motifs.add_argument("--threshold_valid_coverage", type=int, default=5, help="minimum valid base coverage for a position to be considered. Default: %(default)s") parser_shared_find_motifs.add_argument("--minimum_kl_divergence", type=float, default=0.05, help="minimum KL-divergence for a position to considered for expansion in motif search. Higher value means less exhaustive, but faster search. Default: %(default)s") parser_shared_find_motifs.add_argument("--min_motifs_contig", type=int, default=20, help="minimum number of times a motif has to have been oberserved in a contig. Default: %(default)s") + parser_shared_find_motifs.add_argument("--read_level_methylation", action="store_true", help="If specified, methylation is calculated on read level instead of contig level. This is slower but produces more stable motifs.") + parser_shared_find_motifs.add_argument("--min_motif_score", type=float, default=0.2, help="minimum score for a motif to be kept after identification considered valid. Default: %(default)s") parser_find_motifs = subparsers.add_parser( 'find_motifs', parents=[parser_positional, parser_optional, parser_shared_find_motifs], diff --git a/nanomotif/bin_consensus.py b/nanomotif/bin_consensus.py index 48c267e..f17266c 100644 --- a/nanomotif/bin_consensus.py +++ b/nanomotif/bin_consensus.py @@ -43,7 +43,7 @@ def convert_motifs_to_regex(motifs): #contig_motifs = nm.postprocess.remove_sub_motifs(motifs_scored_filt) contig_motifs = motifs_scored_filt.with_columns( - pl.col("motif").apply(lambda x: nm.seq.regex_to_iupac(x)).alias("motif"), + pl.col("motif").map_elements(lambda x: nm.seq.regex_to_iupac(x), return_dtype = pl.Utf8).alias("motif"), (pl.col("n_mod") / (pl.col("n_mod") + pl.col("n_nomod"))).alias("mean") ) bin_motifs = contig_motifs.groupby("bin", "motif", "mod_position", "mod_type") \ diff --git a/nanomotif/dataload.py b/nanomotif/dataload.py index ed4e2d2..58d804f 100644 --- a/nanomotif/dataload.py +++ b/nanomotif/dataload.py @@ -39,6 +39,22 @@ def load_pileup(path: str, threads: int = 1, min_fraction: float = 0): pileup = pileup.rename({"column_1":"contig", "column_2": "position", "column_4": "mod_type", "column_6": "strand", "column_11": "fraction_mod", "column_10":"Nvalid_cov"}) return Pileup(pileup) +def load_low_coverage_positions(path_pileup: str, threads: int = 1, min_coverage: float = 5): + """ + Load pileup file from path to pileup.bed output of modkit pileup + """ + pileup = ( + pl.scan_csv(path_pileup, separator = "\t", has_header = False) + .filter(pl.col("column_10") <= min_coverage) + .filter(pl.col("column_10") / (pl.col("column_10") + pl.col("column_17")) > 0.3) + .select(["column_1", "column_2","column_4", "column_6", "column_11", "column_10"]) + .with_columns(pl.col("column_11") / 100) + .collect() + ) + pileup = pileup.rename({"column_1":"contig", "column_2": "position", "column_4": "mod_type", "column_6": "strand", "column_11": "fraction_mod", "column_10":"Nvalid_cov"}) + return pileup + + def load_assembly(path: str): """ Load assembly from path to fasta file diff --git a/nanomotif/evaluate.py b/nanomotif/evaluate.py index 7b67732..99d1ae2 100644 --- a/nanomotif/evaluate.py +++ b/nanomotif/evaluate.py @@ -12,7 +12,7 @@ from nanomotif.constants import * from nanomotif.model import BetaBernoulliModel from nanomotif.utils import subseq_indices, calculate_match_length -from nanomotif.seq import EqualLengthDNASet, DNAsequence +from nanomotif.seq import EqualLengthDNASet, DNAsequence, DNAarray from nanomotif.candidate import Motif, MotifTree from nanomotif.logger import configure_logger from nanomotif.parallel import update_progress_bar @@ -21,13 +21,17 @@ import networkx as nx import warnings import time - +from typing import Generator, Optional, List, Tuple ########################################## # Motif candidate scoring ########################################## -def methylated_motif_occourances(motif, sequence, methylated_positions) -> tuple: +def methylated_motif_occourances( + motif, + sequence, + methylated_positions + ) -> tuple: """ Get occourances of a motif in a contig @@ -53,7 +57,13 @@ def methylated_motif_occourances(motif, sequence, methylated_positions) -> tuple return meth_occurences, nonmeth_occurences -def motif_model_contig(pileup, contig: str, motif, save_motif_positions=False): +def motif_model_contig( + pileup, + contig: str, + motif, + save_motif_positions=False, + na_positions = None + ): """ Get the posterior for a single motif. Uses number of methylated motifs as methylation count. @@ -75,6 +85,14 @@ def motif_model_contig(pileup, contig: str, motif, save_motif_positions=False): index_meth_fwd, index_nonmeth_fwd = methylated_motif_occourances(motif_stripped, contig, meth_positions_fwd) index_meth_rev, index_nonmeth_rev = methylated_motif_occourances(motif_stripped.reverse_compliment(), contig, meth_positions_rev) + if na_positions is not None: + na_positions_fwd = na_positions["fwd"] + index_meth_fwd = np.setdiff1d(index_meth_fwd, na_positions_fwd) + index_nonmeth_fwd = np.setdiff1d(index_nonmeth_fwd, na_positions_fwd) + na_positions_rev = na_positions["rev"] + index_meth_rev = np.setdiff1d(index_meth_rev, na_positions_rev) + index_nonmeth_rev = np.setdiff1d(index_nonmeth_rev, na_positions_rev) + model = BetaBernoulliModel() model.update(len(index_meth_fwd) + len(index_meth_rev), len(index_nonmeth_fwd) + len(index_nonmeth_rev)) @@ -90,7 +108,22 @@ def motif_model_contig(pileup, contig: str, motif, save_motif_positions=False): return model -def methylated_reads_counts(pileup: list, sequence: str, motif: str) -> tuple: +def methylated_reads_counts( + pileup: pl.DataFrame, + sequence: str, + motif: Motif + ) -> tuple: + """ + Get the number of methylated and non-methylated reads for a motif. + + Parameters: + - pileup (Pileup): The pileup to be processed. + - sequence (str): The sequence to be processed. + - motif (Motif): The motif to be processed. + + Returns: + - tuple: A tuple of two integers, the first containing the number of methylated reads, the second containing the number of non-methylated reads. + """ motif_index = subseq_indices(motif.string, sequence) + motif.mod_position pileup = pileup.filter(pl.col("position").is_in(motif_index)) @@ -102,7 +135,7 @@ def methylated_reads_counts(pileup: list, sequence: str, motif: str) -> tuple: return n_mod, n_nonmod -def motif_model_read(pileup, contig: str, motif): +def motif_model_read(pileup: pl.DataFrame, contig: str, motif: Motif) -> BetaBernoulliModel: """ Get the posterior for a single motif. Uses number of methylated reads as methylation count. @@ -134,94 +167,19 @@ def motif_model_read(pileup, contig: str, motif): # Motif candidate state space search ########################################## - - - -# -#def process_sample(assembly, pileup, -# max_candidate_size = 40, -# min_read_methylation_fraction = 0.8, -# min_valid_coverage = 1, -# min_kl_divergence = 0.1 -# ): -# """ -# Process a single sample -# -# Parameters: -# - assembly (Assembly): The assembly to be processed. -# - pileup (Pileup): The pileup to be processed. -# - max_candidate_size (int): The maximum size of the candidate motifs. -# - min_read_methylation_fraction (float): The minimum fraction of reads that must be methylated for a position to be considered methylated. -# - min_valid_coverage (int): The minimum number of reads that must cover a position for it to be considered valid. -# - min_kl_divergence (float): Early stopping criteria, if max KL-divergence falls below, stops building motif. -# - min_cdf_score (float): Minimum score of 1 - cdf(cdf_position) for a motif to be considered valid. -# - cdf_position (float): The position to evaluate the cdf at. -# - min_motif_frequency (int): Used to get minimum number of sequences to evaluate motif at. -# """ -# assert pileup is not None, "Pileup is None" -# assert len(pileup) > 0, "Pileup is empty" -# assert assembly is not None, "Assembly is None" -# assert max_candidate_size > 0, "max_candidate_size must be greater than 0" -# assert min_read_methylation_fraction >= 0 and min_read_methylation_fraction <= 1, "min_read_methylation_fraction must be between 0 and 1" -# assert min_valid_coverage >= 0, "min_valid_coverage must be greater than 0" -# assert min_kl_divergence >= 0, "min_kl_divergence must be greater than 0" -# -# padding = max_candidate_size // 2 -# result = [] -# pileup = pileup \ -# .filter(pl.col("Nvalid_cov") > min_valid_coverage) \ -# .filter(pl.col("fraction_mod") >= min_read_methylation_fraction) \ -# .sort("contig") -# total_tasks = pileup.select(pl.struct(["contig", "mod_type"]).n_unique()).item() -# -# # -# widgets=[ -# progressbar.Timer(), ' | ',progressbar.Counter(), ' of ', str(total_tasks),' - ', progressbar.Percentage(), ' ',progressbar.Bar(),' (', progressbar.ETA(), ') ' -# ] -# for (contig, modtype), subpileup in progressbar.progressbar(pileup.groupby(["contig", "mod_type"]), widgets=widgets, max_value=total_tasks, redirect_stdout=True): -# log.info(f"Processing {contig} {modtype}") -# -# contig_sequence = assembly.assembly[contig] -# index_plus = subpileup.filter(pl.col("strand") == "+").get_column("position").to_list() -# index_minus = subpileup.filter(pl.col("strand") == "-").get_column("position").to_list() -# if len(index_minus) <= 1 or len(index_plus) <= 1: -# log.info("Too few methylated positions") -# continue -# -# sequences_plus = contig_sequence.sample_at_indices(index_plus, padding) -# sequences_minus = contig_sequence.sample_at_indices(index_minus, padding).reverse_compliment() -# sequences = sequences_plus + sequences_minus -# sequences_array = sequences.convert_to_DNAarray() -# -# motif_graph, best_candidates = find_best_candidates( -# sequences_array, -# contig_sequence, -# subpileup, -# min_kl = min_kl_divergence -# ) -# identified_motifs = nxgraph_to_dataframe(motif_graph) \ -# .filter(col("sequence").is_in(best_candidates)) -# -# if len(identified_motifs) == 0: -# log.info("No motifs found") -# continue -# else: -# result.append(identified_motifs.with_columns( -# pl.lit(contig).alias("contig"), -# pl.lit(modtype).alias("mod_type") -# )) -# if len(result) == 0: -# return None -# motifs = pl.concat(result) \ -# .with_columns([ -# pl.col("sequence").apply(lambda motif: motif[count_periods_at_start(motif):len(motif)-count_periods_at_end(motif)]).alias("motif"), -# pl.col("sequence").apply(lambda motif: padding - count_periods_at_start(motif)).alias("mod_position") -# ]) -# return motifs -# -# - -def worker_function(args, counter, lock, assembly, min_kl_divergence, padding, minimum_methylation_fraction_confident, log_dir, verbose, seed): +def worker_function( + args, + counter, + lock, + assembly, + min_kl_divergence, + padding, + minimum_methylation_fraction_confident, + read_level_methylation, + log_dir, + verbose, + seed + ): """ Process a single subpileup for one contig and one modtype @@ -235,7 +193,13 @@ def worker_function(args, counter, lock, assembly, min_kl_divergence, padding, m """ set_seed(seed=seed) warnings.filterwarnings("ignore") - contig, modtype, subpileup = args + contig, modtype, subpileup, low_coverage_positions = args + + if low_coverage_positions is not None: + low_coverage_positions = { + "fwd": low_coverage_positions.filter(pl.col("strand") == "+")["position"].to_numpy(), + "rev": low_coverage_positions.filter(pl.col("strand") == "-")["position"].to_numpy() + } process_id = os.getpid() if log_dir is not None: @@ -243,7 +207,17 @@ def worker_function(args, counter, lock, assembly, min_kl_divergence, padding, m configure_logger(log_file, verbose=verbose) try: - result = process_subpileup(contig, modtype, subpileup, assembly, min_kl_divergence, padding, minimum_methylation_fraction_confident) + result = process_subpileup( + contig, + modtype, + subpileup, + assembly, + min_kl_divergence, + padding, + minimum_methylation_fraction_confident, + read_level_methylation, + na_positions = low_coverage_positions + ) with lock: counter.value += 1 return result @@ -253,10 +227,17 @@ def worker_function(args, counter, lock, assembly, min_kl_divergence, padding, m return None - - - -def process_subpileup(contig, modtype, subpileup, assembly, min_kl_divergence, padding, minimum_methylation_fraction_confident): +def process_subpileup( + contig, + modtype, + contig_pileup, + assembly, + min_kl_divergence, + padding, + minimum_methylation_fraction_confident, + read_level_methylation, + na_positions: dict = None + ): """ Process a single subpileup for one contig and one modtype @@ -269,24 +250,22 @@ def process_subpileup(contig, modtype, subpileup, assembly, min_kl_divergence, p - padding (int): The padding to use for the motif. """ log.info(f"Processing {contig} {modtype}") + assert contig_pileup is not None, "Subpileup is None" + assert len(contig_pileup) > 0, "Subpileup is empty" + assert assembly is not None, "Assembly is None" + assert min_kl_divergence >= 0, "min_kl_divergence must be greater than 0" + assert contig_pileup.get_column("mod_type").unique().to_list() == [modtype], "subpileup modtype does not match modtype" contig_sequence = assembly.assembly[contig] - subpileup_high_confident = subpileup.filter(pl.col("fraction_mod") >= minimum_methylation_fraction_confident) - index_plus = subpileup_high_confident.filter(pl.col("strand") == "+").get_column("position").to_list() - index_minus = subpileup_high_confident.filter(pl.col("strand") == "-").get_column("position").to_list() - if len(index_minus) <= 1 or len(index_plus) <= 1: - log.info("Too few methylated positions") - return None - - sequences_plus = contig_sequence.sample_at_indices(index_plus, padding) - sequences_minus = contig_sequence.sample_at_indices(index_minus, padding).reverse_compliment() - sequences = sequences_plus + sequences_minus - sequences_array = sequences.convert_to_DNAarray() motif_graph, best_candidates = find_best_candidates( - sequences_array, + contig_pileup, contig_sequence, - subpileup, + modtype, + minimum_methylation_fraction_confident, + padding, + na_positions = na_positions, + read_level_methylation = read_level_methylation, min_kl = min_kl_divergence, max_dead_ends = 25, max_rounds_since_new_best = 15 @@ -310,21 +289,23 @@ def process_subpileup(contig, modtype, subpileup, assembly, min_kl_divergence, p def process_sample_parallel( assembly, pileup, + low_coverage_positions = None, + read_level_methylation = False, threads = 2, search_frame_size = 40, threshold_methylation_confident = 0.8, - threshold_methylation_general = 0.5, - threshold_valid_coverage = 1, - minimum_kl_divergence = 0.2, + threshold_methylation_general = 0.7, + threshold_valid_coverage = 5, + minimum_kl_divergence = 0.05, verbose = False, log_dir = None, seed = None ): """ - Process a single sample + Process a sample Parameters: - - assembly (Assembly): The assembly to be processed. + - assembly (Assembly): The assembly of all contigs. - pileup (Pileup): The pileup to be processed. - max_candidate_size (int): The maximum size of the candidate motifs. - min_read_methylation_fraction (float): The minimum fraction of reads that must be methylated for a position to be considered methylated. @@ -349,11 +330,13 @@ def process_sample_parallel( # Filter pileup pileup = pileup \ .filter(pl.col("Nvalid_cov") > threshold_valid_coverage) \ - .filter(pl.col("fraction_mod") >= threshold_methylation_general) \ .sort("contig") # Create a list of tasks (TODO: not have a list of all data) - tasks = [(contig, modtype, subpileup) for (contig, modtype), subpileup in pileup.group_by(["contig", "mod_type"])] + if low_coverage_positions is not None: + tasks = [(contig, modtype, subpileup, low_coverage_positions.filter((pl.col("contig")==contig) & (pl.col("mod_type") == modtype))) for (contig, modtype), subpileup in pileup.group_by(["contig", "mod_type"])] + else: + tasks = [(contig, modtype, subpileup, None) for (contig, modtype), subpileup in pileup.group_by(["contig", "mod_type"])] # Create a progress manager manager = multiprocessing.Manager() @@ -371,7 +354,7 @@ def process_sample_parallel( results = pool.starmap(worker_function, [( task, counter, lock, - assembly, minimum_kl_divergence, padding, threshold_methylation_confident, + assembly, minimum_kl_divergence, padding, threshold_methylation_confident, read_level_methylation, log_dir, verbose, seed ) for task in tasks]) results = [result for result in results if result is not None] @@ -405,23 +388,55 @@ def process_sample_parallel( ######################################################################### # Motif candidate state space -def find_best_candidates(methylation_sequences, sequence, pileup, min_kl = 0.2, max_dead_ends = 25, max_rounds_since_new_best = 15): +def find_best_candidates( + contig_pileup: pl.DataFrame, + contig_sequence: DNAsequence, + mod_type: str, + minimum_methylation_fraction_confident: float, + padding: int, + na_positions = None, + read_level_methylation: bool = False, + min_kl: float = 0.2, + max_dead_ends: int = 25, + max_rounds_since_new_best: int = 15, + score_threshold: float = 0.2, + remaining_sequences_threshold: float = 0.01 + ) -> tuple[MotifTree, list[Motif]]: """ Find the best motif candidates in a sequence. Parameters: - - methylation_sequences (DNAarray): The methylation sequences extracted from the contig at methylated sites. - - sequence (DNAsequence): The contig to be processed. - - pileup (Pileup): The pileup to be processed. - - min_kl (float): The minimum KL-divergence for a position in the candidate to be expanded - - max_dead_ends (int): The maximum number of low scoring candidates before the search is terminated. + - contig_pileup (Pileup): The pileup to be processed. + - contig_sequence (DNAsequence): The sequence to be processed. + - mod_type (str): The modtype to be processed. + - minimum_methylation_fraction_confident (float): The minimum fraction of reads that must be methylated for a position to be considered confidently methylated and used in search. + - padding (int): The padding to use for the motif. + - min_kl (float): The minimum KL-divergence for a motif to be considered valid. + - max_dead_ends (int): The maximum number of low scoring candidates before stopping the search. + - max_rounds_since_new_best (int): The maximum number of rounds since a new best candidate was found before stopping the search. + - score_threshold (float): The minimum score for a candidate to be considered valid. + - remaining_sequences_threshold (float): The minimum fraction of sequences remaining before stopping the search. """ - padding = methylation_sequences.shape[1] // 2 + subpileup_confident = contig_pileup.filter(pl.col("fraction_mod") >= minimum_methylation_fraction_confident) + + # Extract the sequences for confidently methylated positions + index_plus = subpileup_confident.filter(pl.col("strand") == "+").get_column("position").to_list() + index_minus = subpileup_confident.filter(pl.col("strand") == "-").get_column("position").to_list() + + methylation_sequences_string = [] + if len(index_plus) >= 1: + methylation_sequences_string += contig_sequence.sample_at_indices(index_plus, padding).sequences + if len(index_minus) >= 1: + methylation_sequences_string += contig_sequence.sample_at_indices(index_minus, padding).reverse_compliment().sequences + if len(methylation_sequences_string) == 0: + log.info("No methylation sequences found") + return None + methylation_sequences = EqualLengthDNASet(methylation_sequences_string).convert_to_DNAarray() + total_sequences = methylation_sequences.shape[0] - mod_type = pileup.get_column("mod_type").unique().to_list()[0] - root_motif = Motif("." * padding + MOD_TYPE_TO_CANONICAL[mod_type] + "." * padding, padding) # Represent all possible motifs - methylation_sequences_subset = methylation_sequences.copy() + root_motif = Motif("." * padding + MOD_TYPE_TO_CANONICAL[mod_type] + "." * padding, padding) + methylation_sequences_clone = methylation_sequences.copy() best_candidates = [] continue_search = True dead_ends = 0 @@ -432,47 +447,37 @@ def find_best_candidates(methylation_sequences, sequence, pileup, min_kl = 0.2, log.debug("Stopping search, too many low scoring candidates") break # Find the initial guess within the tree - motif_graph, naive_guess = a_star_search( + searcher = MotifSearcher( root_motif, - sequence, - pileup, - methylation_sequences_subset, + contig_sequence, + contig_pileup, + methylation_sequences_clone, + padding, + read_level_methylation = read_level_methylation, + na_positions = na_positions, motif_graph = motif_graph, min_kl = min_kl, max_rounds_since_new_best = max_rounds_since_new_best ) + motif_graph, naive_guess = searcher.run() # If there is no naive guess, stop the search if naive_guess == root_motif: log.debug("No naive guess found, stopping search") break - guess = naive_guess - # next_guess = naive_guess - # while guess != next_guess: - # # Find the best guess within the subtree of the naive guess - # guess = next_guess - # motif_graph, next_guess = a_star_search( - # guess, - # sequence, - # pileup, - # methylation_sequences_subset, - # motif_graph = motif_graph, - # min_kl = min_kl, - # max_rounds_since_new_best = 5 - # ) # Remove new candidate from methylation sequences - seq_before = methylation_sequences_subset.shape[0] - methylation_sequences_subset = methylation_sequences_subset.filter_sequence_matches(naive_guess.one_hot(), keep_matches = False) - if methylation_sequences_subset is None: + seq_before = methylation_sequences.shape[0] + methylation_sequences_clone = methylation_sequences_clone.filter_sequence_matches(naive_guess.one_hot(), keep_matches = False) + if methylation_sequences_clone is None: log.debug("No more sequences left") break # Check if we should continue the search - seq_remaining = methylation_sequences_subset.shape[0] + seq_remaining = methylation_sequences_clone.shape[0] seq_remaining_percent = seq_remaining/total_sequences - if motif_graph.nodes[naive_guess]["score"] < 0.1: + if motif_graph.nodes[naive_guess]["score"] < score_threshold: dead_ends += 1 log.debug(f"Candidate has low score, {naive_guess}. {dead_ends} of {max_dead_ends} before temination") continue @@ -481,96 +486,81 @@ def find_best_candidates(methylation_sequences, sequence, pileup, min_kl = 0.2, best_candidates.append(naive_guess) - if (seq_remaining/total_sequences) < 0.01: + if (seq_remaining/total_sequences) < remaining_sequences_threshold: log.debug("Stopping search, too few sequences remaining") break log.debug("Continuing search") return motif_graph, best_candidates - -def motif_child_nodes_kl_dist_max(motif, meth_pssm, contig_pssm, freq_threshold=0.25, min_kl=0.1): - kl_divergence = entropy(meth_pssm, contig_pssm) - split_motif = motif.split() - - evaluated = np.array([i for i, base in enumerate(split_motif) if base != "."]) - kl_divergence[evaluated] = 0 - if np.max(kl_divergence) < min_kl: - return - - pos = np.where(kl_divergence == np.max(kl_divergence))[0][0] - - # Methylation frequency most be above contig frequency - index_meth_frequncies_highest = meth_pssm[:, pos] > contig_pssm[:, pos] - - # Methylation frequency most be above a threshold - index_meth_frequncies_above_threshold = meth_pssm[:, pos] > freq_threshold - - # Combine the two filters - index_position_filt = np.logical_and(index_meth_frequncies_highest, index_meth_frequncies_above_threshold) - bases_index = np.argwhere(index_position_filt).reshape(-1) - bases_filt = [BASES[int(i)] for i in bases_index] - - # All combination of the bases - combinations = [] - for i in range(1, min(len(bases_filt)+1, 4)): - combinations += list(itertools.combinations(bases_filt, i)) - - for base in combinations: - if len(base) > 1: - base = "[" + "".join(list(base)) + "]" - else: - base = "".join(list(base)) - new_motif = split_motif[:pos] + [base] + split_motif[pos+1:] - yield Motif("".join(new_motif), motif.mod_position) - - -def motif_child_nodes_kl_dist_prune(motif, meth_pssm, contig_pssm, min_kl=0.1, freq_threshold=0.35): - kl_divergence = entropy(meth_pssm, contig_pssm) - split_motif = motif.split() - - for pos in np.where(kl_divergence > min_kl)[0]: - if split_motif[pos] != ".": - continue - - # Methylation frequency most be above contig frequency - index_meth_frequncies_highest = meth_pssm[:, pos] > contig_pssm[:, pos] - - # Methylation frequency most be above a threshold - index_meth_frequncies_above_threshold = meth_pssm[:, pos] > freq_threshold - - # Combine the two filters - index_position_filt = np.logical_and(index_meth_frequncies_highest, index_meth_frequncies_above_threshold) - bases_index = np.argwhere(index_position_filt).reshape(-1) - bases_filt = [BASES[int(i)] for i in bases_index] - - # All combination of the bases - combinations = [] - for i in range(1, min(len(bases_filt)+1, 4)): - combinations += list(itertools.combinations(bases_filt, i)) - - for base in combinations: - if len(base) > 1: - base = "[" + "".join(list(base)) + "]" - else: - base = "".join(list(base)) - new_motif = split_motif[:pos] + [base] + split_motif[pos+1:] - yield Motif("".join(new_motif), motif.mod_position) -def a_star_search(root_motif, contig, pileup, methylation_sequences, - motif_graph = None, min_kl = 0.1, max_rounds_since_new_best = 10, max_motif_length = 18): +class MotifSearcher: """ - A* search for methylation motifs - - Parameters: - - root_motif (list): The root motif to start the search from. - - contig (str): The contig to be processed. - - pileup (Pileup): The pileup to be processed. - - methylation_sequences (DNAarray): The methylation sequences to be processed. + Class to perform motif search for identifying enriched motifs in methylated sequences. """ - - # Function for calculating the priority of a motif - def priority_function(next_model, root_model): + + def __init__( + self, + root_motif: Motif, + contig_sequence: DNAsequence, + contig_pileup: pl.DataFrame, + methylation_sequences: DNAarray, + padding: int, + read_level_methylation: bool = False, + na_positions = None, + motif_graph: Optional[MotifTree] = None, + min_kl: float = 0.1, + freq_threshold: float = 0.25, + max_rounds_since_new_best: int = 10, + max_motif_length: int = 18 + ): + """ + Initialize the MotifSearcher class. + + Parameters: + root_motif (Motif): The initial motif to start the search from. + contig_sequence: The contig sequence object with necessary methods. + contig_pileup: The pileup data associated with the contig. + methylation_sequences: The methylation sequences to be analyzed. + padding (int): The padding size for sequence sampling. + read_level_methylation (bool): Flag to use read-level methylation. + na_positions (Optional[List[int]]): Positions with 'N/A' or missing data. + motif_graph (Optional[MotifTree]): An optional pre-initialized motif graph. + min_kl (float): Minimum Kullback-Leibler divergence threshold. + freq_threshold (float): Frequency threshold for methylation. + max_rounds_since_new_best (int): Max iterations without improvement. + max_motif_length (int): Maximum allowed motif length. + """ + # Input validation + if contig_pileup.is_empty(): + raise ValueError("contig_pileup cannot be empty.") + if padding < 0: + raise ValueError("padding must be non-negative.") + + self.root_motif = root_motif + self.contig_sequence = contig_sequence + self.contig_pileup = contig_pileup + self.methylation_sequences = methylation_sequences + self.padding = padding + self.read_level_methylation = read_level_methylation + self.na_positions = na_positions + self.motif_graph = motif_graph or MotifTree() + self.min_kl = min_kl + self.freq_threshold = freq_threshold + self.max_rounds_since_new_best = max_rounds_since_new_best + self.max_motif_length = max_motif_length + + def _priority_function(self, next_model, root_model) -> float: + """ + Calculate the priority for a motif based on the change in model parameters. + + Parameters: + next_model: The model of the next motif. + root_model: The model of the root motif. + + Returns: + float: The calculated priority. + """ try: d_alpha = 1 - (next_model._alpha / root_model._alpha) except ZeroDivisionError: @@ -581,93 +571,239 @@ def priority_function(next_model, root_model): d_beta = 1 priority = d_alpha * d_beta return priority + + def _scoring_function(self, next_model, current_model) -> float: + """ + Calculate the score for a motif based on its model. + + Parameters: + next_model: The model of the next motif. + current_model: The model of the current motif. + + Returns: + float: The calculated score. + """ + mean_diff = next_model.mean() - current_model.mean() + try: + score = (next_model.mean() * -np.log10(next_model.standard_deviation())) * mean_diff + except (ValueError, ZeroDivisionError): + score = 0 + return score + + def _motif_child_nodes_kl_dist_max( + self, + motif: Motif, + meth_pssm: np.ndarray, + contig_pssm: np.ndarray + ) -> Generator[Motif, None, None]: + """ + Generate child motifs based on maximum KL divergence. + + Parameters: + motif (Motif): The current motif. + meth_pssm (np.ndarray): Position-specific scoring matrix for methylation. + contig_pssm (np.ndarray): Position-specific scoring matrix for the contig. + + Yields: + Motif: The next motif in the search. + """ + kl_divergence = entropy(meth_pssm, contig_pssm) + split_motif = motif.split() + + # Positions to evaluate (positions with '.') + evaluated_positions = np.array([i for i, base in enumerate(split_motif) if base == "."]) + if evaluated_positions.size == 0: + return + + kl_divergence_masked = kl_divergence.copy() + kl_divergence_masked[~np.isin(np.arange(len(split_motif)), evaluated_positions)] = 0 + + max_kl = np.max(kl_divergence_masked) + if max_kl < self.min_kl: + return + + pos = int(np.argmax(kl_divergence_masked)) + + # Methylation frequency must be above contig frequency + index_meth_freq_higher = meth_pssm[:, pos] > contig_pssm[:, pos] + + # Methylation frequency must be above a threshold + index_meth_freq_above_thresh = meth_pssm[:, pos] > self.freq_threshold + + # Combine the two filters + index_position_filter = np.logical_and(index_meth_freq_higher, index_meth_freq_above_thresh) + bases_indices = np.argwhere(index_position_filter).reshape(-1) + bases_filtered = [BASES[int(i)] for i in bases_indices] + + if not bases_filtered: + return + + # All combinations of the bases + max_combination_length = min(len(bases_filtered), 4) + for i in range(1, max_combination_length + 1): + for base_tuple in itertools.combinations(bases_filtered, i): + if len(base_tuple) > 1: + base_str = "[" + "".join(base_tuple) + "]" + else: + base_str = base_tuple[0] + new_motif_sequence = split_motif.copy() + new_motif_sequence[pos] = base_str + new_motif_str = "".join(new_motif_sequence) + yield Motif(new_motif_str, motif.mod_position) + + def run(self) -> tuple[MotifTree, list[Motif]]: + """ + Execute the motif search algorithm. + + Returns: + Tuple[MotifTree, Motif]: The graph of motifs explored and the best motif found. + """ + # Initialize variables + best_guess = self.root_motif + if self.read_level_methylation: + root_model = motif_model_read( + self.contig_pileup, self.contig_sequence.sequence, self.root_motif + ) + else: + root_model = motif_model_contig( + self.contig_pileup, + self.contig_sequence.sequence, + self.root_motif, + na_positions=self.na_positions + ) + best_score = self._scoring_function(root_model, root_model) + rounds_since_new_best = 0 + visited_nodes: set[Motif] = set() + + # Sample sequences in contig to get background for KL-divergence + contig_sequences_sample = self.contig_sequence.sample_n_subsequences( + self.padding * 2 + 1, 10000 + ) + contig_pssm = contig_sequences_sample.pssm() + + # Initialize the search tree + self.motif_graph.add_node( + self.root_motif, + model=root_model, + motif=self.root_motif, + visited=False, + score=best_score + ) + + # Initialize priority queue + priority_queue: list[tuple[float, Motif]] = [] + hq.heappush(priority_queue, (0, self.root_motif)) + + # Search loop + while priority_queue: + # Get the current best candidate + _, current_motif = hq.heappop(priority_queue) + if current_motif in visited_nodes: + continue + if len(current_motif.strip()) > self.max_motif_length: + log.debug( + f"{current_motif.string}, Skipping due to length > {self.max_motif_length}" + ) + continue + + current_model = self.motif_graph.nodes[current_motif]["model"] + visited_nodes.add(current_motif) + log.debug( + f"{current_motif.string} | Model: {current_model} | " + f"Score: {self.motif_graph.nodes[current_motif]['score']:.2f} | " + f"Queue size: {len(priority_queue)}" + ) + self.motif_graph.nodes[current_motif]["visited"] = True + rounds_since_new_best += 1 + + active_methylation_sequences = self.methylation_sequences.copy().filter_sequence_matches( + current_motif.one_hot(), keep_matches=True + ) + if active_methylation_sequences is None: + log.debug("No more sequences left after filtering") + continue + + # Generate and evaluate neighbor motifs + neighbors = list(self._motif_child_nodes_kl_dist_max( + current_motif, + active_methylation_sequences.pssm(), + contig_pssm + )) + + # Add neighbors to graph + for next_motif in neighbors: + if next_motif in self.motif_graph.nodes: + # Add only edge if motif already visited + next_model = self.motif_graph.nodes[next_motif]["model"] + score = self.motif_graph.nodes[next_motif]["score"] + self.motif_graph.add_edge(current_motif, next_motif) + else: + if self.read_level_methylation: + next_model = motif_model_read( + self.contig_pileup, + self.contig_sequence.sequence, + next_motif + ) + else: + next_model = motif_model_contig( + self.contig_pileup, + self.contig_sequence.sequence, + next_motif, + na_positions=self.na_positions + ) + + # Add neighbor to graph + self.motif_graph.add_node( + next_motif, + model=next_model, + motif=next_motif, + visited=False + ) + self.motif_graph.add_edge(current_motif, next_motif) + + score = self._scoring_function(next_model, current_model) + self.motif_graph.nodes[next_motif]["score"] = score + + # Add neighbor to priority queue if not visited + if next_motif not in visited_nodes: + priority = self._priority_function(next_model, current_model) + hq.heappush(priority_queue, (priority, next_motif)) + + if score > best_score: + best_score = score + best_guess = next_motif + rounds_since_new_best = 0 + + # Stopping criteria + if rounds_since_new_best >= self.max_rounds_since_new_best: + break + + return self.motif_graph, best_guess + + + + + + + + + + + + + + + + + + + + - # Function for scoring a motif - def scoring_function(next_model, current_model): - mean_diff = next_model.mean() - current_model.mean() - return (next_model.mean() * -np.log10(next_model.standard_deviation())) * mean_diff - # Search setup - padding = methylation_sequences.shape[1] // 2 - total_sequences = methylation_sequences.shape[0] - best_guess = root_motif - root_model = motif_model_contig(pileup, contig.sequence, root_motif) - best_score = scoring_function(root_model, root_model) - rounds_since_new_best = 0 - visisted_nodes = [] - - # Sample sequence in contig to get background for KL-divergence - contig_sequences = contig.sample_n_subsequences(padding*2 + 1, 10000) - contig_pssm = contig_sequences.pssm() - - # Initialize the search tree - if motif_graph is None: - motif_graph = MotifTree() - motif_graph.add_node(root_motif, model=root_model, motif=root_motif, visited=False, score=scoring_function(root_model, root_model)) - - # Initialize priority que - priority_que = [] - hq.heappush(priority_que, (0, root_motif)) - - # Search loop - while len(priority_que) > 0: - # Get the current best candidate - current = hq.heappop(priority_que)[1] - if current in visisted_nodes: - continue - if len(current.strip()) > max_motif_length: - log.debug(f"{current}, Skipping scoring and expansion due to length (>{max_motif_length})") - continue - current_model = motif_graph.nodes[current]["model"] - visisted_nodes.append(current) - log.debug(f"{''.join(current)} | {current_model} | {motif_graph.nodes[current]['score']:.2f} | {len(priority_que)}") - motif_graph.nodes[current]["visited"] = True - rounds_since_new_best += 1 - active_methylation_sequences = methylation_sequences.copy().filter_sequence_matches(current.one_hot(), keep_matches = True) - if active_methylation_sequences is None: - log.debug("No more sequences left") - continue - # Prune the neibhors of the current candidate - neighbors = list(motif_child_nodes_kl_dist_max( - current, - active_methylation_sequences.pssm(), - contig_pssm - )) - - # Add neighbors to graph - for next in neighbors: - if next in motif_graph.nodes: - # Add only edge if motif -> next - next_model = motif_graph.nodes[next]["model"] - score = motif_graph.nodes[next]["score"] - motif_graph.add_edge(current, next) - else: - next_model = motif_model_contig(pileup, contig.sequence, next) - - # Add neighbor to graph - motif_graph.add_node(next, model=next_model, motif=next, visited=False) - motif_graph.add_edge(current, next) - - score = scoring_function(next_model, current_model) - motif_graph.nodes[next]["score"] = score - - # Add neighbor to priority que if has not been visited in this search - if next not in visisted_nodes: - priority = priority_function(next_model, current_model) - hq.heappush(priority_que, (priority, next)) - - if score > best_score: - best_score = score - best_guess = next - rounds_since_new_best = 0 - - # Stopping criteria - if rounds_since_new_best >= max_rounds_since_new_best: - break - return motif_graph, best_guess def count_periods_at_start(s): @@ -695,9 +831,3 @@ def nxgraph_to_dataframe(graph): }).sort("score", descending=True) - -if __name__ == "__main__": - from nanomotif.dataload import load_assembly, load_pileup - assembly = load_assembly("data/ecoli/assembly.polished.fasta") - ecoli = load_pileup("data/ecoli/modkit.pileup.bed") - result = process_sample(assembly, ecoli.pileup, min_read_methylation_fraction = 0.80) diff --git a/nanomotif/main.py b/nanomotif/main.py index d14a340..765edb1 100644 --- a/nanomotif/main.py +++ b/nanomotif/main.py @@ -21,7 +21,7 @@ def shared_setup(args, working_dir): if not os.path.exists(args.out): os.makedirs(args.out) else: - log.warn(f"Output directory {args.out} already exists") + log.warning(f"Output directory {args.out} already exists") # Set up logging LOG_DIR = working_dir + "/logs" @@ -44,20 +44,41 @@ def shared_setup(args, working_dir): -def find_motifs(args, pileup = None, assembly = None): - # Run nanomotif +def find_motifs(args, pileup = None, assembly = None) -> pl.DataFrame: + """ + Nanomotif motif finder module + + Args: + args (argparse.Namespace): Arguments + pileup (pandas.DataFrame): Pileup data + assembly (nanomotif.Assembly): Assembly data + + Returns: + pandas.DataFrame: Motif data + """ log.info("Starting nanomotif motif finder") if pileup is None: log.info("Loading pileup") - pileup = nm.load_pileup(args.pileup, threads = args.threads, min_fraction = args.threshold_methylation_general) + if not os.path.exists(args.pileup): + log.error(f"File {args.pileup} does not exist") + return None + if args.read_level_methylation: + pileup = nm.load_pileup(args.pileup, threads = args.threads, min_fraction = 0) + else: + pileup = nm.load_pileup(args.pileup, threads = args.threads, min_fraction = args.threshold_methylation_general) + + # Load low coverage positions + low_coverage_positions = nm.load_low_coverage_positions(args.pileup, min_coverage = args.threshold_valid_coverage) if assembly is None: log.info("Loading assembly") assembly = nm.load_assembly(args.assembly) + assm_lengths = pl.DataFrame({ "contig": list(assembly.assembly.keys()), "length": [len(contig) for contig in assembly.assembly.values()] }) log.info("Filtering pileup") + # Filter pileup to contigs with mods, minimum 1 mod per 10kb contigs_with_mods = pileup.pileup \ .groupby(["contig", "mod_type"]) \ @@ -78,6 +99,8 @@ def find_motifs(args, pileup = None, assembly = None): log.info("Identifying motifs") motifs = nm.evaluate.process_sample_parallel( assembly, pileup, + low_coverage_positions = low_coverage_positions, + read_level_methylation = args.read_level_methylation, threads = args.threads, search_frame_size = args.search_frame_size, threshold_methylation_confident = args.threshold_methylation_confident, @@ -131,13 +154,14 @@ def save_motif_df(df, name): df = df.sort(["contig", "mod_type", "motif"]) df.write_csv(args.out + "/" + name + ".tsv", separator="\t") os.makedirs(args.out + "/precleanup-motifs/", exist_ok=True) + motifs.drop("model").write_csv(args.out + "/precleanup-motifs/motifs-raw-unformatted.tsv", separator="\t") save_motif_df(motifs, "precleanup-motifs/motifs-raw") log.info("Postprocessing motifs") motifs_file_name = "precleanup-motifs/motifs" log.info(" - Writing motifs") - motifs = motifs.filter(col("score") > 0.1) + motifs = motifs.filter(col("score") > args.min_motif_score) if len(motifs) == 0: log.info("No motifs found") return @@ -206,6 +230,9 @@ def score_motifs(args, pileup = None, assembly = None, motifs = None): if args.save_motif_positions: os.makedirs(args.out + "/motif-positions", exist_ok=True) + na_position = nm.load_low_coverage_positions(args.pileup, min_coverage = args.threshold_valid_coverage) + + pileup = pileup.pileup.filter(pl.col("fraction_mod") > args.threshold_methylation_general) # Ensure motif are iupac motifs.with_columns([ pl.col("motif").map_elements(lambda x: nm.seq.regex_to_iupac(x)).alias("motif") @@ -229,7 +256,8 @@ def score_motifs(args, pileup = None, assembly = None, motifs = None): log.info("Scoring motifs") scored_all = nm.scoremotifs.score_sample_parallel( - assembly, pileup.pileup, motifs, + assembly, pileup, motifs, + na_position = na_position, threads = args.threads, threshold_methylation_general = args.threshold_methylation_general, threshold_valid_coverage = 1, @@ -310,7 +338,10 @@ def motif_discovery(args): # Check if output directory exists log.info("Loading required files") - pileup = nm.load_pileup(args.pileup, threads = args.threads, min_fraction = args.threshold_methylation_general) + if args.read_level_methylation: + pileup = nm.load_pileup(args.pileup, threads = args.threads, min_fraction = 0) + else: + pileup = nm.load_pileup(args.pileup, threads = args.threads, min_fraction = args.threshold_methylation_general) assembly = nm.load_assembly(args.assembly) # Find motifs @@ -336,6 +367,7 @@ def check_install(args): log.info("Loading required files") args.out = "nanomotif_install_check" args.save_motif_positions = False + args.pileup = nm.datasets.geobacillus_plasmids_pileup_path() pileup = nm.datasets.geobacillus_plasmids_pileup() assembly = nm.datasets.geobacillus_plasmids_assembly() diff --git a/nanomotif/scoremotifs.py b/nanomotif/scoremotifs.py index 898f946..b31e94a 100644 --- a/nanomotif/scoremotifs.py +++ b/nanomotif/scoremotifs.py @@ -10,7 +10,8 @@ from nanomotif.logger import configure_logger from nanomotif.utils import concatenate_motif_position_files, clean_up_motif_positions_files import numpy as np -def score_contig_with_motifs(contig, mod_type, pileup, sequence, motifs, save_motif_positions=False, positions_outdir=None): +def score_contig_with_motifs(contig, mod_type, pileup, sequence, motifs, na_positions=None, save_motif_positions=False, positions_outdir=None): + log.info(f"Scoring motifs in contig {contig}, modtype {mod_type}") # Ensuring motifs are in regex for searching motifs = motifs\ @@ -40,6 +41,7 @@ def score_contig_with_motifs(contig, mod_type, pileup, sequence, motifs, save_mo pileup, sequence, candidate.Motif(motif, position), + na_positions=na_positions, save_motif_positions=save_motif_positions ) models.append(model) @@ -48,7 +50,8 @@ def score_contig_with_motifs(contig, mod_type, pileup, sequence, motifs, save_mo model = evaluate.motif_model_contig( pileup, sequence, - candidate.Motif(motif, position) + candidate.Motif(motif, position), + na_positions=na_positions ) models.append(model) @@ -89,13 +92,19 @@ def worker_function(args, counter, lock, sequence, motifs, log_dir, verbose, see """ warnings.filterwarnings("ignore") nm.seed.set_seed(seed) - contig, modtype, subpileup = args + contig, modtype, subpileup, na_positions = args + + if na_positions is not None: + na_positions = { + "fwd": na_positions.filter(pl.col("strand") == "+")["position"].to_numpy(), + "rev": na_positions.filter(pl.col("strand") == "-")["position"].to_numpy() + } process_id = os.getpid() log_file = f"{log_dir}/score-motifs.{process_id}.log" configure_logger(log_file, verbose=verbose) try: - result = score_contig_with_motifs(contig, modtype, subpileup, sequence, motifs, save_motif_positions=save_motif_positions, positions_outdir=positions_outdir) + result = score_contig_with_motifs(contig, modtype, subpileup, sequence, motifs, save_motif_positions=save_motif_positions, positions_outdir=positions_outdir, na_positions=na_positions) with lock: counter.value += 1 return result @@ -106,6 +115,7 @@ def worker_function(args, counter, lock, sequence, motifs, log_dir, verbose, see def score_sample_parallel( assembly, pileup, motifs, + na_position = None, threads = 2, threshold_methylation_general = 0.5, threshold_valid_coverage = 1, @@ -143,8 +153,10 @@ def score_sample_parallel( .sort("contig") # Create a list of tasks (TODO: not have a list of all data) - tasks = [(contig, modtype, subpileup) for (contig, modtype), subpileup in pileup.groupby(["contig", "mod_type"])] - + if na_position is not None: + tasks = [(contig, modtype, subpileup, na_position.filter((pl.col("contig")==contig) & (pl.col("mod_type") == modtype))) for (contig, modtype), subpileup in pileup.groupby(["contig", "mod_type"])] + else: + tasks = [(contig, modtype, subpileup, None) for (contig, modtype), subpileup in pileup.groupby(["contig", "mod_type"])] # Create a progress manager manager = multiprocessing.Manager() counter = manager.Value('i', 0) diff --git a/setup.py b/setup.py index 04256b0..461f38e 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ "scipy>=1.10.1", "networkx>=3.1", "pyarrow>=15.0.2", - "Bio==1.6.2", + "Bio==1.6.1", "snakemake==7.32.4", "progressbar2>=3.53.1" ], diff --git a/tests/test_motif_find.py b/tests/test_motif_find.py index 10c1d93..4bf3be3 100644 --- a/tests/test_motif_find.py +++ b/tests/test_motif_find.py @@ -1,4 +1,7 @@ from nanomotif.constants import * +from nanomotif.candidate import Motif +from nanomotif.seq import DNAsequence, DNAarray +from nanomotif.evaluate import MotifSearcher import nanomotif as nm import pytest from hypothesis import given, strategies as st @@ -22,20 +25,125 @@ def test_known_motif_sequence(self): np.testing.assert_array_equal(result[0], expected_methylated) np.testing.assert_array_equal(result[1], expected_nonmethylated) + def test_known_motif_sequence_no_methylated(self): + motif = nm.candidate.Motif('ACG', 0) + sequence = 'TACGGACGCCACG' + methylated_positions = np.array([]) + + expected_methylated = np.array([]) + expected_nonmethylated = np.array([1, 5, 10]) + + result = nm.evaluate.methylated_motif_occourances(motif, sequence, methylated_positions) + np.testing.assert_array_equal(result[0], expected_methylated) + np.testing.assert_array_equal(result[1], expected_nonmethylated) + + +class TestMotifSearcher: + def test_init_with_valid_inputs(self): + root_motif = Motif("ACGT", 0) + contig_sequence = DNAsequence("ACGTACGTACGT") + contig_pileup = pl.DataFrame({"position": [1,2,3], "value": [0.1, 0.2, 0.3]}) + methylation_sequences = DNAarray(["ACGT", "CGTA", "GTAC"]) + padding = 2 + + # Create MotifSearcher instance + searcher = MotifSearcher( + root_motif=root_motif, + contig_sequence=contig_sequence, + contig_pileup=contig_pileup, + methylation_sequences=methylation_sequences, + padding=padding + ) + + # Check that attributes are set correctly + assert searcher.root_motif == root_motif + assert searcher.contig_sequence == contig_sequence + assert searcher.contig_pileup.equals(contig_pileup) + assert np.array_equal(searcher.methylation_sequences, methylation_sequences) + assert searcher.padding == padding + + def test_priority_function_zero_division_beta(self): + root_motif = Motif("ACGT", 0) + contig_sequence = DNAsequence("ACGTACGTACGT") + contig_pileup = pl.DataFrame({"position": [1,2,3], "value": [0.1, 0.2, 0.3]}) + methylation_sequences = DNAarray(["ACGT"]) + padding = 2 + + searcher = MotifSearcher( + root_motif=root_motif, + contig_sequence=contig_sequence, + contig_pileup=contig_pileup, + methylation_sequences=methylation_sequences, + padding=padding + ) + + class MockModel: + def __init__(self, alpha, beta): + self._alpha = alpha + self._beta = beta + + next_model = MockModel(alpha=2, beta=3) + root_model = MockModel(alpha=4, beta=0) + + priority = searcher._priority_function(next_model, root_model) + + expected_d_alpha = 1 - (next_model._alpha / root_model._alpha) # 1 - (2/4) = 0.5 + expected_d_beta = 1 # ZeroDivisionError caught + expected_priority = expected_d_alpha * expected_d_beta # 0.5 * 1 = 0.5 + + assert priority == pytest.approx(expected_priority) + + def test_scoring_function(self): + root_motif = Motif("ACGT", 0) + contig_sequence = DNAsequence("ACGTACGTACGT") + contig_pileup = pl.DataFrame({"position": [1,2,3], "value": [0.1, 0.2, 0.3]}) + methylation_sequences = DNAarray(["ACGT"]) + padding = 2 + + searcher = MotifSearcher( + root_motif=root_motif, + contig_sequence=contig_sequence, + contig_pileup=contig_pileup, + methylation_sequences=methylation_sequences, + padding=padding + ) + + class MockModel: + def __init__(self, mean_value, std_dev): + self._mean = mean_value + self._std_dev = std_dev + + def mean(self): + return self._mean + + def standard_deviation(self): + return self._std_dev + + next_model = MockModel(mean_value=0.6, std_dev=0.1) + current_model = MockModel(mean_value=0.4, std_dev=0.2) + + score = searcher._scoring_function(next_model, current_model) + + mean_diff = next_model.mean() - current_model.mean() # 0.6 - 0.4 = 0.2 + expected_score = (next_model.mean() * -np.log10(next_model.standard_deviation())) * mean_diff + expected_score = 0.6 * 1 * 0.2 + + assert score == pytest.approx(expected_score) + class TestProcessSubpileup: def find_motifs(self): - pileup_path = nm.datasets.geobacillus_plasmids_pileup_path() - pileup = nm.dataload.load_pileup(pileup_path, min_fraction=0.5) - modtype = "a" - subpileup = pileup.pileup.filter(pl.col("mod_type") == modtype) - - assembly = nm.datasets.geobacillus_plasmids_assembly() contig = "contig_3" min_kl_divergence = 0.1 padding = 20 minimum_methylation_fraction_confident = 0.75 + modtype = "a" + + pileup_path = nm.datasets.geobacillus_plasmids_pileup_path() + pileup = nm.dataload.load_pileup(pileup_path, min_fraction=0.5) + subpileup = pileup.pileup.filter(pl.col("mod_type") == modtype) + assembly = nm.datasets.geobacillus_plasmids_assembly() - result = nm.evaluate.process_subpileup(contig, modtype ,subpileup, assembly, min_kl_divergence, padding, minimum_methylation_fraction_confident) + result = nm.evaluate.process_subpileup(contig, modtype, subpileup, assembly, min_kl_divergence, padding, minimum_methylation_fraction_confident, False) return result def test_no_errors(self): @@ -66,6 +174,7 @@ def run_function(self): assembly = nm.datasets.geobacillus_plasmids_assembly() return nm.evaluate.process_sample_parallel(assembly, pileup, threads=2, seed=1) + def test_no_errors(self): self.run_function()