# dataloader.py

import glob
import hashlib
import os
import re

import numpy as np
import torch
import torchaudio
from torch.utils.data import Dataset

MAX_NUM_WAVS_PER_CLASS = 2 ** 27 - 1  # ~134M


def which_set(filename, validation_percentage, testing_percentage):
    """Determine which data partition the file should belong to.

    We want to keep files in the same training, validation, or testing sets
    even if new ones are added over time. This makes it less likely that
    testing samples will accidentally be reused in training when long runs
    are restarted, for example. To keep this stability, a hash of the
    filename is taken and used to determine which set it should belong to.
    This determination only depends on the name and the set proportions, so
    it won't change as other files are added.

    It's also useful to associate particular files as related (for example,
    words spoken by the same person), so anything after '_nohash_' in a
    filename is ignored for set determination. This ensures that
    'bobby_nohash_0.wav' and 'bobby_nohash_1.wav' are always in the same set,
    for example.

    Args:
        filename: File path of the data sample.
        validation_percentage: How much of the data set to use for validation.
        testing_percentage: How much of the data set to use for testing.

    Returns:
        String, one of 'training', 'validation', or 'testing'.
    """
    base_name = os.path.basename(filename)
    # We want to ignore anything after '_nohash_' in the file name when
    # deciding which set to put a wav in, so the data set creator has a way
    # of grouping wavs that are close variations of each other.
    hash_name = re.sub(r"_nohash_.*$", "", base_name)
    # This looks a bit magical, but we need to decide whether this file
    # should go into the training, testing, or validation sets, and we want
    # to keep existing files in the same set even if more files are
    # subsequently added. To do that, we need a stable way of deciding based
    # on just the file name itself, so we hash it and use the hash to
    # generate a probability value that assigns the partition.
    hash_name_hashed = hashlib.sha1(hash_name.encode("utf-8")).hexdigest()
    percentage_hash = (
        int(hash_name_hashed, 16) % (MAX_NUM_WAVS_PER_CLASS + 1)
    ) * (100.0 / MAX_NUM_WAVS_PER_CLASS)
    if percentage_hash < validation_percentage:
        result = "validation"
    elif percentage_hash < (testing_percentage + validation_percentage):
        result = "testing"
    else:
        result = "training"
    return result
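
# Illustrative example (the path and the 10%/10% split below are assumptions,
# not taken from the original repository): the partition is a deterministic
# function of the filename stem before '_nohash_', so repeated calls agree and
# recordings from the same speaker stay in the same split.
#
#   which_set("speech_commands/yes/bobby_nohash_0.wav", 10, 10)
#   which_set("speech_commands/yes/bobby_nohash_1.wav", 10, 10)
#   # -> both return the same value, one of "training"/"validation"/"testing"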


# TODO: add silence + background data
class SpeechCommandsGoogle(Dataset):
    """Google Speech Commands dataset, configured as in Hello Edge."""

    def __init__(
        self,
        root_dir,
        train_test_val,
        val_perc,
        test_perc,
        words,
        sample_rate,
        batch_size,
        epochs,
        device,
        background_volume,
        background_frequency,
        silence_percentage,
        unknown_percentage,
        time_shift_ms,
        non_canonical_test=False,
        transform=None,
    ):
        self.sample_rate = sample_rate
        self.root_dir = root_dir
        self.transform = transform
        self.train_test_val = train_test_val
        self.val_perc = val_perc
        self.test_perc = test_perc
        self.words = words
        self.device = device
        self.batch_size = batch_size
        self.epochs = epochs
        self.non_canonical_test = non_canonical_test
        self.background_volume = background_volume
        self.background_frequency = background_frequency
        self.silence_percentage = silence_percentage
        self.unknown_percentage = unknown_percentage
        # Maximum time shift, converted from milliseconds to samples.
        self.time_shift_ms = (time_shift_ms * sample_rate) / 1000
        # Bernoulli draw deciding whether background noise is mixed in.
        self.noise = torch.distributions.bernoulli.Bernoulli(
            torch.tensor([self.background_frequency])
        )

        self.list_of_x = []
        self.list_of_labels = []
        self.list_of_y = []

        sub_dirs = [x[0].split("/")[-1] for x in os.walk(root_dir)][1:]
        for cur_dir in sub_dirs:
            files_in_dir = glob.glob(root_dir + "/" + cur_dir + "/" + "*.wav")
            cur_dir = cur_dir.strip("_")
            for cur_f in files_in_dir:
                if cur_dir == "background_noise":
                    self.list_of_y.append(words.index("silence"))
                    self.list_of_labels.append("silence")
                elif which_set(cur_f, val_perc, test_perc) == train_test_val:
                    if (cur_dir not in words) and (
                        not (
                            (train_test_val == "testing")
                            and not non_canonical_test
                        )
                    ):
                        self.list_of_y.append(words.index("unknown"))
                        self.list_of_labels.append("unknown")
                    else:
                        self.list_of_y.append(words.index(cur_dir))
                        self.list_of_labels.append(cur_dir)
                else:
                    continue
                waveform, sample_rate = torchaudio.load(cur_f)
                if sample_rate != self.sample_rate:
                    raise ValueError(
                        "Specified sample rate doesn't match sample rate in "
                        ".wav file."
                    )
                self.list_of_x.append(waveform)

        self.list_of_y = np.array(self.list_of_y)

        if (self.train_test_val == "validation") or (
            (self.train_test_val == "testing") and self.non_canonical_test
        ):
            # Size is chosen so that the 10 keyword classes (labels 0-9)
            # make up 80% of the samples; the remaining 20% are filled with
            # silence and unknown samples in __getitem__.
            self.size = int(
                np.sum(np.unique(self.list_of_y, return_counts=True)[1][:10])
                / 0.8
            )
            self.deterministic_sampling = True
            self.relevant_label_list = np.where(self.list_of_y < 10)[0]
        else:
            self.deterministic_sampling = False
            if self.train_test_val == "testing":
                self.size = len(self.list_of_labels)
            else:
                self.size = int(self.batch_size * self.epochs)
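
        # Note (added for clarity): during training, __len__ reports
        # batch_size * epochs virtual samples and __getitem__ re-balances
        # the classes stochastically; for validation (and non-canonical
        # testing) the keyword samples are enumerated deterministically.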

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        if (
            (self.train_test_val == "testing")
            and (not self.non_canonical_test)
            and ("cough" not in self.root_dir)
        ):
            # Using the canonical testing set, which is already balanced.
            waveform = self.list_of_x[idx]
        else:
            # Balance training and validation samples on the fly.
            selector = idx / self.size
            if selector < self.silence_percentage:
                # Silence sample (assumes "silence" sits at index 11 of
                # `words`): zero waveform.
                idx = np.random.choice(
                    np.argwhere(self.list_of_y == 11)[:, 0], 1
                )
                waveform = torch.zeros(1, self.sample_rate)
            elif (selector >= self.silence_percentage) and (
                selector < (self.silence_percentage + self.unknown_percentage)
            ):
                # Unknown-word sample (assumes "unknown" sits at index 10).
                idx = np.random.choice(
                    np.argwhere(self.list_of_y == 10)[:, 0], 1
                )
                waveform = self.list_of_x[idx.item()]
            else:
                if self.deterministic_sampling:
                    idx = self.relevant_label_list[int(idx - self.size * 0.2)]
                else:
                    y_sel = int(
                        np.floor(
                            (selector - 0.2) / 0.8 * (len(self.words) - 2)
                        )
                    )
                    idx = np.random.choice(
                        np.argwhere(self.list_of_y == y_sel)[:, 0], 1
                    )
                waveform = self.list_of_x[idx.item()]

            # Random time shift.
            if self.time_shift_ms != 0:
                start_idx = int(
                    np.random.choice(np.arange(0, self.time_shift_ms))
                )
                if (waveform.shape[1] - start_idx) >= self.sample_rate:
                    waveform = waveform[
                        0, start_idx : (waveform.shape[1] + start_idx)
                    ].view(1, -1)
                elif (waveform.shape[1] - start_idx) < self.sample_rate:
                    pad_size = int(
                        (self.sample_rate - (waveform.shape[1] - start_idx))
                        / 2
                    )
                    zero_waveform = torch.zeros((1, self.sample_rate))
                    zero_waveform[
                        0,
                        pad_size : (
                            pad_size + (waveform.shape[1] - start_idx)
                        ),
                    ] = waveform[0, start_idx:]
                    waveform = zero_waveform

            # Mix in background noise during training.
            if self.noise.sample() and self.train_test_val == "training":
                noise_wave = self.list_of_x[
                    np.random.choice(
                        np.argwhere(self.list_of_y == 11)[:, 0], 1
                    ).item()
                ]
                start_noise = int(
                    np.random.choice(
                        np.arange(
                            0, noise_wave.shape[1] - (self.sample_rate + 1)
                        )
                    )
                )
                noise_mul = (
                    noise_wave[
                        0, start_noise : (start_noise + self.sample_rate)
                    ].view(1, -1)
                    * self.background_volume
                )
                # Avoid in-place addition so the cached waveforms in
                # self.list_of_x are not modified between epochs.
                waveform = waveform + noise_mul
                waveform = torch.clamp(waveform, min=-1.0, max=1.0)

        if waveform.shape[1] > self.sample_rate:
            # Sample a random one-second window from a longer sequence.
            start_idx = np.random.choice(
                np.arange(0, waveform.shape[1] - (self.sample_rate + 1))
            )
            uniform_waveform = waveform[
                0, start_idx : (start_idx + self.sample_rate)
            ].view(1, -1)
        elif waveform.shape[1] < self.sample_rate:
            # Pad front and back with zeros up to one second.
            pad_size = int((self.sample_rate - waveform.shape[1]) / 2)
            uniform_waveform = torch.zeros((1, self.sample_rate))
            uniform_waveform[
                0, pad_size : (pad_size + waveform.shape[1])
            ] = waveform[0, :]
        else:
            uniform_waveform = waveform

        return uniform_waveform[0].t(), int(self.list_of_y[idx])
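

# Minimal usage sketch (not part of the original module). The dataset path,
# word list, and hyperparameter values below are illustrative assumptions and
# should be replaced with the actual experiment configuration. With
# silence_percentage + unknown_percentage = 0.2 they match the hard-coded
# 0.2/0.8 split used in __getitem__.
if __name__ == "__main__":
    from torch.utils.data import DataLoader

    # Canonical 12-class setup: 10 keywords plus "unknown" and "silence".
    keywords = [
        "yes", "no", "up", "down", "left", "right",
        "on", "off", "stop", "go", "unknown", "silence",
    ]

    train_set = SpeechCommandsGoogle(
        root_dir="data/speech_commands_v0.02",  # assumed dataset location
        train_test_val="training",
        val_perc=10.0,
        test_perc=10.0,
        words=keywords,
        sample_rate=16000,
        batch_size=64,
        epochs=10,
        device="cpu",
        background_volume=0.1,
        background_frequency=0.8,
        silence_percentage=0.1,
        unknown_percentage=0.1,
        time_shift_ms=100,
    )

    loader = DataLoader(train_set, batch_size=64, shuffle=True)
    waveforms, labels = next(iter(loader))
    print(waveforms.shape, labels.shape)  # expected: (64, 16000) and (64,)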