-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTranscribe.py
322 lines (278 loc) · 12.9 KB
/
Transcribe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
import os
import io
from pydub import AudioSegment
import copy
from GoogleTranscribe import *
from Converter import *
# Audio durations below are expressed in milliseconds.
SPLICELEN = 60 * 1000  # length of one splice; for now, only up to 60 seconds per splice
PATCHLEN = 10 * 1000  # length of the patch that straddles the boundary between two splices
TEXTLEN = 75  # number of transcript characters searched when knitting a patch onto a splice

# File-name stems; the splice index and the extension are appended when files are written.
SPLICE, PATCH, INSIDEPATCH = '/splice.', '/patch.', '/insidepatch.'
SPLITPATCH1, SPLITPATCH2 = '/splitpatch1.', '/splitpatch2.'

# Every stem that is written per splice index, in the order the files are produced.
FILENAMES = [SPLICE, PATCH, INSIDEPATCH, SPLITPATCH1, SPLITPATCH2]
# Sets up credentials to allow someone who holds the required JSON file to use
# speech recognition: the environment variable is expected to hold the path of
# that JSON file.
# NOTE(review): the right-hand side below is only a placeholder comment, not a
# value, so this statement is a syntax error until the path of the
# authentication file is filled in as a string.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = #AUTHENTICATION FILE NAME
def filterName(speechFile, app):
    """Normalise the file name and make sure the audio is a cleaned .wav file.

    Spaces in the path are replaced by dashes (the file is renamed on disk),
    anything that is not already a ``.wav`` is passed through ``convert`` and
    the result is then passed through ``convertClean`` (per the original
    notes this removes most constant background noise -- both helpers are
    defined outside this file, so confirm there).

    Progress is reported through ``app.updateLog``.

    Returns the name of the resulting file, or ``None`` when ``speechFile``
    is not an existing file.
    """
    app.updateLog(' Filtering name: ' + speechFile)
    if not isFile(speechFile, app):
        return None

    # Rename on disk only when replacing spaces actually changed the name.
    cleanedName = speechFile.replace(" ", "-")
    if cleanedName != speechFile:
        os.rename(speechFile, cleanedName)  # renames the original file
        app.updateLog('\n Renamed to: ' + cleanedName)

    extension = cleanedName[extensionIndex(cleanedName):]
    if extension != '.wav':  # anything else has to be converted first
        app.updateLog('\n Converted file from ' + extension + ' to .wav. Referring to .wav file.')
        cleanedName = convert(cleanedName, '.wav')

    cleanedName = convertClean(cleanedName, '.wav')
    app.updateLog('\n Name filtered to: ' + cleanedName + '\n')
    return cleanedName
def isFile(speechFile, app):
    """Return True when ``speechFile`` is an existing regular file.

    When it is not, a message is sent to ``app.updateLog`` and False is
    returned.
    """
    if os.path.isfile(speechFile):
        return True
    app.updateLog(' Not a file! Please input a file\n')
    return False
def extensionIndex(speechFile):
    """Helper that finds the (negative) index at which the extension starts.

    The name is scanned backwards from its last character until a '.' is
    found, so ``speechFile[index:]`` is the extension including the dot and
    ``speechFile[:index]`` is the name without it.  The scan never inspects
    the first character: when no dot is found before reaching it, the index
    of the second character is returned (or -1 for a one-character name).
    An empty string raises ``IndexError``.
    """
    lowestIndex = 1 - len(speechFile)  # the scan stops at the second character
    position = -1
    while True:
        if speechFile[position] == '.':
            break
        if position <= lowestIndex:
            break
        position -= 1
    return position
def mkFilesLocation(speechFile, app):
    """Create the working folder for ``speechFile`` unless it already exists.

    The folder path is the file name with its extension stripped.
    """
    folder = speechFile[:extensionIndex(speechFile)]
    if os.path.exists(folder):
        return  # nothing to do
    app.updateLog(' Made new directory.\n')
    os.mkdir(folder)
def divide_files(speechFile, sound, length, app):
    """Splices a given file and creates the corresponding patches.

    For every whole ``SPLICELEN`` segment ``i`` of ``sound`` five files are
    written into the folder named after ``speechFile`` (extension stripped):

    * the splice itself,
    * a patch of ``PATCHLEN`` centred on the end of the splice,
    * an inside patch of ``PATCHLEN // 2`` centred on the same boundary,
    * the two halves of the patch (before / after the boundary).

    ``sound`` is the audio object to slice (millisecond slicing), ``length``
    its duration in seconds, and ``app`` receives progress messages through
    ``updateLog``.

    NOTE(review): only ``length // (SPLICELEN // 1000)`` whole splices are
    exported, so audio after the last full boundary is only present in the
    final patch files.
    """
    def export(name, startSound, endSound, index, FORMAT='wav'):
        # Fall back to an open-ended slice if the bounded slice fails.
        # Only ordinary errors are caught so Ctrl-C / SystemExit still work.
        try:
            newSound = sound[startSound:endSound]
        except Exception:
            newSound = sound[startSound:]
        newSound.export(folder + name + str(index) + '.' + FORMAT, format=FORMAT)

    app.updateLog(' Started division ... ')
    folder = speechFile[:extensionIndex(speechFile)]
    halfPatch = PATCHLEN // 2
    quarterPatch = PATCHLEN // 4
    for i in range(int(length // (SPLICELEN // 1000))):
        boundary = (i + 1) * SPLICELEN  # end of splice i, start of splice i + 1
        export(SPLICE, i * SPLICELEN, boundary, i)
        export(PATCH, boundary - halfPatch, boundary + halfPatch, i)
        export(INSIDEPATCH, boundary - quarterPatch, boundary + quarterPatch, i)
        export(SPLITPATCH1, boundary - halfPatch, boundary, i)
        export(SPLITPATCH2, boundary, boundary + halfPatch, i)
    app.updateLog('Finished.\n')
def reparameter_files(speechFile, length, app):
    """Overlays every splice and patch file onto a basis track.

    Each existing file is loaded, deleted, overlaid on the (strongly
    attenuated) basis audio cut to the same duration, and written back under
    the same name.  Per the original notes this is what allows the files to
    be transcribed by the Google API.
    """
    def replace(name, index, FORMAT='wav'):
        fileName = folder + name + str(index) + '.' + FORMAT
        if not os.path.exists(fileName):
            return
        segment = AudioSegment.from_wav(fileName)
        duration = segment.duration_seconds * 1000  # milliseconds
        os.remove(fileName)
        basis[:duration].overlay(segment).export(fileName, FORMAT)

    app.updateLog(' Started reparameterization ... ')
    folder = speechFile[:extensionIndex(speechFile)]
    # reduce volume of basis by 100 decibels
    basis = AudioSegment.from_wav('Audio/basis.wav')[:SPLICELEN] - 100
    for i in range(int(length // (SPLICELEN // 1000))):
        for stem in (SPLICE, PATCH, INSIDEPATCH, SPLITPATCH1, SPLITPATCH2):
            replace(stem, i)
    app.updateLog('Finished.\n')
def rmFilesLocation(speechFile, app):
    """Removes the folder of splices and patches, but only when it is empty.

    Used after an error; a folder that still contains files is left in place
    and that fact is logged instead.
    """
    folder = speechFile[:extensionIndex(speechFile)]
    if os.listdir(folder):
        app.updateLog(' Failed to removed filled folder.\n')
        return
    os.rmdir(folder)
    app.updateLog(' Removed empty folder.\n')
def remove_files(speechFile, length, app):
    """Deletes every splice and patch ``.wav`` file written for ``speechFile``.

    Files that do not exist are skipped silently.
    """
    app.updateLog(' Started file removal ... ')
    folder = speechFile[:extensionIndex(speechFile)]
    spliceCount = int(length // (SPLICELEN // 1000))
    for i in range(spliceCount):
        for stem in FILENAMES:
            path = folder + stem + str(i) + '.wav'
            if os.path.exists(path):
                os.remove(path)
    app.updateLog('Finished.\n')
def match_words(outerWords, innerWords, middleIndex):
    """Helper that finds the largest section of intersecting words.

    Starting from ``outerWords[middleIndex]`` the section is grown to the
    left and to the right for as long as the neighbouring word also occurs
    in ``innerWords``.

    Returns the contiguous slice of ``outerWords`` around ``middleIndex``
    whose words all occur in ``innerWords``.  An empty list is returned when
    ``outerWords`` is empty or the middle word itself does not occur in
    ``innerWords``.

    The first and last words of ``outerWords`` are included when they match;
    the earlier implementation stopped one short of either end and so
    silently dropped them.
    """
    if not outerWords:
        return []
    known = set(innerWords)  # built once for constant-time membership tests
    if outerWords[middleIndex] not in known:
        return []

    # Grow left: startIndex ends on the first word of the matching run.
    startIndex = middleIndex
    while startIndex > 0 and outerWords[startIndex - 1] in known:
        startIndex -= 1

    # Grow right: endIndex ends one past the last word of the matching run.
    endIndex = middleIndex
    while endIndex < len(outerWords) and outerWords[endIndex] in known:
        endIndex += 1

    return outerWords[startIndex:endIndex]
def match_patch(fullPatch, folder, index):
    """Finds the words of the full patch that a narrower patch confirms.

    The middle word of the (lower-cased) full patch is looked up in the
    transcript of the inside patch first, then in the first split patch and
    finally in the second split patch; later files are only transcribed when
    the earlier ones did not contain the word.  The first transcript that
    contains it is handed to ``match_words``.  When none does, all words of
    the full patch are returned.

    The result is used as the main patch to knit together splices.
    """
    patchWords = fullPatch.lower().split(" ")
    middleWordIndex = len(patchWords) // 2
    pivot = patchWords[middleWordIndex]

    insideWords = transcribe_one(INSIDEPATCH, folder, index).lower().split(" ")
    if pivot in insideWords:
        return match_words(patchWords, insideWords, middleWordIndex)

    # check other patches for words
    for stem in (SPLITPATCH1, SPLITPATCH2):
        halfWords = transcribe_one(stem, folder, index).lower().split(" ")
        if pivot in halfWords:
            return match_words(patchWords, halfWords, middleWordIndex)

    return patchWords  # returns the entire patch if nothing can be found
def match_start(fullTranscript, matches):
    """Helper that knits the previous splice with the next patch.

    Only the last ``TEXTLEN`` characters of ``fullTranscript`` are examined,
    and within them only the text after the first space (lower-cased).  The
    first two remaining patch words, joined by a space, are searched for in
    that text; an occurrence counts only when it starts the text or directly
    follows a space.  When no occurrence counts, the first patch word is
    discarded and the search repeats with the next pair.

    Returns the position of the accepted occurrence plus the index of the
    first space in the examined window, or ``None`` when fewer than two
    words remain without a hit.  ``matches`` itself is not modified.
    """
    remaining = copy.copy(matches)
    window = fullTranscript[-TEXTLEN:]
    firstSpace = window.find(' ')
    endScript = window[firstSpace + 1:].lower()

    while len(remaining) > 1:
        pair = remaining[0] + ' ' + remaining[1]
        place = endScript.find(pair)
        while place != -1:
            # accept only hits that begin on a word boundary
            if place == 0 or endScript[place - 1] == ' ':
                return place + firstSpace
            place = endScript.find(pair, place + 1)
        remaining.pop(0)  # case of bad translation at beginning of insidePatch
    return None
def match_end(nextTranscript, matches):
    """Helper that knits a patch with the next splice.

    Works on the first ``TEXTLEN`` characters of ``nextTranscript`` in
    reversed order, skipping everything up to the first space of that
    reversed text (lower-cased).  The last two remaining patch words, each
    reversed and joined by a space, are searched for; an occurrence counts
    only when it starts the text or directly follows a space.  When no
    occurrence counts, the last patch word is discarded and the search
    repeats with the next pair.

    Returns the position of the accepted occurrence plus the index of the
    first space in the reversed window, or ``None`` when fewer than two
    words remain without a hit.  ``matches`` itself is not modified.
    """
    remaining = copy.copy(matches)
    window = nextTranscript[:TEXTLEN][::-1]
    firstSpace = window.find(' ')
    startScript = window[firstSpace + 1:].lower()

    while len(remaining) > 1:
        pair = remaining[-1][::-1] + ' ' + remaining[-2][::-1]
        place = startScript.find(pair)
        while place != -1:
            # accept only hits that begin on a word boundary
            if place == 0 or startScript[place - 1] == ' ':
                return place + firstSpace
            place = startScript.find(pair, place + 1)
        remaining.pop()  # case of bad translation at the end of the patch
    return None
def transcribe_one(name, folder, index, extension='.wav'):
    """Transcribes a single file.

    The file name is ``folder + name + str(index) + extension``.  Returns
    whatever ``google_transcribe`` returns for that file, or an empty string
    when the file does not exist.
    """
    fileName = folder + name + str(index) + extension
    if not os.path.exists(fileName):
        return ""
    return google_transcribe(fileName)
def develop_transcript(folder, length):
    """Develops the transcript by taking file splices and patching them together.

    ``folder`` is the directory holding the splice and patch files and
    ``length`` the duration of the audio in seconds.  For every splice
    boundary the patch transcript is matched against the end of the text
    gathered so far and against the start of the next splice; the
    overlapping text is trimmed from both and replaced by the patch words.

    Returns the combined transcript as a single string.

    Differences from the earlier implementation:

    * an empty patch transcript no longer discards the next splice -- the
      splice is appended without a patch;
    * trimming is measured against the window that was actually searched,
      so transcripts shorter than ``TEXTLEN`` are cut at the right place.
    """
    spliceCount = int(length // (SPLICELEN // 1000))
    fullTranscript = transcribe_one(SPLICE, folder, 0)  # start fullTranscript
    for i in range(spliceCount):
        if i != spliceCount - 1:  # checks if there is a next transcript
            nextTranscript = transcribe_one(SPLICE, folder, i + 1)
        else:
            nextTranscript = ""  # last add does not add a section

        fullPatch = transcribe_one(PATCH, folder, i).lower()
        if fullPatch == "":
            # Nothing to knit with: keep the next splice instead of losing it.
            if nextTranscript:
                fullTranscript = ' '.join([fullTranscript, nextTranscript])
            continue

        matches = match_patch(fullPatch, folder, i)
        if matches:
            # for patching first part: match_start reports a position inside
            # the last min(len, TEXTLEN) characters of fullTranscript
            place = match_start(fullTranscript, matches)
            if place is not None:
                window = min(len(fullTranscript), TEXTLEN)
                cut = max(len(fullTranscript) - window + place, 0)
                fullTranscript = fullTranscript[:cut]
            # for patching next part: match_end reports a position inside the
            # reversed first min(len, TEXTLEN) characters of nextTranscript
            place = match_end(nextTranscript, matches)
            if place is not None:
                window = min(len(nextTranscript), TEXTLEN)
                nextTranscript = nextTranscript[window - place:]
        fullTranscript = ' '.join([fullTranscript] + matches + [nextTranscript])
    return fullTranscript
def transcribe_files(speechFile, length, app):
    """Checks if the given file exists and calls develop_transcript.

    The resulting transcript is written to ``transcription.txt`` inside the
    folder named after ``speechFile`` (extension stripped).  When
    ``speechFile`` does not exist a message is logged and nothing is
    written.  ``length`` is the audio duration in seconds; ``app`` receives
    progress messages through ``updateLog``.
    """
    app.updateLog(' Started transcription ... ')
    folder = speechFile[:extensionIndex(speechFile)]
    if not os.path.exists(speechFile):
        app.updateLog('File does not exist, cannot call transcription.\n')
        return
    fullTranscript = develop_transcript(folder, length)  # develop transcript
    # The context manager guarantees the text is flushed and the handle is
    # closed even if the write fails.
    with open(folder + '/transcription.txt', 'w') as transcriptFile:
        transcriptFile.write(fullTranscript)
    app.updateLog('Finished.\n')
def simple_transcribe(speechFile, app):
    """Transcribes a file less than the maximum length of a splice.

    The file is handed to ``google_transcribe`` in one piece and the result
    is written to ``transcription.txt`` inside the folder named after
    ``speechFile`` (extension stripped).  When ``speechFile`` does not exist
    a message is logged and nothing is written.  ``app`` receives progress
    messages through ``updateLog``.
    """
    app.updateLog(' Started transcription ... ')
    folder = speechFile[:extensionIndex(speechFile)]
    if not os.path.exists(speechFile):  # check file existence
        app.updateLog('File does not exist, cannot call transcription.\n')
        return
    fullTranscript = google_transcribe(speechFile)  # directly transcribe the file
    # The context manager guarantees the text is flushed and the handle is
    # closed even if the write fails.
    with open(folder + '/transcription.txt', 'w') as transcriptFile:
        transcriptFile.write(fullTranscript)
    app.updateLog('Finished.\n')
def full_transcribe(file, app, willRemove=True, cutoff=None):
    """Call for the full transcription of any video or audio file.

    Runs the whole pipeline: name filtering / conversion, loading the audio,
    creating the working folder, dividing into splices and patches,
    reparameterizing them, and transcribing.

    Parameters:
        file: path of the file to transcribe.
        app: object whose ``updateLog`` method receives progress messages.
        willRemove: when true, the splice and patch files are deleted
            afterwards (and, after a failure, the working folder too if it
            ended up empty).
        cutoff: optional upper bound applied to the loaded audio by slicing
            (``sound[:cutoff]``); ``None`` keeps the whole audio.

    Returns ``None`` on success, otherwise a short message naming the stage
    that failed.

    Each stage is wrapped so an ordinary error is reported through the
    return value rather than raised.  Only ``Exception`` is caught, so
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate.
    """
    def cleanup():
        # Remove the partial output of a failed run.
        if willRemove:
            remove_files(file, length, app)
            rmFilesLocation(file, app)

    app.updateLog(' Transcribing the following file: ' + file + '\n')
    try:
        file = filterName(file, app)
    except Exception:
        return 'Error in filtering name!'
    if file is None:
        # filterName found no such file; report it here rather than letting
        # the audio loader fail with a misleading "length" error.
        return 'Error in filtering name!'

    try:
        sound = AudioSegment.from_wav(file)
        if cutoff is not None:
            sound = sound[:cutoff]
        length = sound.duration_seconds
    except Exception:
        return 'Error in determining length'

    try:
        mkFilesLocation(file, app)
    except Exception:
        return 'Error in making files location!'

    try:
        divide_files(file, sound, length, app)
    except Exception:
        cleanup()
        return 'Error in dividing files!'

    try:
        reparameter_files(file, length, app)
    except Exception:
        cleanup()
        return 'Error in reparametering files!'

    try:
        # Audio longer than one splice is stitched from splices and patches;
        # anything shorter is transcribed in a single request.
        if length > SPLICELEN // 1000:
            transcribe_files(file, length, app)
        else:
            simple_transcribe(file, app)
    except Exception:
        cleanup()
        return 'Error in transcription!'

    # On success the folder is kept because it holds the transcription.
    if willRemove:
        remove_files(file, length, app)
    return None