log_outgoing_chunks = False debug_mode = False from typing import Iterable, List, Optional, Union from urllib.parse import urlparse from datetime import datetime import subprocess import websocket import threading import platform import logging import pyaudio import socket import struct import signal import json import time import sys import os DEFAULT_CONTROL_URL = "ws://127.0.0.1:8011" DEFAULT_DATA_URL = "ws://127.0.0.1:8012" INIT_MODEL_TRANSCRIPTION = "tiny" INIT_MODEL_TRANSCRIPTION_REALTIME = "tiny" INIT_REALTIME_PROCESSING_PAUSE = 0.2 INIT_SILERO_SENSITIVITY = 0.4 INIT_WEBRTC_SENSITIVITY = 3 INIT_POST_SPEECH_SILENCE_DURATION = 0.6 INIT_MIN_LENGTH_OF_RECORDING = 0.5 INIT_MIN_GAP_BETWEEN_RECORDINGS = 0 INIT_WAKE_WORDS_SENSITIVITY = 0.6 INIT_PRE_RECORDING_BUFFER_DURATION = 1.0 INIT_WAKE_WORD_ACTIVATION_DELAY = 0.0 INIT_WAKE_WORD_TIMEOUT = 5.0 INIT_WAKE_WORD_BUFFER_DURATION = 0.1 ALLOWED_LATENCY_LIMIT = 100 CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 SAMPLE_RATE = 16000 BUFFER_SIZE = 512 INIT_HANDLE_BUFFER_OVERFLOW = False if platform.system() != 'Darwin': INIT_HANDLE_BUFFER_OVERFLOW = True # Define ANSI color codes for terminal output class bcolors: HEADER = '\033[95m' # Magenta OKBLUE = '\033[94m' # Blue OKCYAN = '\033[96m' # Cyan OKGREEN = '\033[92m' # Green WARNING = '\033[93m' # Yellow FAIL = '\033[91m' # Red ENDC = '\033[0m' # Reset to default BOLD = '\033[1m' UNDERLINE = '\033[4m' class AudioToTextRecorderClient: """ A class responsible for capturing audio from the microphone, detecting voice activity, and then transcribing the captured audio using the `faster_whisper` model. """ def __init__(self, model: str = INIT_MODEL_TRANSCRIPTION, language: str = "", compute_type: str = "default", input_device_index: int = None, gpu_device_index: Union[int, List[int]] = 0, device: str = "cuda", on_recording_start=None, on_recording_stop=None, on_transcription_start=None, ensure_sentence_starting_uppercase=True, ensure_sentence_ends_with_period=True, use_microphone=True, spinner=True, level=logging.WARNING, # Realtime transcription parameters enable_realtime_transcription=False, use_main_model_for_realtime=False, realtime_model_type=INIT_MODEL_TRANSCRIPTION_REALTIME, realtime_processing_pause=INIT_REALTIME_PROCESSING_PAUSE, on_realtime_transcription_update=None, on_realtime_transcription_stabilized=None, # Voice activation parameters silero_sensitivity: float = INIT_SILERO_SENSITIVITY, silero_use_onnx: bool = False, silero_deactivity_detection: bool = False, webrtc_sensitivity: int = INIT_WEBRTC_SENSITIVITY, post_speech_silence_duration: float = ( INIT_POST_SPEECH_SILENCE_DURATION ), min_length_of_recording: float = ( INIT_MIN_LENGTH_OF_RECORDING ), min_gap_between_recordings: float = ( INIT_MIN_GAP_BETWEEN_RECORDINGS ), pre_recording_buffer_duration: float = ( INIT_PRE_RECORDING_BUFFER_DURATION ), on_vad_detect_start=None, on_vad_detect_stop=None, # Wake word parameters wakeword_backend: str = "pvporcupine", openwakeword_model_paths: str = None, openwakeword_inference_framework: str = "onnx", wake_words: str = "", wake_words_sensitivity: float = INIT_WAKE_WORDS_SENSITIVITY, wake_word_activation_delay: float = ( INIT_WAKE_WORD_ACTIVATION_DELAY ), wake_word_timeout: float = INIT_WAKE_WORD_TIMEOUT, wake_word_buffer_duration: float = INIT_WAKE_WORD_BUFFER_DURATION, on_wakeword_detected=None, on_wakeword_timeout=None, on_wakeword_detection_start=None, on_wakeword_detection_end=None, on_recorded_chunk=None, debug_mode=False, handle_buffer_overflow: bool = INIT_HANDLE_BUFFER_OVERFLOW, beam_size: int = 5, beam_size_realtime: int = 3, buffer_size: int = BUFFER_SIZE, sample_rate: int = SAMPLE_RATE, initial_prompt: Optional[Union[str, Iterable[int]]] = None, suppress_tokens: Optional[List[int]] = [-1], print_transcription_time: bool = False, early_transcription_on_silence: int = 0, allowed_latency_limit: int = ALLOWED_LATENCY_LIMIT, no_log_file: bool = False, use_extended_logging: bool = False, # Server urls control_url: str = DEFAULT_CONTROL_URL, data_url: str = DEFAULT_DATA_URL, autostart_server: bool = True, ): # Set instance variables from constructor parameters self.model = model self.language = language self.compute_type = compute_type self.input_device_index = input_device_index self.gpu_device_index = gpu_device_index self.device = device self.on_recording_start = on_recording_start self.on_recording_stop = on_recording_stop self.on_transcription_start = on_transcription_start self.ensure_sentence_starting_uppercase = ensure_sentence_starting_uppercase self.ensure_sentence_ends_with_period = ensure_sentence_ends_with_period self.use_microphone = use_microphone self.spinner = spinner self.level = level # Real-time transcription parameters self.enable_realtime_transcription = enable_realtime_transcription self.use_main_model_for_realtime = use_main_model_for_realtime self.realtime_model_type = realtime_model_type self.realtime_processing_pause = realtime_processing_pause self.on_realtime_transcription_update = on_realtime_transcription_update self.on_realtime_transcription_stabilized = on_realtime_transcription_stabilized # Voice activation parameters self.silero_sensitivity = silero_sensitivity self.silero_use_onnx = silero_use_onnx self.silero_deactivity_detection = silero_deactivity_detection self.webrtc_sensitivity = webrtc_sensitivity self.post_speech_silence_duration = post_speech_silence_duration self.min_length_of_recording = min_length_of_recording self.min_gap_between_recordings = min_gap_between_recordings self.pre_recording_buffer_duration = pre_recording_buffer_duration self.on_vad_detect_start = on_vad_detect_start self.on_vad_detect_stop = on_vad_detect_stop # Wake word parameters self.wakeword_backend = wakeword_backend self.openwakeword_model_paths = openwakeword_model_paths self.openwakeword_inference_framework = openwakeword_inference_framework self.wake_words = wake_words self.wake_words_sensitivity = wake_words_sensitivity self.wake_word_activation_delay = wake_word_activation_delay self.wake_word_timeout = wake_word_timeout self.wake_word_buffer_duration = wake_word_buffer_duration self.on_wakeword_detected = on_wakeword_detected self.on_wakeword_timeout = on_wakeword_timeout self.on_wakeword_detection_start = on_wakeword_detection_start self.on_wakeword_detection_end = on_wakeword_detection_end self.on_recorded_chunk = on_recorded_chunk self.debug_mode = debug_mode self.handle_buffer_overflow = handle_buffer_overflow self.beam_size = beam_size self.beam_size_realtime = beam_size_realtime self.buffer_size = buffer_size self.sample_rate = sample_rate self.initial_prompt = initial_prompt self.suppress_tokens = suppress_tokens self.print_transcription_time = print_transcription_time self.early_transcription_on_silence = early_transcription_on_silence self.allowed_latency_limit = allowed_latency_limit self.no_log_file = no_log_file self.use_extended_logging = use_extended_logging # Server URLs self.control_url = control_url self.data_url = data_url self.autostart_server = autostart_server # Instance variables self.muted = False self.recording_thread = None self.is_running = True self.connection_established = threading.Event() self.recording_start = threading.Event() self.final_text_ready = threading.Event() self.realtime_text = "" self.final_text = "" self.request_counter = 0 self.pending_requests = {} # Map from request_id to threading.Event and value if self.debug_mode: print("Checking STT server") if not self.connect(): print("Failed to connect to the server.", file=sys.stderr) else: if self.debug_mode: print("STT server is running and connected.") if self.use_microphone: self.start_recording() def text(self, on_transcription_finished=None): self.realtime_text = "" self.submitted_realtime_text = "" self.final_text = "" self.final_text_ready.clear() self.recording_start.set() try: total_wait_time = 0 wait_interval = 0.02 # Wait in small intervals, e.g., 100ms max_wait_time = 60 # Timeout after 60 seconds while total_wait_time < max_wait_time: if self.final_text_ready.wait(timeout=wait_interval): break # Break if transcription is ready # if not self.realtime_text == self.submitted_realtime_text: # if self.on_realtime_transcription_update: # self.on_realtime_transcription_update(self.realtime_text) # self.submitted_realtime_text = self.realtime_text total_wait_time += wait_interval # Check if a manual interrupt has occurred if total_wait_time >= max_wait_time: if self.debug_mode: print("Timeout while waiting for text from the server.") self.recording_start.clear() if on_transcription_finished: threading.Thread(target=on_transcription_finished, args=("",)).start() return "" self.recording_start.clear() if on_transcription_finished: threading.Thread(target=on_transcription_finished, args=(self.final_text,)).start() return self.final_text except KeyboardInterrupt: if self.debug_mode: print("KeyboardInterrupt in record_and_send_audio, exiting...") raise KeyboardInterrupt except Exception as e: print(f"Error in AudioToTextRecorderClient.text(): {e}") return "" def feed_audio(self, chunk, original_sample_rate=16000): metadata = {"sampleRate": original_sample_rate} metadata_json = json.dumps(metadata) metadata_length = len(metadata_json) message = struct.pack('