-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate_encoder-decoder.py
More file actions
98 lines (88 loc) · 3.5 KB
/
generate_encoder-decoder.py
File metadata and controls
98 lines (88 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import pickle
import numpy as np
import sys
import queue
import tqdm
from helper.helper import assign_huffman_code
from helper.DNA_code_model import DNACodeTree, DNAEncoder, DNADecoder
def condition_check(s):
if len(s) < 3:
return True
if s[-3:] == "ATG":
return False
if len(s) < 5:
return True
if s[-5:] in {"TTGCA", "CCCAT"}:
return False
if s[-1] == s[-2] == s[-3] == s[-4] == s[-5]:
return False
if len(s) < 6:
return True
if s[-6:] == "TATAAT":
return False
return True
if __name__ == "__main__":
SYMBOLS = ["A", "C", "G", "T"]
mc_stochastics = pickle.load(open("DNA_stochastic.pkl", "rb"))
# normalize by row
for key in mc_stochastics:
total = sum(mc_stochastics[key].values())
for sym in mc_stochastics[key]:
mc_stochastics[key][sym] /= total
# Prefixing mcs with states of length < 6:
pending_states = queue.Queue()
# 0-length: ""
pending_states.put("")
while not pending_states.empty():
state = pending_states.get()
if len(state) == 6:
continue
valid_next_states = []
for sym in SYMBOLS:
proposed_next_state = state + sym
if condition_check(proposed_next_state):
valid_next_states.append(sym)
pending_states.put(proposed_next_state)
mc_stochastics[state] = {}
total_valid = len(valid_next_states)
for sym in valid_next_states:
mc_stochastics[state][sym] = 1.0 / total_valid
dnacodetree = DNACodeTree(list(mc_stochastics.keys())) #pickle pickle pickle
for s in mc_stochastics.keys():
code_dict = assign_huffman_code(mc_stochastics[s])
# print(s, code_dict)
dnacodetree.add_edges(s, code_dict)
DNA_Encoder = DNAEncoder(dnacodetree)
DNA_Decoder = DNADecoder(dnacodetree)
# sanity check
# test edge case: length 1
test_strs = ["A", "C", "G", "T"]
for test_str in test_strs:
encoded_str = DNA_Encoder.encode(test_str)
decoded_str = DNA_Decoder.decode(encoded_str)
assert decoded_str == test_str
# keep track of lengths of encoded vs original strings
original_lengths = []
encoded_lengths = []
for _ in tqdm.tqdm(range(1000), desc="Sanity checking encoder/decoder"):
test_str = ""
len_test_str = np.random.randint(100, 100000)
for _ in range(len_test_str):
rand_sym = SYMBOLS[np.random.randint(0, 4)]
proposed_next_str = test_str + rand_sym
while not(condition_check(proposed_next_str)):
rand_sym = SYMBOLS[np.random.randint(0, 4)]
proposed_next_str = test_str + rand_sym
test_str = proposed_next_str
encoded_str = DNA_Encoder.encode(test_str)
decoded_str = DNA_Decoder.decode(encoded_str)
assert decoded_str == test_str
original_lengths.append(len(test_str))
encoded_lengths.append(len(encoded_str))
print("Sanity check passed: Decoded string matches original string ;D.")
average_bits_per_symbol = sum([e / b for e, b in zip(encoded_lengths, original_lengths)]) / len(encoded_lengths)
print(f"Average bits per symbol (over random tests): {average_bits_per_symbol}")
with open("DNA_encoder.pkl", "wb") as f:
pickle.dump(DNA_Encoder, f)
with open("DNA_decoder.pkl", "wb") as f:
pickle.dump(DNA_Decoder, f)