# NOTE(review): This file is damaged and is not importable as stored.
#   * Its statements have been run together onto a handful of very long
#     physical lines. As a result, every inline comment swallows the rest of
#     its physical line. For example, "# Constants" comments out the
#     constants, the colorama import, and the whole start of class
#     STTWebSocketClient on the same line. Some string literals are also
#     split across physical lines, e.g. "Cannot start STT server. Exiting."
#     and "STT server is required. Please start it manually.".
#   * A large section appears to be missing. The struct.pack(' call inside
#     record_and_send_audio() runs straight into "1 else []
#     client.add_command(...)", which looks like main()'s argument handling.
#     Everything in between is not present here: the rest of
#     record_and_send_audio(), setup_audio(), start_command_processor(),
#     add_command(), the start of main(), and possibly more. Possibly the
#     text from a "<" to the next ">" was stripped as if it were an HTML
#     tag -- TODO confirm, and restore this file from version control
#     rather than re-typing it.
# Issues visible in the surviving code, to fix once the file is restored:
#   * on_data_message() calls stop() while handling a 'fullSentence' message.
#     on_data_message is the data WebSocketApp's on_message callback, and
#     that app's run_forever is the target of self.data_ws_thread. So the
#     callback presumably runs on data_ws_thread, and stop() then calls
#     self.data_ws_thread.join() on the current thread.
#     threading.Thread.join raises RuntimeError in that case. The broad
#     "except Exception" in on_data_message turns that error into a debug
#     message. That stop() call therefore never joins recording_thread and
#     never releases the audio stream. Suggested fix: skip any join whose
#     thread is threading.current_thread().
#   * stop() can run more than once: on_data_message calls it, and main()'s
#     finally calls client.stop() again. It does not reset self.stream or
#     self.audio_interface after releasing them, so a later call may close
#     the stream or terminate PyAudio a second time. Make it idempotent --
#     TODO confirm how PyAudio reacts to a double close.
#   * connect() reports success as soon as the control socket opens.
#     connection_established is set only in on_control_open(), not in
#     on_data_open().
#   * on_close() is registered for both sockets, so closing either one sets
#     is_running = False and stop_event, which ends the whole client.
#   * is_server_running() has three problems:
#     - it probes with socket.AF_INET only, so IPv6 hosts fail;
#     - it falls back to port 80 even for a wss:// URL, where 443 would be
#       expected;
#     - it calls connect_ex() with no timeout set.
from urllib.parse import urlparse from scipy import signal from queue import Queue import numpy as np import subprocess import threading import websocket import argparse import pyaudio import logging import struct import socket import shutil import queue import json import time import sys import os os.environ['ALSA_LOG_LEVEL'] = 'none' # Constants CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 44100 DEFAULT_CONTROL_URL = "ws://127.0.0.1:8011" DEFAULT_DATA_URL = "ws://127.0.0.1:8012" # Initialize colorama from colorama import init, Fore, Style init() # Stop websocket from spamming the log websocket.enableTrace(False) class STTWebSocketClient: def __init__(self, control_url, data_url, debug=False, file_output=None, norealtime=False): self.control_url = control_url self.data_url = data_url self.control_ws = None self.data_ws_app = None self.data_ws_connected = None # WebSocket object that will be used for sending self.is_running = True self.debug = debug self.file_output = file_output self.last_text = "" self.console_width = shutil.get_terminal_size().columns self.recording_indicator = "🔴" self.norealtime = norealtime self.connection_established = threading.Event() self.message_queue = Queue() self.commands = Queue() self.stop_event = threading.Event() # Audio attributes self.audio_interface = None self.stream = None self.device_sample_rate = None self.input_device_index = None # Threads self.control_ws_thread = None self.data_ws_thread = None self.recording_thread = None def debug_print(self, message): if self.debug: print(message, file=sys.stderr) def connect(self): if not self.ensure_server_running(): self.debug_print("Cannot start STT server. 
Exiting.") return False try: # Connect to control WebSocket self.control_ws = websocket.WebSocketApp(self.control_url, on_message=self.on_control_message, on_error=self.on_error, on_close=self.on_close, on_open=self.on_control_open) self.control_ws_thread = threading.Thread(target=self.control_ws.run_forever) self.control_ws_thread.daemon = False # Set to False to ensure proper shutdown self.control_ws_thread.start() # Connect to data WebSocket self.data_ws_app = websocket.WebSocketApp(self.data_url, on_message=self.on_data_message, on_error=self.on_error, on_close=self.on_close, on_open=self.on_data_open) self.data_ws_thread = threading.Thread(target=self.data_ws_app.run_forever) self.data_ws_thread.daemon = False # Set to False to ensure proper shutdown self.data_ws_thread.start() # Wait for the connections to be established if not self.connection_established.wait(timeout=10): self.debug_print("Timeout while connecting to the server.") return False self.debug_print("WebSocket connections established successfully.") return True except Exception as e: self.debug_print(f"Error while connecting to the server: {e}") return False def on_control_open(self, ws): self.debug_print("Control WebSocket connection opened.") self.connection_established.set() self.start_command_processor() def on_data_open(self, ws): self.debug_print("Data WebSocket connection opened.") self.data_ws_connected = ws # Store the connected websocket object for sending data self.start_recording() def on_error(self, ws, error): self.debug_print(f"WebSocket error: {error}") def on_close(self, ws, close_status_code, close_msg): self.debug_print(f"WebSocket connection closed: {close_status_code} - {close_msg}") self.is_running = False self.stop_event.set() def is_server_running(self): parsed_url = urlparse(self.control_url) host = parsed_url.hostname port = parsed_url.port or 80 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex((host, port)) == 0 def 
ask_to_start_server(self): response = input("Would you like to start the STT server now? (y/n): ").strip().lower() return response == 'y' or response == 'yes' def start_server(self): if os.name == 'nt': # Windows subprocess.Popen('start /min cmd /c stt-server', shell=True) else: # Unix-like systems terminal_emulators = [ 'gnome-terminal', 'x-terminal-emulator', 'konsole', 'xfce4-terminal', 'lxterminal', 'xterm', 'mate-terminal', 'terminator', 'tilix', 'alacritty', 'urxvt', 'eterm', 'rxvt', 'kitty', 'hyper' ] terminal = None for term in terminal_emulators: if shutil.which(term): terminal = term break if terminal: terminal_exec_options = { 'x-terminal-emulator': ['--'], 'gnome-terminal': ['--'], 'mate-terminal': ['--'], 'terminator': ['--'], 'tilix': ['--'], 'konsole': ['-e'], 'xfce4-terminal': ['-e'], 'lxterminal': ['-e'], 'alacritty': ['-e'], 'xterm': ['-e'], 'rxvt': ['-e'], 'urxvt': ['-e'], 'eterm': ['-e'], 'kitty': [], 'hyper': ['--command'] } exec_option = terminal_exec_options.get(terminal, None) if exec_option is not None: subprocess.Popen([terminal] + exec_option + ['stt-server'], start_new_session=True) print(f"STT server started in a new terminal window using {terminal}.", file=sys.stderr) else: print(f"Unsupported terminal emulator '{terminal}'. Please start the STT server manually.", file=sys.stderr) else: print("No supported terminal emulator found. Please start the STT server manually.", file=sys.stderr) def ensure_server_running(self): if not self.is_server_running(): print("STT server is not running.", file=sys.stderr) if self.ask_to_start_server(): self.start_server() print("Waiting for STT server to start...", file=sys.stderr) for _ in range(20): # Wait up to 20 seconds if self.is_server_running(): print("STT server started successfully.", file=sys.stderr) time.sleep(2) # Give the server a moment to fully initialize return True time.sleep(1) print("Failed to start STT server.", file=sys.stderr) return False else: print("STT server is required. 
Please start it manually.", file=sys.stderr) return False return True def on_control_message(self, ws, message): try: data = json.loads(message) if 'status' in data: if data['status'] == 'success': if 'parameter' in data and 'value' in data: print(f"Parameter {data['parameter']} = {data['value']}") elif data['status'] == 'error': print(f"Server Error: {data.get('message', '')}") else: self.debug_print(f"Unknown control message format: {data}") except json.JSONDecodeError: self.debug_print(f"Received non-JSON control message: {message}") except Exception as e: self.debug_print(f"Error processing control message: {e}") def on_data_message(self, ws, message): try: data = json.loads(message) message_type = data.get('type') if message_type == 'realtime': if data['text'] != self.last_text: self.last_text = data['text'] if not self.norealtime: self.update_progress_bar(self.last_text) elif message_type == 'fullSentence': if self.file_output: sys.stderr.write('\r\033[K') sys.stderr.write(data['text']) sys.stderr.write('\n') sys.stderr.flush() print(data['text'], file=self.file_output) self.file_output.flush() # Ensure it's written immediately else: self.finish_progress_bar() print(f"{data['text']}") self.stop() elif message_type in { 'vad_detect_start', 'vad_detect_stop', 'recording_start', 'recording_stop', 'wakeword_detected', 'wakeword_detection_start', 'wakeword_detection_end', 'transcription_start'}: pass # Known message types, no action needed else: self.debug_print(f"Unknown data message format: {data}") except json.JSONDecodeError: self.debug_print(f"Received non-JSON data message: {message}") except Exception as e: self.debug_print(f"Error processing data message: {e}") def show_initial_indicator(self): if self.norealtime: return initial_text = f"{self.recording_indicator}\b\b" sys.stderr.write(initial_text) sys.stderr.flush() def update_progress_bar(self, text): try: available_width = self.console_width - 5 # Adjust for progress bar decorations 
sys.stderr.write('\r\033[K') # Clear the current line words = text.split() last_chars = "" for word in reversed(words): if len(last_chars) + len(word) + 1 > available_width: break last_chars = word + " " + last_chars last_chars = last_chars.strip() colored_text = f"{Fore.YELLOW}{last_chars}{Style.RESET_ALL}{self.recording_indicator}\b\b" sys.stderr.write(colored_text) sys.stderr.flush() except Exception as e: self.debug_print(f"Error updating progress bar: {e}") def finish_progress_bar(self): try: sys.stderr.write('\r\033[K') sys.stderr.flush() except Exception as e: self.debug_print(f"Error finishing progress bar: {e}") def stop(self): self.finish_progress_bar() self.is_running = False self.stop_event.set() self.debug_print("Stopping client and cleaning up resources.") if self.control_ws: self.control_ws.close() if self.data_ws_connected: self.data_ws_connected.close() # Join threads to ensure they finish before exiting if self.control_ws_thread: self.control_ws_thread.join() if self.data_ws_thread: self.data_ws_thread.join() if self.recording_thread: self.recording_thread.join() # Clean up audio resources if self.stream: self.stream.stop_stream() self.stream.close() if self.audio_interface: self.audio_interface.terminate() def start_recording(self): self.recording_thread = threading.Thread(target=self.record_and_send_audio) self.recording_thread.daemon = False # Set to False to ensure proper shutdown self.recording_thread.start() def record_and_send_audio(self): try: if not self.setup_audio(): raise Exception("Failed to set up audio recording.") self.debug_print("Recording and sending audio...") self.show_initial_indicator() while self.is_running: try: audio_data = self.stream.read(CHUNK) metadata = {"sampleRate": self.device_sample_rate} metadata_json = json.dumps(metadata) metadata_length = len(metadata_json) message = struct.pack(' 1 else [] client.add_command({ 'type': 'call_method', 'method': method, 'args': args_list }) # If command-line parameters were 
used (like --get-param), wait for them to be processed if args.set_param or args.get_param or args.call_method: while not client.commands.empty(): time.sleep(0.1) # Start recording directly if no command-line params were provided while client.is_running: time.sleep(0.1) else: print("Failed to connect to the server.", file=sys.stderr) except Exception as e: print(f"An error occurred: {e}") finally: client.stop() if __name__ == "__main__": main()