forked from jquagga/ttt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathttt.py
executable file
·405 lines (339 loc) · 13.3 KB
/
ttt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
#!/usr/bin/env python
import json
import os
import subprocess
import time
from datetime import datetime
from pathlib import Path
import apprise
import requests
import torch
from better_profanity import profanity
from transformers import (
AutoModelForSpeechSeq2Seq,
AutoProcessor,
pipeline,
)
# Raise our nice value by 5. Transcription is important, but it should not
# starve the rest of the system of CPU time.
os.nice(5)
# Set up transformers once, at import time, so the transcription function
# only needs the PIPE constant. Note that this runs unconditionally, even when
# main() later selects the Deepgram or whisper.cpp backend.
# Prefer the first CUDA device with half precision; fall back to CPU/float32.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
# The model can be overridden through the TTT_TRANSFORMERS_MODEL_ID variable.
model_id = os.environ.get("TTT_TRANSFORMERS_MODEL_ID", "openai/whisper-large-v3-turbo")
print(f"We are using {torch_dtype} on {device} with {model_id}")
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True,
)
model.to(device)
processor = AutoProcessor.from_pretrained(model_id)
# Shared speech-recognition pipeline used by transcribe_transformers().
PIPE = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
)
# "stroke" is on the default profanity list, but when an ambulance is being
# dispatched for one it is exactly the word we need to see, so whitelist it.
profanity.load_censor_words(whitelist_words=["stroke"])
def transcribe_transformers(calljson, audiofile):
    """Transcribes an audio file with the module-level transformers pipeline.

    Args:
        calljson (dict): A dictionary containing the JSON data. It is updated
            in place.
        audiofile (str or Path): The path to the audio file. It is converted
            to ``str`` before being handed to the pipeline.

    Returns:
        dict: The same calljson dictionary with the transcript under "text".

    Explanation:
        The model, processor and pipeline are created once at module import
        and exposed as PIPE; this function only runs the audio file through
        that pipeline and stores the resulting text.
    """
    # Request English output and timestamp generation from the model.
    generation_options = {"language": "english", "return_timestamps": True}
    transcription = PIPE(str(audiofile), generate_kwargs=generation_options)
    calljson["text"] = transcription["text"]
    return calljson
def send_notifications(calljson, audiofile, destinations):
    """
    Sends the transcript of one call to its configured notification URL.

    Args:
        calljson (dict): The call information. The keys "text",
            "talkgroup_description", "start_time", "short_name" and
            "talkgroup" are read.
        audiofile (str or Path): The path to the audio file; only used when
            audio attachments are enabled.
        destinations (dict): Nested mapping of short name -> talkgroup ->
            notification URL.

    Raises:
        KeyError: If calljson lacks one of the keys above, or destinations
            has no entry for the call's short name / talkgroup.

    Returns:
        None
    """
    # Censor the AI generated transcript before it is sent anywhere.
    body = profanity.censor(calljson["text"])

    # Title looks like "<talkgroup description> @ <local start time>".
    heading = calljson["talkgroup_description"] + " @ "
    title = heading + str(datetime.fromtimestamp(calljson["start_time"]))

    # Both lookup keys are strings in the destinations mapping.
    system_name = str(calljson["short_name"])
    group_id = str(calljson["talkgroup"])
    notify_url = destinations[system_name][group_id]

    # TTT_ATTACH_AUDIO enables the audio attachment when set to a truthy word.
    flag = os.environ.get("TTT_ATTACH_AUDIO", "False").lower()
    attach_audio = flag in ("true", "1", "t")

    apobj = apprise.Apprise()
    apobj.add(notify_url)

    if not attach_audio:
        apobj.notify(body=body, title=title)
        return
    audio_notification(audiofile, apobj, body, title)
def _remove_aacfile(aacfile):
    """Delete the temporary AAC file, reporting (not raising) on failure."""
    try:
        Path(aacfile).unlink()
    except FileNotFoundError:
        print(f"File {aacfile} not found.")
    except PermissionError:
        print(f"No permission to delete {aacfile}.")


def audio_notification(audiofile, apobj, body, title):
    """
    Encode audio file to AAC format and send a notification with the audio attachment.

    If ffmpeg cannot produce the AAC file (non-zero exit status, the 30 second
    timeout is exceeded, or the ffmpeg binary cannot be started at all), the
    notification is still sent, text only. The temporary ``.m4a`` file is
    removed afterwards in every case, including when sending fails.

    Args:
        audiofile (str or Path): Path to the input audio file.
        apobj: Object used to send notifications; its ``notify`` method is
            called with ``body``, ``title`` and, on success, ``attach``.
        body (str): Body of the notification.
        title (str): Title of the notification.

    Returns:
        None
    """
    # The encoded file sits next to the source audio with an .m4a suffix.
    aacfile = Path(audiofile).with_suffix(".m4a")
    ffmpeg_cmd = [
        "ffmpeg",
        "-y",
        "-hide_banner",
        "-loglevel",
        "error",
        "-i",
        str(audiofile),
        "-ac",
        "1",
        "-af",
        "highpass=f=200,lowpass=f=3000,anlmdn,loudnorm=i=-14",
        "-b:a",
        "64k",
        "-c:a",
        "aac",
        str(aacfile),
    ]

    # Only the encoding step is guarded here: any ffmpeg failure downgrades
    # the notification to text only instead of stopping the caller.
    attachment = None
    try:
        subprocess.run(ffmpeg_cmd, check=True, timeout=30)
        attachment = str(aacfile)
    except subprocess.CalledProcessError:
        print(
            f"ffmpeg file conversion error with {aacfile}. We will skip audio on this file and post text only."
        )
    except subprocess.TimeoutExpired:
        print(
            f"ffmpeg file conversion error exceeded 30 seconds on {aacfile}. We will skip audio on this file and post text only."
        )
    except OSError as err:
        # Raised when the ffmpeg executable is missing or cannot be executed.
        print(
            f"Unable to run ffmpeg for {aacfile}: {err}. We will skip audio on this file and post text only."
        )

    try:
        if attachment is None:
            apobj.notify(
                body=body,
                title=title,
            )
        else:
            apobj.notify(
                body=body,
                title=title,
                attach=attachment,
            )
    finally:
        # Remove aacfile; audiofile and json are unlinked later by the caller.
        _remove_aacfile(aacfile)
def import_notification_destinations(csv_path="destinations.csv"):
    """Imports notification destinations from a CSV file.

    Args:
        csv_path (str or Path): Location of the CSV file. Defaults to
            "destinations.csv" in the current working directory.

    Returns:
        dict: A dictionary containing the notification destinations.

    Explanation:
        The first line of the file is treated as a header and skipped. Each
        remaining row contributes one destination: the first column is the
        key, the second column the sub-key and the third column the value,
        giving a nested dictionary ``{col1: {col2: col3}}``. A later row with
        the same key and sub-key overwrites the earlier one. Rows with fewer
        than three columns are skipped: blank lines silently, other short
        rows with a printed message.
    """
    import csv

    destinations = {}
    with open(csv_path, newline="") as inp:
        reader = csv.reader(inp)
        next(reader, None)  # skip the headers
        for row in reader:
            if len(row) < 3:
                # csv.reader yields [] for blank lines; indexing such a row
                # would raise IndexError and abort start-up.
                if row:
                    print(f"Skipping malformed destinations row: {row}")
                continue
            key, sub_key, value = row[0], row[1], row[2]
            destinations.setdefault(key, {})[sub_key] = value
    return destinations
def main():
    """Runs the main loop for transcribing audio files and sending notifications.

    Explanation:
        The notification destinations are imported once. The function then
        polls the "media/transcribe" directory forever for JSON files, oldest
        first by ctime. For each one it loads the JSON, transcribes the
        matching ".wav" file with the backend selected by environment
        variables, sends a notification unless the transcript is just "you",
        and finally deletes the JSON and audio files.

    Args:
        None

    Returns:
        None. The loop has no exit condition, so the function only ends when
        an exception propagates.
    """
    # Import the apprise destinations to send calls
    destinations = import_notification_destinations()
    queue_dir = Path("media/transcribe")

    while True:
        # Collect every json file under the queue directory, oldest first
        pending = sorted(queue_dir.rglob("*.[jJ][sS][oO][nN]"), key=os.path.getctime)

        # Nothing to do: wait 5 seconds and poll again
        if not pending:
            print("Empty queue. Sleep 5 seconds and check again.")
            time.sleep(5)
            continue

        for jsonfile in pending:
            # The audio sits next to the json with a .wav suffix
            audiofile = Path(jsonfile).with_suffix(".wav")
            print(f"Processing: {audiofile}")

            calljson = json.loads(jsonfile.read_text())

            # Backend choice: TTT_DEEPGRAM_KEY selects Deepgram, otherwise
            # TTT_WHISPERCPP_URL selects whisper.cpp, otherwise transformers.
            if os.environ.get("TTT_DEEPGRAM_KEY", False):
                transcribe = transcribe_deepgram
            elif os.environ.get("TTT_WHISPERCPP_URL", False):
                transcribe = transcribe_whispercpp
            else:
                transcribe = transcribe_transformers
            calljson = transcribe(calljson, audiofile)

            # Whisper tends to emit a lone "you" for audio with no speech;
            # such calls are not worth a notification, only cleanup.
            transcript = calljson["text"].strip()
            if transcript != "you":
                send_notifications(calljson, audiofile, destinations)

            # Remove the processed files from the transcribe directory
            for finished in (jsonfile, audiofile):
                try:
                    Path(finished).unlink()
                except FileNotFoundError:
                    print(f"File {finished} not found.")
                except PermissionError:
                    print(f"No permission to delete {finished}.")
def transcribe_whispercpp(calljson, audiofile):
    """Transcribes audio file using whisper.cpp.

    Args:
        calljson (dict): A dictionary containing the JSON data.
        audiofile (Path): The path to the audio file.

    Returns:
        dict: A new dictionary holding calljson's entries overlaid with the
            entries of the whisper.cpp JSON response.

    Raises:
        RuntimeError: If the HTTP request fails or returns an error status.

    Explanation:
        The audio bytes and the inference parameters are posted as
        multipart/form-data to "<TTT_WHISPERCPP_URL>/inference" (default
        server "http://whisper:8080"). The JSON reply is merged over
        calljson, so the reply's keys win on conflict.
    """
    base_url = os.environ.get("TTT_WHISPERCPP_URL", "http://whisper:8080")

    # Form fields for the inference request, audio first
    form_fields = {
        "file": (None, audiofile.read_bytes()),
        "temperature": (None, "0.0"),
        "temperature_inc": (None, "0.2"),
        "response_format": (None, "json"),
    }

    try:
        reply = requests.post(f"{base_url}/inference", files=form_fields)
        reply.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"A request error occurred while trying to post to whisper.cpp: {e}")
        raise RuntimeError(
            "A request error occurred while trying to post to whisper.cpp."
        ) from e

    # Overlay the reply on the call data so its entries land in the result
    transcript_fields = reply.json()
    return {**calljson, **transcript_fields}
def transcribe_deepgram(calljson, audiofile):
    """Transcribes audio file using Deepgram API.

    Args:
        calljson (dict): A dictionary containing the JSON data. It is updated
            in place.
        audiofile (Path): The path to the audio file.

    Returns:
        dict: The same calljson dictionary with the transcript under "text".

    Raises:
        RuntimeError: If the HTTP request fails or returns an error status.

    Explanation:
        The raw audio bytes are posted to the Deepgram "listen" endpoint,
        authenticated with the TTT_DEEPGRAM_KEY environment variable. The
        transcript of the first alternative of the first channel in the JSON
        reply is stored in calljson as "text", the key the whisper backends
        use.
    """
    api_key = os.environ.get("TTT_DEEPGRAM_KEY")
    request_headers = {
        "Authorization": f"Token {api_key}",
        "Content-Type": "audio/wav",
    }
    query = {
        "model": "nova-2-phonecall",
        "language": "en-US",
        "smart_format": "true",
    }
    audio_bytes = audiofile.read_bytes()

    try:
        response = requests.post(
            "https://api.deepgram.com/v1/listen",
            params=query,
            headers=request_headers,
            data=audio_bytes,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"A request error occurred while trying to post to Deepgram: {e}")
        raise RuntimeError(
            "A request error occurred while trying to post to Deepgram."
        ) from e

    # Pull the transcript out of the reply and store it under "text"
    payload = response.json()
    best_alternative = payload["results"]["channels"][0]["alternatives"][0]
    calljson["text"] = best_alternative["transcript"]
    return calljson
# Start the polling loop only when this file is executed as a script.
if __name__ == "__main__":
    main()