  1. """
  2. Speech-to-Text (STT) Server with Real-Time Transcription and WebSocket Interface
  3. This server provides real-time speech-to-text (STT) transcription using the RealtimeSTT library. It allows clients to connect via WebSocket to send audio data and receive real-time transcription updates. The server supports configurable audio recording parameters, voice activity detection (VAD). It is designed to handle continuous transcription as well as post-recording processing, enabling real-time feedback with the option to improve final transcription quality after the complete sentence is recognized.
  4. ### Features:
  5. - Real-time transcription using pre-configured or user-defined STT models.
  6. - WebSocket-based communication for control and data handling.
  7. - Flexible recording and transcription options, including configurable pauses for sentence detection.
  8. - Supports Silero and WebRTC VAD for robust voice activity detection.
  9. ### Starting the Server:
  10. You can start the server using the command-line interface (CLI) command `stt-server`, passing the desired configuration options.
  11. ```bash
  12. stt-server [OPTIONS]
  13. ```
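
For example, to run with the default models and ports made explicit and debug logging enabled (all flags shown are documented under "Available Parameters" below):

```bash
stt-server -m large-v2 -r tiny.en -l en -c 8011 -d 8012 -D
```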

### Available Parameters:
- `-m, --model`: Model path or size; default 'large-v2'.
- `-r, --rt-model, --realtime_model_type`: Real-time model size; default 'tiny.en'.
- `-l, --lang, --language`: Language code for transcription; default 'en'.
- `-i, --input-device, --input_device_index`: Audio input device index; default 1.
- `-c, --control, --control_port`: WebSocket control port; default 8011.
- `-d, --data, --data_port`: WebSocket data port; default 8012.
- `-D, --debug`: Enable debug logging.
- `-W, --write`: Save audio to WAV file.
- `-s, --silence_timing`: Enable dynamic silence duration for sentence detection; default True.
- `--silero_sensitivity`: Silero VAD sensitivity (0-1); default 0.05.
- `--silero_use_onnx`: Use Silero ONNX model; default False.
- `--webrtc_sensitivity`: WebRTC VAD sensitivity (0-3); default 3.
- `--min_length_of_recording`: Minimum recording duration in seconds; default 1.1.
- `--min_gap_between_recordings`: Minimum time between recordings in seconds; default 0.
- `--enable_realtime_transcription`: Enable real-time transcription; default True.
- `--realtime_processing_pause`: Pause between audio chunk processing; default 0.02.
- `--silero_deactivity_detection`: Use Silero for end-of-speech detection; default True.
- `--early_transcription_on_silence`: Start transcription after silence in seconds; default 0.2.
- `--beam_size`: Beam size for main model; default 5.
- `--beam_size_realtime`: Beam size for real-time model; default 3.
- `--initial_prompt`: Initial transcription guidance prompt.
- `--end_of_sentence_detection_pause`: Silence duration for sentence end detection; default 0.45.
- `--unknown_sentence_detection_pause`: Pause duration for incomplete sentence detection; default 0.7.
- `--mid_sentence_detection_pause`: Pause for mid-sentence break; default 2.0.
- `--use_main_model_for_realtime`: Use the main model for real-time transcription.
- `--use_extended_logging`: Enable extensive log messages.
- `--logchunks`: Log incoming audio chunks.

### WebSocket Interface:
The server supports two WebSocket connections:
1. **Control WebSocket**: Used to send and receive commands, such as setting parameters or calling recorder methods.
2. **Data WebSocket**: Used to send audio data for transcription and receive real-time transcription updates.
The server broadcasts real-time transcription updates to all connected clients on the data WebSocket.
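
Control messages are JSON objects, matching what the control handler below parses. A minimal control-client sketch (illustrative only, not part of the server; it assumes the default ports and reuses the `websockets` package the server itself depends on):

```python
import asyncio
import json

import websockets

async def demo():
    # Set an allowed recorder parameter via the control WebSocket (port 8011).
    async with websockets.connect("ws://localhost:8011") as control:
        await control.send(json.dumps({
            "command": "set_parameter",
            "parameter": "silero_sensitivity",
            "value": 0.1,
        }))
        print(await control.recv())  # e.g. {"status": "success", ...}

asyncio.run(demo())
```

Each binary message on the data WebSocket (port 8012) is framed as a 4-byte little-endian metadata length, a UTF-8 JSON metadata object containing at least `sampleRate`, then raw 16-bit PCM audio. A sketch of a client-side framing helper (the function name is illustrative):

```python
import json
import struct

def frame_audio_chunk(pcm_bytes: bytes, sample_rate: int) -> bytes:
    # Wire format expected by the data handler:
    # [4-byte little-endian length][JSON metadata][16-bit PCM samples]
    metadata = json.dumps({"sampleRate": sample_rate}).encode("utf-8")
    return struct.pack("<I", len(metadata)) + metadata + pcm_bytes
```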
  47. """

from .install_packages import check_and_install_packages
from difflib import SequenceMatcher
from collections import deque
from datetime import datetime

import logging
import asyncio
import pyaudio
import sys

debug_logging = False
extended_logging = False
send_recorded_chunk = False
log_incoming_chunks = False
silence_timing = False
writechunks = False
wav_file = None

# Hard-break heuristic: if several near-identical realtime texts arrive within
# a short window, treat the audio as repeating background noise and stop the
# current recording (see text_detected below).
hard_break_even_on_background_noise = 3.0
hard_break_even_on_background_noise_min_texts = 3
hard_break_even_on_background_noise_min_similarity = 0.99
hard_break_even_on_background_noise_min_chars = 15

text_time_deque = deque()
loglevel = logging.WARNING

FORMAT = pyaudio.paInt16
CHANNELS = 1

if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

check_and_install_packages([
    {
        'module_name': 'RealtimeSTT',           # Import module
        'attribute': 'AudioToTextRecorder',     # Specific class to check
        'install_name': 'RealtimeSTT',          # Package name for pip install
    },
    {
        'module_name': 'websockets',            # Import module
        'install_name': 'websockets',           # Package name for pip install
    },
    {
        'module_name': 'numpy',                 # Import module
        'install_name': 'numpy',                # Package name for pip install
    },
    {
        'module_name': 'scipy.signal',          # Submodule of scipy
        'attribute': 'resample',                # Specific function to check
        'install_name': 'scipy',                # Package name for pip install
    }
])

# Define ANSI color codes for terminal output
class bcolors:
    HEADER = '\033[95m'     # Magenta
    OKBLUE = '\033[94m'     # Blue
    OKCYAN = '\033[96m'     # Cyan
    OKGREEN = '\033[92m'    # Green
    WARNING = '\033[93m'    # Yellow
    FAIL = '\033[91m'       # Red
    ENDC = '\033[0m'        # Reset to default
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

print(f"{bcolors.BOLD}{bcolors.OKCYAN}Starting server, please wait...{bcolors.ENDC}")

# Initialize colorama
from colorama import init, Fore, Style
init()

from RealtimeSTT import AudioToTextRecorder
from scipy.signal import resample
import numpy as np
import websockets
import threading
import wave
import json
import time

global_args = None
recorder = None
recorder_config = {}
recorder_ready = threading.Event()
recorder_thread = None
stop_recorder = False
prev_text = ""

# Define allowed methods and parameters for security
allowed_methods = [
    'set_microphone',
    'abort',
    'stop',
    'clear_audio_queue',
    'shutdown',
    'text',
]
allowed_parameters = [
    'silero_sensitivity',
    'post_speech_silence_duration',
    'listen_start',
    'recording_stop_time',
    'last_transcription_bytes',
    'last_transcription_bytes_b64',
]

# Queues and connections for control and data
control_connections = set()
data_connections = set()
control_queue = asyncio.Queue()
audio_queue = asyncio.Queue()

def preprocess_text(text):
    # Remove leading whitespace
    text = text.lstrip()

    # Remove starting ellipses if present
    if text.startswith("..."):
        text = text[3:]

    # Strip a trailing period and/or quote left after a quoted ellipsis
    if text.endswith("...'."):
        text = text[:-1]
    if text.endswith("...'"):
        text = text[:-1]

    # Remove any leading whitespace again after ellipses removal
    text = text.lstrip()

    # Uppercase the first letter
    if text:
        text = text[0].upper() + text[1:]

    return text
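
# Example (illustrative): preprocess_text("  ...and then he left.") returns
# "And then he left.": leading whitespace and the leading ellipsis are
# stripped, and the first letter is uppercased.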

def debug_print(message):
    if debug_logging:
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        thread_name = threading.current_thread().name
        print(f"{Fore.CYAN}[DEBUG][{timestamp}][{thread_name}] {message}{Style.RESET_ALL}", file=sys.stderr)

def text_detected(text, loop):
    global prev_text

    text = preprocess_text(text)

    if silence_timing:
        def ends_with_ellipsis(text: str):
            if text.endswith("..."):
                return True
            if len(text) > 1 and text[:-1].endswith("..."):
                return True
            return False

        def sentence_end(text: str):
            sentence_end_marks = ['.', '!', '?', '。']
            if text and text[-1] in sentence_end_marks:
                return True
            return False

        if ends_with_ellipsis(text):
            recorder.post_speech_silence_duration = global_args.mid_sentence_detection_pause
        elif sentence_end(text) and sentence_end(prev_text) and not ends_with_ellipsis(prev_text):
            recorder.post_speech_silence_duration = global_args.end_of_sentence_detection_pause
        else:
            recorder.post_speech_silence_duration = global_args.unknown_sentence_detection_pause

        # Append the new text with its timestamp
        current_time = time.time()
        text_time_deque.append((current_time, text))

        # Remove texts older than hard_break_even_on_background_noise seconds
        while text_time_deque and text_time_deque[0][0] < current_time - hard_break_even_on_background_noise:
            text_time_deque.popleft()

        # Check if at least hard_break_even_on_background_noise_min_texts texts
        # have arrived within the last hard_break_even_on_background_noise seconds
        if len(text_time_deque) >= hard_break_even_on_background_noise_min_texts:
            texts = [t[1] for t in text_time_deque]
            first_text = texts[0]
            last_text = texts[-1]

            # Compute the similarity ratio between the first and last texts
            similarity = SequenceMatcher(None, first_text, last_text).ratio()

            if similarity > hard_break_even_on_background_noise_min_similarity and len(first_text) > hard_break_even_on_background_noise_min_chars:
                recorder.stop()
                recorder.clear_audio_queue()
                prev_text = ""

    prev_text = text

    # Put the message in the audio queue to be sent to clients
    message = json.dumps({
        'type': 'realtime',
        'text': text
    })
    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)

    # Get current timestamp in HH:MM:SS.nnn format
    timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]

    if extended_logging:
        print(f" [{timestamp}] Realtime text: {bcolors.OKCYAN}{text}{bcolors.ENDC}\n", flush=True, end="")
    else:
        print(f"\r[{timestamp}] {bcolors.OKCYAN}{text}{bcolors.ENDC}", flush=True, end='')
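
# Example (illustrative): with the default pauses, a realtime update such as
# "Let's see..." switches the recorder to the 2.0 s mid-sentence pause, while
# two consecutive updates ending in "." switch it to the shorter 0.45 s
# end-of-sentence pause.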

def on_recording_start(loop):
    # Send a message to the client indicating recording has started
    message = json.dumps({
        'type': 'recording_start'
    })
    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)

def on_recording_stop(loop):
    # Send a message to the client indicating recording has stopped
    message = json.dumps({
        'type': 'recording_stop'
    })
    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)

def on_vad_detect_start(loop):
    message = json.dumps({
        'type': 'vad_detect_start'
    })
    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)

def on_vad_detect_stop(loop):
    message = json.dumps({
        'type': 'vad_detect_stop'
    })
    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)

def on_transcription_start(loop):
    # Send a message to the client when transcription starts
    message = json.dumps({
        'type': 'transcription_start'
    })
    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)

# def on_realtime_transcription_update(text, loop):
#     # Send real-time transcription updates to the client
#     text = preprocess_text(text)
#     message = json.dumps({
#         'type': 'realtime_update',
#         'text': text
#     })
#     asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)

# def on_recorded_chunk(chunk, loop):
#     if send_recorded_chunk:
#         bytes_b64 = base64.b64encode(chunk.tobytes()).decode('utf-8')
#         message = json.dumps({
#             'type': 'recorded_chunk',
#             'bytes': bytes_b64
#         })
#         asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)

# Define the server's arguments
def parse_arguments():
    # Note: the module-level flag read by text_detected is silence_timing, so
    # it is declared (and assigned) here rather than a separate name.
    global debug_logging, extended_logging, loglevel, writechunks, log_incoming_chunks, silence_timing

    import argparse
    parser = argparse.ArgumentParser(description='Start the Speech-to-Text (STT) server with various configuration options.')

    parser.add_argument('-m', '--model', type=str, default='large-v2',
                        help='Path to the STT model or model size. Options include: tiny, tiny.en, base, base.en, small, small.en, medium, medium.en, large-v1, large-v2, or any huggingface CTranslate2 STT model such as deepdml/faster-whisper-large-v3-turbo-ct2. Default is large-v2.')

    parser.add_argument('-r', '--rt-model', '--realtime_model_type', type=str, default='tiny.en',
                        help='Model size for real-time transcription. Options are the same as for --model. This is used only if real-time transcription is enabled (enable_realtime_transcription). Default is tiny.en.')

    parser.add_argument('-l', '--lang', '--language', type=str, default='en',
                        help='Language code for the STT model to transcribe in a specific language. Leave this empty for auto-detection based on input audio. Default is en. List of supported language codes: https://github.com/openai/whisper/blob/main/whisper/tokenizer.py#L11-L110')

    parser.add_argument('-i', '--input-device', '--input_device_index', type=int, default=1,
                        help='Index of the audio input device to use. Use this option to specify a particular microphone or audio input device based on your system. Default is 1.')

    parser.add_argument('-c', '--control', '--control_port', type=int, default=8011,
                        help='The port number used for the control WebSocket connection. Control connections are used to send and receive commands to the server. Default is port 8011.')

    parser.add_argument('-d', '--data', '--data_port', type=int, default=8012,
                        help='The port number used for the data WebSocket connection. Data connections are used to send audio data and receive transcription updates in real time. Default is port 8012.')

    parser.add_argument('-D', '--debug', action='store_true', help='Enable debug logging for detailed server operations')

    parser.add_argument("-W", "--write", metavar="FILE",
                        help="Save received audio to a WAV file")

    parser.add_argument('-s', '--silence_timing', action='store_true', default=True,
                        help='Enable dynamic adjustment of silence duration for sentence detection. Adjusts post-speech silence duration based on detected sentence structure and punctuation. Default is True.')

    parser.add_argument('--silero_sensitivity', type=float, default=0.05,
                        help='Sensitivity level for Silero Voice Activity Detection (VAD), with a range from 0 to 1. Lower values make the model less sensitive, useful for noisy environments. Default is 0.05.')

    parser.add_argument('--silero_use_onnx', action='store_true', default=False,
                        help='Enable the ONNX version of the Silero model for faster performance with lower resource usage. Default is False.')

    parser.add_argument('--webrtc_sensitivity', type=int, default=3,
                        help='Sensitivity level for WebRTC Voice Activity Detection (VAD), with a range from 0 to 3. Higher values make the model less sensitive, useful for cleaner environments. Default is 3.')

    parser.add_argument('--min_length_of_recording', type=float, default=1.1,
                        help='Minimum duration of valid recordings in seconds. This prevents very short recordings from being processed, which could be caused by noise or accidental sounds. Default is 1.1 seconds.')

    parser.add_argument('--min_gap_between_recordings', type=float, default=0,
                        help='Minimum time (in seconds) between consecutive recordings. Setting this helps avoid overlapping recordings when there’s a brief silence between them. Default is 0 seconds.')

    parser.add_argument('--enable_realtime_transcription', action='store_true', default=True,
                        help='Enable continuous real-time transcription of audio as it is received. When enabled, transcriptions are sent in near real-time. Default is True.')

    parser.add_argument('--realtime_processing_pause', type=float, default=0.02,
                        help='Time interval (in seconds) between processing audio chunks for real-time transcription. Lower values increase responsiveness but may put more load on the CPU. Default is 0.02 seconds.')

    parser.add_argument('--silero_deactivity_detection', action='store_true', default=True,
                        help='Use the Silero model for end-of-speech detection. This option can provide more robust silence detection in noisy environments, though it consumes more GPU resources. Default is True.')

    parser.add_argument('--early_transcription_on_silence', type=float, default=0.2,
                        help='Start transcription after the specified seconds of silence. This is useful when you want to trigger transcription mid-speech when there is a brief pause. Should be lower than post_speech_silence_duration. Set to 0 to disable. Default is 0.2 seconds.')

    parser.add_argument('--beam_size', type=int, default=5,
                        help='Beam size for the main transcription model. Larger values may improve transcription accuracy but increase the processing time. Default is 5.')

    parser.add_argument('--beam_size_realtime', type=int, default=3,
                        help='Beam size for the real-time transcription model. A smaller beam size allows for faster real-time processing but may reduce accuracy. Default is 3.')

    # parser.add_argument('--initial_prompt', type=str,
    #                     default='End incomplete sentences with ellipses.\nExamples:\nComplete: The sky is blue.\nIncomplete: When the sky...\nComplete: She walked home.\nIncomplete: Because he...',
    #                     help='Initial prompt that guides the transcription model to produce transcriptions in a particular style or format. The default provides instructions for handling sentence completions and ellipsis usage.')

    parser.add_argument('--initial_prompt', type=str,
                        default="Incomplete thoughts should end with '...'. Examples of complete thoughts: 'The sky is blue.' 'She walked home.' Examples of incomplete thoughts: 'When the sky...' 'Because he...'",
                        help='Initial prompt that guides the transcription model to produce transcriptions in a particular style or format. The default provides instructions for handling sentence completions and ellipsis usage.')

    parser.add_argument('--end_of_sentence_detection_pause', type=float, default=0.45,
                        help='The duration of silence (in seconds) that the model should interpret as the end of a sentence. This helps the system detect when to finalize the transcription of a sentence. Default is 0.45 seconds.')

    parser.add_argument('--unknown_sentence_detection_pause', type=float, default=0.7,
                        help='The duration of pause (in seconds) that the model should interpret as an incomplete or unknown sentence. This is useful for identifying when a sentence is trailing off or unfinished. Default is 0.7 seconds.')

    parser.add_argument('--mid_sentence_detection_pause', type=float, default=2.0,
                        help='The duration of pause (in seconds) that the model should interpret as a mid-sentence break. Longer pauses can indicate a pause in speech but not necessarily the end of a sentence. Default is 2.0 seconds.')

    parser.add_argument('--use_main_model_for_realtime', action='store_true',
                        help='Enable this option if you want to use the main model for real-time transcription, instead of the smaller, faster real-time model. Using the main model may provide better accuracy but at the cost of higher processing time.')

    parser.add_argument('--use_extended_logging', action='store_true',
                        help='Writes extensive log messages for the recording worker that processes the audio chunks.')

    parser.add_argument('--logchunks', action='store_true', help='Enable logging of incoming audio chunks (periods)')

    # Parse arguments
    args = parser.parse_args()

    debug_logging = args.debug
    extended_logging = args.use_extended_logging
    writechunks = args.write
    log_incoming_chunks = args.logchunks
    silence_timing = args.silence_timing

    if debug_logging:
        loglevel = logging.DEBUG
        logging.basicConfig(level=loglevel, format='[%(asctime)s] %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    else:
        loglevel = logging.WARNING

    # Replace escaped newlines with actual newlines in initial_prompt
    if args.initial_prompt:
        args.initial_prompt = args.initial_prompt.replace("\\n", "\n")

    return args

def _recorder_thread(loop):
    global recorder, stop_recorder
    print(f"{bcolors.OKGREEN}Initializing RealtimeSTT server with parameters:{bcolors.ENDC}")
    for key, value in recorder_config.items():
        print(f" {bcolors.OKBLUE}{key}{bcolors.ENDC}: {value}")
    recorder = AudioToTextRecorder(**recorder_config)
    print(f"{bcolors.OKGREEN}{bcolors.BOLD}RealtimeSTT initialized{bcolors.ENDC}")
    recorder_ready.set()

    def process_text(full_sentence):
        global prev_text
        prev_text = ""
        full_sentence = preprocess_text(full_sentence)
        message = json.dumps({
            'type': 'fullSentence',
            'text': full_sentence
        })
        # Use the event loop passed into the recorder thread
        asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)

        timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]

        if extended_logging:
            print(f" [{timestamp}] Full text: {bcolors.BOLD}Sentence:{bcolors.ENDC} {bcolors.OKGREEN}{full_sentence}{bcolors.ENDC}\n", flush=True, end="")
        else:
            print(f"\r[{timestamp}] {bcolors.BOLD}Sentence:{bcolors.ENDC} {bcolors.OKGREEN}{full_sentence}{bcolors.ENDC}\n")

    try:
        while not stop_recorder:
            recorder.text(process_text)
    except KeyboardInterrupt:
        print(f"{bcolors.WARNING}Exiting application due to keyboard interrupt{bcolors.ENDC}")

def decode_and_resample(
        audio_data,
        original_sample_rate,
        target_sample_rate):

    # Nothing to do if the rates already match
    if original_sample_rate == target_sample_rate:
        return audio_data

    # Decode 16-bit PCM data to a numpy array
    audio_np = np.frombuffer(audio_data, dtype=np.int16)

    # Calculate the number of samples after resampling
    num_original_samples = len(audio_np)
    num_target_samples = int(num_original_samples * target_sample_rate /
                             original_sample_rate)

    # Resample the audio
    resampled_audio = resample(audio_np, num_target_samples)

    return resampled_audio.astype(np.int16).tobytes()
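
# Worked example (illustrative): a 48000 Hz client chunk of 4800 samples
# (9600 bytes) resampled to the recorder's 16000 Hz yields
# int(4800 * 16000 / 48000) = 1600 samples (3200 bytes).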

async def control_handler(websocket, path):
    debug_print(f"New control connection from {websocket.remote_address}")
    print(f"{bcolors.OKGREEN}Control client connected{bcolors.ENDC}")
    global recorder
    control_connections.add(websocket)
    try:
        async for message in websocket:
            debug_print(f"Received control message: {message[:200]}...")
            if not recorder_ready.is_set():
                print(f"{bcolors.WARNING}Recorder not ready{bcolors.ENDC}")
                continue
            if isinstance(message, str):
                # Handle text message (command)
                try:
                    command_data = json.loads(message)
                    command = command_data.get("command")
                    if command == "set_parameter":
                        parameter = command_data.get("parameter")
                        value = command_data.get("value")
                        if parameter in allowed_parameters and hasattr(recorder, parameter):
                            setattr(recorder, parameter, value)
                            # Format the value for output
                            if isinstance(value, float):
                                value_formatted = f"{value:.2f}"
                            else:
                                value_formatted = value
                            timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
                            if extended_logging:
                                print(f" [{timestamp}] {bcolors.OKGREEN}Set recorder.{parameter} to: {bcolors.OKBLUE}{value_formatted}{bcolors.ENDC}")
                            # Optionally send a response back to the client
                            await websocket.send(json.dumps({"status": "success", "message": f"Parameter {parameter} set to {value}"}))
                        else:
                            if parameter not in allowed_parameters:
                                print(f"{bcolors.WARNING}Parameter {parameter} is not allowed (set_parameter){bcolors.ENDC}")
                                await websocket.send(json.dumps({"status": "error", "message": f"Parameter {parameter} is not allowed (set_parameter)"}))
                            else:
                                print(f"{bcolors.WARNING}Parameter {parameter} does not exist (set_parameter){bcolors.ENDC}")
                                await websocket.send(json.dumps({"status": "error", "message": f"Parameter {parameter} does not exist (set_parameter)"}))

                    elif command == "get_parameter":
                        parameter = command_data.get("parameter")
                        request_id = command_data.get("request_id")  # Get the request_id from the command data
                        if parameter in allowed_parameters and hasattr(recorder, parameter):
                            value = getattr(recorder, parameter)
                            if isinstance(value, float):
                                value_formatted = f"{value:.2f}"
                            else:
                                value_formatted = f"{value}"

                            value_truncated = value_formatted[:39] + "…" if len(value_formatted) > 40 else value_formatted

                            timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
                            if extended_logging:
                                print(f" [{timestamp}] {bcolors.OKGREEN}Get recorder.{parameter}: {bcolors.OKBLUE}{value_truncated}{bcolors.ENDC}")
                            response = {"status": "success", "parameter": parameter, "value": value}
                            if request_id is not None:
                                response["request_id"] = request_id
                            await websocket.send(json.dumps(response))
                        else:
                            if parameter not in allowed_parameters:
                                print(f"{bcolors.WARNING}Parameter {parameter} is not allowed (get_parameter){bcolors.ENDC}")
                                await websocket.send(json.dumps({"status": "error", "message": f"Parameter {parameter} is not allowed (get_parameter)"}))
                            else:
                                print(f"{bcolors.WARNING}Parameter {parameter} does not exist (get_parameter){bcolors.ENDC}")
                                await websocket.send(json.dumps({"status": "error", "message": f"Parameter {parameter} does not exist (get_parameter)"}))

                    elif command == "call_method":
                        method_name = command_data.get("method")
                        if method_name in allowed_methods:
                            method = getattr(recorder, method_name, None)
                            if method and callable(method):
                                args = command_data.get("args", [])
                                kwargs = command_data.get("kwargs", {})
                                method(*args, **kwargs)
                                timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
                                print(f" [{timestamp}] {bcolors.OKGREEN}Called method recorder.{bcolors.OKBLUE}{method_name}{bcolors.ENDC}")
                                await websocket.send(json.dumps({"status": "success", "message": f"Method {method_name} called"}))
                            else:
                                print(f"{bcolors.WARNING}Recorder does not have method {method_name}{bcolors.ENDC}")
                                await websocket.send(json.dumps({"status": "error", "message": f"Recorder does not have method {method_name}"}))
                        else:
                            print(f"{bcolors.WARNING}Method {method_name} is not allowed{bcolors.ENDC}")
                            await websocket.send(json.dumps({"status": "error", "message": f"Method {method_name} is not allowed"}))
                    else:
                        print(f"{bcolors.WARNING}Unknown command: {command}{bcolors.ENDC}")
                        await websocket.send(json.dumps({"status": "error", "message": f"Unknown command {command}"}))
                except json.JSONDecodeError:
                    print(f"{bcolors.WARNING}Received invalid JSON command{bcolors.ENDC}")
                    await websocket.send(json.dumps({"status": "error", "message": "Invalid JSON command"}))
            else:
                print(f"{bcolors.WARNING}Received unknown message type on control connection{bcolors.ENDC}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"{bcolors.WARNING}Control client disconnected: {e}{bcolors.ENDC}")
    finally:
        control_connections.remove(websocket)

async def data_handler(websocket, path):
    global writechunks, wav_file
    print(f"{bcolors.OKGREEN}Data client connected{bcolors.ENDC}")
    data_connections.add(websocket)
    try:
        while True:
            message = await websocket.recv()
            if isinstance(message, bytes):
                if debug_logging:
                    debug_print(f"Received audio chunk (size: {len(message)} bytes)")
                elif log_incoming_chunks:
                    print(".", end='', flush=True)
                # Handle binary message (audio data): a 4-byte little-endian
                # metadata length, a JSON metadata blob, then raw 16-bit PCM
                metadata_length = int.from_bytes(message[:4], byteorder='little')
                metadata_json = message[4:4+metadata_length].decode('utf-8')
                metadata = json.loads(metadata_json)
                sample_rate = metadata['sampleRate']
                debug_print(f"Processing audio chunk with sample rate {sample_rate}")
                chunk = message[4+metadata_length:]

                if writechunks:
                    if not wav_file:
                        wav_file = wave.open(writechunks, 'wb')
                        wav_file.setnchannels(CHANNELS)
                        wav_file.setsampwidth(pyaudio.get_sample_size(FORMAT))
                        wav_file.setframerate(sample_rate)
                    wav_file.writeframes(chunk)

                resampled_chunk = decode_and_resample(chunk, sample_rate, 16000)
                debug_print(f"Resampled chunk size: {len(resampled_chunk)} bytes")
                recorder.feed_audio(resampled_chunk)
            else:
                print(f"{bcolors.WARNING}Received non-binary message on data connection{bcolors.ENDC}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"{bcolors.WARNING}Data client disconnected: {e}{bcolors.ENDC}")
    finally:
        data_connections.remove(websocket)
        recorder.clear_audio_queue()  # Ensure audio queue is cleared if client disconnects

async def broadcast_audio_messages():
    while True:
        message = await audio_queue.get()
        for conn in list(data_connections):
            try:
                timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]

                if extended_logging:
                    print(f" [{timestamp}] Sending message: {bcolors.OKBLUE}{message}{bcolors.ENDC}\n", flush=True, end="")
                await conn.send(message)
            except websockets.exceptions.ConnectionClosed:
                # discard instead of remove: the data handler's finally block
                # may have already removed this connection
                data_connections.discard(conn)

# Helper function to create event-loop-bound closures for callbacks
def make_callback(loop, callback):
    def inner_callback(*args, **kwargs):
        callback(*args, **kwargs, loop=loop)
    return inner_callback
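
# Example (illustrative): make_callback(loop, text_detected) returns a
# function f(text) that invokes text_detected(text, loop=loop), so the
# recorder's synchronous callbacks can schedule work on the asyncio loop.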

async def main_async():
    # recorder_thread is declared global so shutdown_procedure can join it
    global stop_recorder, recorder_config, global_args, recorder_thread
    args = parse_arguments()
    global_args = args

    # Get the event loop here and pass it to the recorder thread
    loop = asyncio.get_event_loop()

    recorder_config = {
        'model': args.model,
        'realtime_model_type': args.rt_model,
        'language': args.lang,
        'input_device_index': args.input_device,
        'silero_sensitivity': args.silero_sensitivity,
        'silero_use_onnx': args.silero_use_onnx,
        'webrtc_sensitivity': args.webrtc_sensitivity,
        'post_speech_silence_duration': args.unknown_sentence_detection_pause,
        'min_length_of_recording': args.min_length_of_recording,
        'min_gap_between_recordings': args.min_gap_between_recordings,
        'enable_realtime_transcription': args.enable_realtime_transcription,
        'realtime_processing_pause': args.realtime_processing_pause,
        'silero_deactivity_detection': args.silero_deactivity_detection,
        'early_transcription_on_silence': args.early_transcription_on_silence,
        'beam_size': args.beam_size,
        'beam_size_realtime': args.beam_size_realtime,
        'initial_prompt': args.initial_prompt,
        'use_main_model_for_realtime': args.use_main_model_for_realtime,
        'spinner': False,
        'use_microphone': False,

        'on_realtime_transcription_update': make_callback(loop, text_detected),
        'on_recording_start': make_callback(loop, on_recording_start),
        'on_recording_stop': make_callback(loop, on_recording_stop),
        'on_vad_detect_start': make_callback(loop, on_vad_detect_start),
        'on_vad_detect_stop': make_callback(loop, on_vad_detect_stop),
        'on_transcription_start': make_callback(loop, on_transcription_start),
        # 'on_recorded_chunk': make_callback(loop, on_recorded_chunk),

        'no_log_file': True,  # Disable logging to file
        'use_extended_logging': args.use_extended_logging,
        'level': loglevel,
    }
    try:
        # Attempt to start the control and data servers
        control_server = await websockets.serve(control_handler, "localhost", args.control)
        data_server = await websockets.serve(data_handler, "localhost", args.data)
        print(f"{bcolors.OKGREEN}Control server started on {bcolors.OKBLUE}ws://localhost:{args.control}{bcolors.ENDC}")
        print(f"{bcolors.OKGREEN}Data server started on {bcolors.OKBLUE}ws://localhost:{args.data}{bcolors.ENDC}")

        # Start the broadcast task and the recorder thread
        broadcast_task = asyncio.create_task(broadcast_audio_messages())

        recorder_thread = threading.Thread(target=_recorder_thread, args=(loop,))
        recorder_thread.start()
        recorder_ready.wait()

        print(f"{bcolors.OKGREEN}Server started. Press Ctrl+C to stop the server.{bcolors.ENDC}")

        # Run server tasks
        await asyncio.gather(control_server.wait_closed(), data_server.wait_closed(), broadcast_task)
    except OSError as e:
        print(f"{bcolors.FAIL}Error: Could not start server on the specified ports ({e}). Another instance of the server may already be running, or the ports may be in use by another application.{bcolors.ENDC}")
    except KeyboardInterrupt:
        print(f"{bcolors.WARNING}Server interrupted by user, shutting down...{bcolors.ENDC}")
    finally:
        # Shutdown procedures for recorder and server threads
        await shutdown_procedure()
        print(f"{bcolors.OKGREEN}Server shutdown complete.{bcolors.ENDC}")

async def shutdown_procedure():
    global stop_recorder, recorder_thread
    if recorder:
        stop_recorder = True
        recorder.abort()
        recorder.stop()
        recorder.shutdown()
        print(f"{bcolors.OKGREEN}Recorder shut down{bcolors.ENDC}")

        if recorder_thread:
            recorder_thread.join()
            print(f"{bcolors.OKGREEN}Recorder thread finished{bcolors.ENDC}")

    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    print(f"{bcolors.OKGREEN}All tasks cancelled, closing event loop now.{bcolors.ENDC}")

def main():
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        # Capture any final KeyboardInterrupt to prevent it from showing up in logs
        print(f"{bcolors.WARNING}Server interrupted by user.{bcolors.ENDC}")
        sys.exit(0)

if __name__ == '__main__':
    main()