-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrealtime-stt-server.py
282 lines (242 loc) · 9.75 KB
/
realtime-stt-server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#!/usr/bin/env python3
"""
A server for RealtimeSTT made to be used with whisper-overlay.
"""
import argparse
import json
import logging
import queue
import socket
import struct
import time
import sys
import threading
def send_message(sock, message):
    """Serialize *message* as JSON and send it as one length-prefixed frame.

    Frame layout: a 4-byte big-endian unsigned length followed by the
    UTF-8 encoded JSON payload.

    :param sock: connected socket to write to.
    :param message: any JSON-serializable object.
    :raises OSError: if the socket write fails.
    """
    payload = json.dumps(message).encode("utf-8")
    # Send header and payload in a single sendall: two back-to-back small
    # writes can be delayed by Nagle's algorithm interacting with delayed
    # ACKs, which hurts a realtime stream.
    sock.sendall(struct.pack("!I", len(payload)) + payload)
def _recv_exact(sock, n):
    """Read exactly *n* bytes from *sock*.

    ``socket.recv`` may return fewer bytes than requested, so keep reading
    until the full amount has arrived.

    :returns: the *n* bytes, or None if the peer closed the connection
        before any byte was received.
    :raises ConnectionError: if the peer closed the connection part-way.
    """
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            if not buf:
                return None
            raise ConnectionError(
                f"connection closed after {len(buf)} of {n} bytes")
        buf += chunk
    return bytes(buf)


# High bit of the length header marks a raw-audio frame (vs. a JSON frame).
_RAW_AUDIO_FLAG = 0x80000000


def recv_message(sock):
    """Receive one length-prefixed frame from *sock*.

    The frame starts with a 4-byte big-endian length. If its high bit is set,
    the payload is raw audio and is returned as ``bytes``. Otherwise the
    payload is UTF-8 JSON and the decoded object is returned.

    :returns: the decoded message, raw audio bytes, or None if the peer closed
        the connection cleanly between frames.
    :raises ConnectionError: if the connection closes in the middle of a frame.
    """
    header = _recv_exact(sock, 4)
    if header is None:
        return None
    (length,) = struct.unpack("!I", header)
    is_raw_audio = bool(length & _RAW_AUDIO_FLAG)
    length &= ~_RAW_AUDIO_FLAG

    payload = _recv_exact(sock, length)
    if payload is None:
        raise ConnectionError("connection closed before frame payload")
    if is_raw_audio:
        return payload
    return json.loads(payload.decode("utf-8"))
class Client:
    """Bookkeeping for a single connected socket client."""

    def __init__(self, tag, conn):
        # Label used as a prefix in log lines, and the client's socket.
        self.tag, self.conn = tag, conn
        # Thread that created this object (the one handling the connection).
        self.thread = threading.current_thread()
        # Mode string sent by the client in its init message (e.g. "status"
        # or "stream"); unknown until that message arrives.
        self.mode = None
        # Counted by status reports only when True (set for "stream" mode).
        self.is_true_client = False
        # True while the client is blocked waiting for the model lock.
        self.waiting = False
        # Outgoing messages waiting to be written to this client's socket.
        self.queue = queue.Queue()
# Every connected client, keyed by the address tuple returned by accept().
clients = dict()
# The client currently holding model_lock, or None when nobody does.
active_client = None
# Guards the single shared recorder: only one stream client uses it at a time.
model_lock = threading.Lock()
def publish(obj, client=None):
    """JSON-encode *obj* and enqueue it for delivery.

    :param obj: JSON-serializable message.
    :param client: target client. When None, the message is broadcast to
        every registered client whose mode is "status".
    """
    msg = json.dumps(obj)
    if client is not None:
        client.queue.put(msg)
        return
    # Iterate over a snapshot: connection handler threads insert into and
    # delete from `clients` concurrently, and a dict that changes size during
    # iteration raises RuntimeError.
    for c in list(clients.values()):
        if c.mode == "status":
            c.queue.put(msg)
def refresh_status(client=None):
    """Queue a refresh request so status client(s) resend the client counts.

    :param client: a single client to notify, or None to notify every
        "status" client.
    """
    request = {"refresh_status": True}
    publish(request, client=client)
def _count_stream_clients():
    """Return ``(connected, waiting)`` counts over clients in "stream" mode."""
    # Take a snapshot first: other handler threads add and remove entries
    # concurrently, and iterating a dict whose size changes raises
    # RuntimeError. One snapshot also keeps both counts consistent.
    stream_clients = [c for c in list(clients.values()) if c.is_true_client]
    return len(stream_clients), sum(1 for c in stream_clients if c.waiting)


def _serve_status_client(client):
    """Send a status frame to a "status" client for every queued refresh request.

    Loops until a socket error propagates out of ``send_message``.
    """
    refresh_status(client)  # refresh once after startup
    while True:
        message = json.loads(client.queue.get())
        if message.get("refresh_status"):
            n_clients, n_waiting = _count_stream_clients()
            status = {
                "clients": n_clients,
                "waiting": n_waiting,
            }
            send_message(client.conn, status)
        client.queue.task_done()


def _send_queue(client):
    """Write queued messages to the client's socket until the None sentinel."""
    try:
        while True:
            message = client.queue.get()
            if message is None:
                return
            send_message(client.conn, message)
            client.queue.task_done()
    except (OSError, ConnectionError):
        logger.info(f"{client.tag} error in send queue: connection closed?")


def _receive_audio(client):
    """Feed incoming audio to the recorder and handle control messages.

    Returns when the peer closes the connection or a socket error occurs.
    """
    tag = client.tag
    try:
        while True:
            msg = recv_message(client.conn)
            if msg is None:
                break
            if isinstance(msg, bytes):
                recorder.feed_audio(msg)
            elif isinstance(msg, dict) and msg.get("action") == "flush":
                logger.info(f"{tag} flushing on client request")
                # input some silence
                for _ in range(10):
                    recorder.feed_audio(bytes(1000))
                recorder.stop()
                logger.info(f"{tag} flushed")
            else:
                logger.info(f"{tag} error in recv: invalid message: {msg}")
    except (OSError, ConnectionError):
        logger.info(f"{tag} error in recv: connection closed?")


def _serve_stream_client(client):
    """Wait for the model lock, then run a recording session for *client*."""
    global active_client
    tag, conn = client.tag, client.conn
    logger.info(f'{tag} Acquiring lock')
    client.waiting = True
    refresh_status()
    send_message(conn, dict(status="waiting for lock"))
    with model_lock:
        active_client = client
        client.waiting = False
        try:
            refresh_status()
            send_message(conn, dict(status="lock acquired"))
            recorder.start()
            sender_thread = threading.Thread(target=_send_queue, args=(client,))
            sender_thread.daemon = True
            sender_thread.start()
            try:
                _receive_audio(client)
            finally:
                # Stop routing results here before enqueuing the sentinel.
                active_client = None
                client.queue.put(None)
                recorder.stop()
                sender_thread.join()
        finally:
            # Also covers failures before the session started (e.g. the
            # "lock acquired" send failing), so no dead client stays active.
            active_client = None


def handle_client(conn, addr):
    """Serve one accepted connection until it closes.

    The first frame must be a JSON object with a "mode" key. "status" clients
    receive client-count updates; any other mode streams audio through the
    shared recorder while holding ``model_lock``.

    :param conn: the accepted client socket (closed on return).
    :param addr: the peer address tuple, used as the registry key.
    """
    tag = f"{addr[0]}:{addr[1]}"
    client = Client(tag, conn)
    clients[addr] = client
    try:
        logger.info(f'{tag} Connected to client')
        init = recv_message(conn)
        if not isinstance(init, dict) or "mode" not in init:
            logger.error(f'{tag} Invalid init message: {init!r}')
            return
        mode = init["mode"]
        logger.info(f'{tag} Client requested mode {mode}')
        client.mode = mode
        client.is_true_client = mode == "stream"
        if mode == "status":
            _serve_status_client(client)
        else:
            _serve_stream_client(client)
    except Exception as e:
        logger.exception(f'{tag} Error handling client: {e}')
    finally:
        # Deregister before refreshing so status clients stop counting us.
        clients.pop(addr, None)
        refresh_status()
        conn.close()
        logger.info(f'{tag} Connection closed')
if __name__ == "__main__":
    logging.basicConfig(format="%(levelname)s %(message)s")
    logger = logging.getLogger("realtime-stt-server")
    logger.setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, default='localhost',
                        help="The host to listen on [default: 'localhost']")
    parser.add_argument("--port", type=int, default=7007,
                        help="The port to listen on [default: 7007]")
    parser.add_argument("--device", type=str, default="cuda",
                        help="Device to run the models on, defaults to cuda if available, else cpu [default: 'cuda']")
    parser.add_argument("--model", type=str, default="large-v3",
                        help="Main model used to generate the final transcription [default: 'large-v3']")
    parser.add_argument("--model-realtime", type=str, default="base",
                        help="Faster model used to generate live transcriptions [default: 'base']")
    parser.add_argument("--language", type=str, default="",
                        help="Set the spoken language. Leave empty to auto-detect. [default: '']")
    parser.add_argument("--debug", action="store_true",
                        help="Enable debug log output [default: unset]")
    args = parser.parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)

    # FIXME: workaround until we can update RealtimeSTT to support device params
    if args.device == "cpu":
        import torch
        torch.cuda.is_available = lambda: False

    logger.info("Importing runtime")
    from RealtimeSTT import AudioToTextRecorder

    def _deliver(kind, text, segments):
        """Queue a transcription message for the active client, if any."""
        # Read the global exactly once: a handler thread may reset it to None
        # between a check and the use, which would raise AttributeError here
        # (and kill the recorder thread when called from it).
        client = active_client
        if client is not None:
            segments = [x._asdict() for x in segments]
            client.queue.put(dict(kind=kind, text=text, segments=segments))

    def text_detected(ts):
        """Realtime-transcription callback: forward (text, segments) to the client."""
        text, segments = ts
        _deliver("realtime", text, segments)

    recorder = None
    recorder_ready = threading.Event()
    recorder_config = {
        'init_logging': False,
        # FIXME: once fixed upstream 'device': args.device,
        'use_microphone': False,
        'spinner': False,
        'model': args.model,
        'return_segments': True,
        'language': args.language,
        'silero_sensitivity': 0.4,
        'webrtc_sensitivity': 2,
        'post_speech_silence_duration': 0.7,
        'min_length_of_recording': 0.0,
        'min_gap_between_recordings': 0,
        'enable_realtime_transcription': True,
        'realtime_processing_pause': 0,
        'realtime_model_type': args.model_realtime,
        'on_realtime_transcription_stabilized': text_detected,
    }

    def recorder_thread():
        """Create the recorder, then forward final transcriptions until shutdown."""
        global recorder
        logger.info("Initializing RealtimeSTT...")
        try:
            recorder = AudioToTextRecorder(**recorder_config)
        except Exception:
            logger.exception("Failed to initialize AudioToTextRecorder")
            return
        else:
            logger.info("AudioToTextRecorder ready")
        finally:
            # Always wake the main thread, even if initialization failed.
            recorder_ready.set()

        try:
            while not recorder.is_shut_down:
                text, segments = recorder.text()
                if text == "":
                    continue
                _deliver("result", text, segments)
        except (OSError, EOFError) as e:
            logger.info(f"recorder thread failed: {e}")

    recorder_worker = threading.Thread(target=recorder_thread)
    recorder_worker.start()
    recorder_ready.wait()
    if recorder is None:
        logger.error("RealtimeSTT failed to initialize, exiting")
        sys.exit(1)

    try:
        logger.info(f'Starting server on {args.host}:{args.port}')
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((args.host, args.port))
            s.listen()
            logger.info('Server ready to accept connections')
            try:
                while True:
                    # Accept incoming connection
                    conn, addr = s.accept()
                    conn.setblocking(True)
                    # Create a new thread to handle the client
                    client_thread = threading.Thread(target=handle_client, args=(conn, addr))
                    client_thread.daemon = True  # die with main thread
                    client_thread.start()
                    # Note: The main thread continues to accept new connections
            except KeyboardInterrupt:
                logger.info('Received shutdown request')
                # Snapshot: closing a socket wakes its handler thread, which
                # then removes itself from `clients` concurrently.
                for c in list(clients.values()):
                    try:
                        c.conn.close()
                    except OSError:
                        pass
    finally:
        # The recorder thread is non-daemon; without shutdown() an error such
        # as a failed bind would leave the process hanging.
        recorder.shutdown()
        logger.info('Server terminated')