
upgraded CLI client/server files

KoljaB, 6 months ago
commit 1ab0c11286
3 changed files with 884 additions and 269 deletions
  1. server/README.md (+225, −0)
  2. server/stt_cli_client.py (+373, −209)
  3. server/stt_server.py (+286, −60)

+ 225 - 0
server/README.md

@@ -0,0 +1,225 @@
+# RealtimeSTT Server and Client
+
+This directory contains the server and client implementations for the RealtimeSTT library, providing real-time speech-to-text transcription with WebSocket interfaces. The server allows clients to connect via WebSocket to send audio data and receive real-time transcription updates. The client facilitates interaction with the server, enabling audio recording, parameter management, and control commands.
+
+## Table of Contents
+
+- [Features](#features)
+- [Installation](#installation)
+- [Server Usage](#server-usage)
+  - [Starting the Server](#starting-the-server)
+  - [Server Parameters](#server-parameters)
+- [Client Usage](#client-usage)
+  - [Starting the Client](#starting-the-client)
+  - [Client Parameters](#client-parameters)
+- [WebSocket Interface](#websocket-interface)
+- [Examples](#examples)
+  - [Starting the Server and Client](#starting-the-server-and-client)
+  - [Setting Parameters](#setting-parameters)
+  - [Retrieving Parameters](#retrieving-parameters)
+  - [Calling Server Methods](#calling-server-methods)
+  - [Running in Debug Mode](#running-in-debug-mode)
+- [Contributing](#contributing)
+- [License](#license)
+
+## Features
+
+- **Real-Time Transcription**: Provides real-time speech-to-text transcription using pre-configured or user-defined STT models.
+- **WebSocket Communication**: Utilizes WebSocket connections for control commands and data handling.
+- **Flexible Recording Options**: Supports configurable pauses for sentence detection and various voice activity detection (VAD) methods.
+- **VAD Support**: Includes support for Silero and WebRTC VAD for robust voice activity detection.
+- **Wake Word Detection**: Capable of detecting wake words to initiate transcription.
+- **Configurable Parameters**: Allows fine-tuning of recording and transcription settings via command-line arguments or control commands.
+
+## Installation
+
+Ensure you have Python 3.8 or higher installed. Install the required packages using:
+
+```bash
+pip install git+https://github.com/KoljaB/RealtimeSTT.git@dev
+```
+
+## Server Usage
+
+### Starting the Server
+
+Start the server using the command-line interface:
+
+```bash
+stt-server [OPTIONS]
+```
+
+The server will initialize and begin listening for WebSocket connections on the specified control and data ports.
+
+### Server Parameters
+
+You can configure the server using the following command-line arguments:
+
+- `--model` (str, default: `'medium.en'`): Path to the STT model or model size. Options include `tiny`, `tiny.en`, `base`, `base.en`, `small`, `small.en`, `medium`, `medium.en`, `large-v1`, `large-v2`, or any Hugging Face CTranslate2 STT model like `deepdml/faster-whisper-large-v3-turbo-ct2`.
+
+- `--realtime_model_type` (str, default: `'tiny.en'`): Model size for real-time transcription. Same options as `--model`.
+
+- `--language` (str, default: `'en'`): Language code for the STT model. Leave empty for auto-detection.
+
+- `--input_device_index` (int, default: `1`): Index of the audio input device to use.
+
+- `--silero_sensitivity` (float, default: `0.05`): Sensitivity for Silero VAD. Lower values are less sensitive.
+
+- `--webrtc_sensitivity` (int, default: `3`): Sensitivity for WebRTC VAD. Higher values are less sensitive.
+
+- `--min_length_of_recording` (float, default: `1.1`): Minimum duration (in seconds) for a valid recording.
+
+- `--min_gap_between_recordings` (float, default: `0`): Minimum time (in seconds) between consecutive recordings.
+
+- `--enable_realtime_transcription` (flag, default: `True`): Enable real-time transcription of audio.
+
+- `--realtime_processing_pause` (float, default: `0.02`): Time interval (in seconds) between processing audio chunks for real-time transcription.
+
+- `--silero_deactivity_detection` (flag, default: `True`): Use Silero model for end-of-speech detection.
+
+- `--early_transcription_on_silence` (float, default: `0.2`): Start transcription after the specified number of seconds of silence.
+
+- `--beam_size` (int, default: `5`): Beam size for the main transcription model.
+
+- `--beam_size_realtime` (int, default: `3`): Beam size for the real-time transcription model.
+
+- `--initial_prompt` (str): Initial prompt for the transcription model to guide its output format and style.
+
+- `--end_of_sentence_detection_pause` (float, default: `0.45`): Duration of pause (in seconds) to consider as the end of a sentence.
+
+- `--unknown_sentence_detection_pause` (float, default: `0.7`): Duration of pause (in seconds) to consider as an unknown or incomplete sentence.
+
+- `--mid_sentence_detection_pause` (float, default: `2.0`): Duration of pause (in seconds) to consider as a mid-sentence break.
+
+- `--control_port` (int, default: `8011`): Port for the control WebSocket connection.
+
+- `--data_port` (int, default: `8012`): Port for the data WebSocket connection.
+
+**Example:**
+
+```bash
+stt-server --model small.en --language en --control_port 9001 --data_port 9002
+```
+
+## Client Usage
+
+### Starting the Client
+
+Start the client using:
+
+```bash
+stt [OPTIONS]
+```
+
+The client connects to the STT server's control and data WebSocket URLs to facilitate real-time speech transcription and control.
+
+### Client Parameters
+
+- `--control-url` (default: `ws://localhost:8011`): The WebSocket URL for server control commands.
+
+- `--data-url` (default: `ws://localhost:8012`): The WebSocket URL for sending audio data and receiving transcription updates.
+
+- `--debug`: Enable debug mode, which prints detailed logs to `stderr`.
+
+- `--nort` or `--norealtime`: Disable real-time output of transcription results.
+
+- `--set-param PARAM VALUE`: Set a recorder parameter (e.g., `silero_sensitivity`, `beam_size`). This option can be used multiple times.
+
+- `--get-param PARAM`: Retrieve the value of a specific recorder parameter. Can be used multiple times.
+
+- `--call-method METHOD [ARGS]`: Call a method on the recorder with optional arguments. Can be used multiple times.
+
+**Example:**
+
+```bash
+stt --set-param silero_sensitivity 0.1 --get-param silero_sensitivity
+```
+
+## WebSocket Interface
+
+The server uses two WebSocket connections:
+
+1. **Control WebSocket**: Used to send and receive control commands, such as setting parameters or invoking recorder methods.
+
+2. **Data WebSocket**: Used to send audio data for transcription and receive real-time transcription updates.
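The message formats on the two channels can be read off the client source: control commands are plain JSON text frames, while each data frame is binary, prefixing the raw 16-bit PCM audio with a little-endian `uint32` metadata length and a JSON metadata header. A minimal sketch of both formats (no live server required; the command and metadata field names are taken from `stt_cli_client.py`):

```python
import json
import struct

def build_set_parameter(parameter, value):
    # Control-channel messages are JSON text frames; this mirrors the
    # client's "set_parameter" command format.
    return json.dumps({"command": "set_parameter", "parameter": parameter, "value": value})

def frame_audio_chunk(pcm_bytes, sample_rate):
    # Data-channel messages are binary frames: a little-endian uint32
    # giving the metadata length, the JSON metadata, then raw PCM audio.
    metadata = json.dumps({"sampleRate": sample_rate}).encode("utf-8")
    return struct.pack("<I", len(metadata)) + metadata + pcm_bytes

print(build_set_parameter("silero_sensitivity", 0.1))
print(len(frame_audio_chunk(b"\x00\x00" * 1024, 16000)))  # 4 + 21 + 2048 = 2073
```

A real client would send the first result as a text frame on the control socket and the second as a binary frame on the data socket.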
+
+## Examples
+
+### Starting the Server and Client
+
+1. **Start the Server with Default Settings:**
+
+   ```bash
+   stt-server
+   ```
+
+2. **Start the Client with Default Settings:**
+
+   ```bash
+   stt
+   ```
+
+### Setting Parameters
+
+Set the Silero sensitivity to `0.1`:
+
+```bash
+stt --set-param silero_sensitivity 0.1
+```
+
+### Retrieving Parameters
+
+Get the current Silero sensitivity value:
+
+```bash
+stt --get-param silero_sensitivity
+```
+
+### Calling Server Methods
+
+Call the `set_microphone` method on the recorder:
+
+```bash
+stt --call-method set_microphone
+```
+
+### Running in Debug Mode
+
+Enable debug mode for detailed logging:
+
+```bash
+stt --debug
+```
+
+## Contributing
+
+Contributions are welcome! Please open an issue or submit a pull request on GitHub.
+
+## License
+
+This project is licensed under the MIT License. See the [LICENSE](../LICENSE) file for details.
+
+## Additional Information
+
+The server and client scripts are designed to work seamlessly together, enabling efficient real-time speech transcription with minimal latency. The flexibility in configuration allows users to tailor the system to specific needs, such as adjusting sensitivity levels for different environments or selecting appropriate STT models based on resource availability.
+
+**Note:** Ensure that the server is running before starting the client. The client includes functionality to check if the server is running and can prompt the user to start it if necessary.
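The client's server check is a plain TCP probe of the control port before the WebSocket handshake; a standalone sketch of the same idea (mirroring `is_server_running` in `stt_cli_client.py`):

```python
import socket
from urllib.parse import urlparse

def is_server_running(control_url="ws://localhost:8011"):
    # Try a plain TCP connection to the control port; if nothing is
    # listening there, the WebSocket handshake would fail anyway.
    parsed = urlparse(control_url)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(1.0)
        return s.connect_ex((parsed.hostname, parsed.port or 80)) == 0

print(is_server_running())
```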
+
+## Troubleshooting
+
+- **Server Not Starting:** If the server fails to start, check that all dependencies are installed and that the specified ports are not in use.
+
+- **Audio Issues:** Ensure that the correct audio input device index is specified if using a device other than the default.
+
+- **WebSocket Connection Errors:** Verify that the control and data URLs are correct and that the server is listening on those ports.
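For the audio issues above, a short PyAudio sketch (PyAudio is already a client dependency; the import guard is only defensive) lists the available input devices and their indices, which can then be passed to the server via `--input_device_index`:

```python
try:
    import pyaudio
except ImportError:  # guard so the sketch degrades gracefully
    pyaudio = None

def list_input_devices():
    """Return (index, name) for every device that can record audio."""
    if pyaudio is None:
        return []
    pa = pyaudio.PyAudio()
    try:
        return [
            (i, pa.get_device_info_by_index(i)["name"])
            for i in range(pa.get_device_count())
            if pa.get_device_info_by_index(i).get("maxInputChannels", 0) > 0
        ]
    finally:
        pa.terminate()

for index, name in list_input_devices():
    print(f"{index}: {name}")
```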
+
+## Contact
+
+For questions or support, please open an issue on the [GitHub repository](https://github.com/KoljaB/RealtimeSTT/issues).
+
+## Acknowledgments
+
+Special thanks to the contributors of the RealtimeSTT library and the open-source community for their continuous support.
+
+---
+
+**Disclaimer:** This software is provided "as is", without warranty of any kind, express or implied. Use it at your own risk.

+ 373 - 209
server/stt_cli_client.py

@@ -1,167 +1,135 @@
-import os
-import sys
-import pyaudio
-import numpy as np
-from scipy import signal
-import logging
-os.environ['ALSA_LOG_LEVEL'] = 'none'
+"""
+Speech-to-Text (STT) Client CLI for WebSocket Server Interaction
 
-CHUNK = 1024
-FORMAT = pyaudio.paInt16
-CHANNELS = 1
-RATE = 44100  # Default fallback rate
-input_device_index = None
-audio_interface = None
-stream = None
-device_sample_rate = None
-chunk_size = CHUNK
-
-def get_highest_sample_rate(audio_interface, device_index):
-    """Get the highest supported sample rate for the specified device."""
-    try:
-        device_info = audio_interface.get_device_info_by_index(device_index)
-        max_rate = int(device_info['defaultSampleRate'])
+This command-line interface (CLI) allows interaction with the Speech-to-Text (STT) WebSocket server. It connects to the server via control and data WebSocket URLs to facilitate real-time speech transcription, control the server, and manage various parameters related to the STT process.
 
-        if 'supportedSampleRates' in device_info:
-            supported_rates = [int(rate) for rate in device_info['supportedSampleRates']]
-            if supported_rates:
-                max_rate = max(supported_rates)
+The client can be used to start recording audio, set or retrieve STT parameters, and interact with the server using commands. Additionally, the CLI can disable real-time updates or run in debug mode for detailed output.
 
-        return max_rate
-    except Exception as e:
-        logging.warning(f"Failed to get highest sample rate: {e}")
-        return 48000  # Fallback to a common high sample rate
+### Features:
+- Connects to STT WebSocket server for real-time transcription and control.
+- Supports setting and retrieving parameters via the command line.
+- Allows calling server methods (e.g., start/stop recording).
+- Option to disable real-time updates during transcription.
+- Debug mode available for verbose logging.
 
-def initialize_audio_stream(audio_interface, device_index, sample_rate, chunk_size):
-    """Initialize the audio stream with error handling."""
-    try:
-        stream = audio_interface.open(
-            format=pyaudio.paInt16,
-            channels=CHANNELS,
-            rate=sample_rate,
-            input=True,
-            frames_per_buffer=chunk_size,
-            input_device_index=device_index,
-        )
-        return stream
-    except Exception as e:
-        logging.error(f"Error initializing audio stream: {e}")
-        raise
+### Starting the Client:
+You can start the client using the command `stt` and optionally pass configuration options or commands for interacting with the server.
 
-def preprocess_audio(chunk, original_sample_rate, target_sample_rate):
-    """Preprocess audio chunk similar to feed_audio method."""
-    if isinstance(chunk, np.ndarray):
-        if chunk.ndim == 2:  # Stereo to mono conversion
-            chunk = np.mean(chunk, axis=1)
+```bash
+stt [OPTIONS]
+```
 
-        # Resample if needed
-        if original_sample_rate != target_sample_rate:
-            num_samples = int(len(chunk) * target_sample_rate / original_sample_rate)
-            chunk = signal.resample(chunk, num_samples)
+### Available Parameters:
+- `--control-url` (default: "ws://localhost:8011"): The WebSocket URL for server control commands.
+  
+- `--data-url` (default: "ws://localhost:8012"): The WebSocket URL for sending audio data and receiving transcription updates.
 
-        chunk = chunk.astype(np.int16)
-    else:
-        chunk = np.frombuffer(chunk, dtype=np.int16)
+- `--debug`: Enable debug mode, which prints detailed logs to stderr.
 
-        if original_sample_rate != target_sample_rate:
-            num_samples = int(len(chunk) * target_sample_rate / original_sample_rate)
-            chunk = signal.resample(chunk, num_samples)
-            chunk = chunk.astype(np.int16)
+- `--nort` or `--norealtime`: Disable real-time output of transcription results.
 
-    return chunk.tobytes()
+- `--set-param PARAM VALUE`: Set a recorder parameter (e.g., silero_sensitivity, beam_size). This option can be used multiple times to set different parameters.
 
-def setup_audio():
-    global audio_interface, stream, device_sample_rate, input_device_index
-    try:
-        audio_interface = pyaudio.PyAudio()
-        if input_device_index is None:
-            try:
-                default_device = audio_interface.get_default_input_device_info()
-                input_device_index = default_device['index']
-            except OSError as e:
-                input_device_index = None
+- `--get-param PARAM`: Retrieve the value of a specific recorder parameter. This option can be used multiple times to retrieve different parameters.
 
-        sample_rates_to_try = [16000]  # Try 16000 Hz first
-        if input_device_index is not None:
-            highest_rate = get_highest_sample_rate(audio_interface, input_device_index)
-            if highest_rate != 16000:
-                sample_rates_to_try.append(highest_rate)
-        else:
-            sample_rates_to_try.append(48000)  # Fallback sample rate
+- `--call-method METHOD [ARGS]`: Call a method on the recorder with optional arguments. This option can be used multiple times for different methods.
 
-        for rate in sample_rates_to_try:
-            try:
-                device_sample_rate = rate
-                stream = initialize_audio_stream(audio_interface, input_device_index, device_sample_rate, chunk_size)
-                if stream is not None:
-                    logging.debug(f"Audio recording initialized successfully at {device_sample_rate} Hz, reading {chunk_size} frames at a time")
-                    return True
-            except Exception as e:
-                logging.warning(f"Failed to initialize audio stream at {device_sample_rate} Hz: {e}")
-                continue
+### Example Usage:
+1. **Start the client with default settings:**
+   ```bash
+   stt
+   ```
 
-        raise Exception("Failed to initialize audio stream with all sample rates.")
-    except Exception as e:
-        logging.exception(f"Error initializing audio recording: {e}")
-        if audio_interface:
-            audio_interface.terminate()
-        return False
-
-from .install_packages import check_and_install_packages
-
-check_and_install_packages([
-    {
-        'module_name': 'websocket',                    # Import module
-        'install_name': 'websocket-client',            # Package name for pip install (websocket-client is the correct package for websocket)
-    },
-    {
-        'module_name': 'pyaudio',                      # Import module
-        'install_name': 'pyaudio',                     # Package name for pip install
-    },
-    {
-        'module_name': 'colorama',                     # Import module
-        'attribute': 'init',                           # Attribute to check (init method from colorama)
-        'install_name': 'colorama',                    # Package name for pip install
-        'version': '',                                 # Optional version constraint
-    },
-])
+2. **Set a recorder parameter (e.g., set Silero sensitivity to 0.1):**
+   ```bash
+   stt --set-param silero_sensitivity 0.1
+   ```
 
-import websocket
-import pyaudio
-from colorama import init, Fore, Style
+3. **Retrieve the value of a recorder parameter (e.g., get the current Silero sensitivity):**
+   ```bash
+   stt --get-param silero_sensitivity
+   ```
+
+4. **Call a method on the recorder (e.g., start the microphone input):**
+   ```bash
+   stt --call-method set_microphone
+   ```
+
+5. **Run in debug mode:**
+   ```bash
+   stt --debug
+   ```
 
+### WebSocket Interface:
+- **Control WebSocket**: Used for sending control commands like setting parameters or invoking methods.
+- **Data WebSocket**: Used for sending audio data for real-time transcription and receiving transcription results.
+
+The client can be used to send audio data to the server for transcription and to control the behavior of the server remotely.
+"""
+
+import os
+import sys
+import pyaudio
+import numpy as np
+from scipy import signal
+import logging
+import websocket
 import argparse
 import json
 import threading
 import time
 import struct
 import socket
 import subprocess
 import shutil
 from urllib.parse import urlparse
+import queue
 from queue import Queue
 
+os.environ['ALSA_LOG_LEVEL'] = 'none'
+
 # Constants
 CHUNK = 1024
 FORMAT = pyaudio.paInt16
 CHANNELS = 1
 RATE = 44100
-DEFAULT_SERVER_URL = "ws://localhost:8011"
+DEFAULT_CONTROL_URL = "ws://localhost:8011"
+DEFAULT_DATA_URL = "ws://localhost:8012"
+
+# Initialize colorama
+from colorama import init, Fore, Style
+init()
+
+# Stop websocket from spamming the log
+websocket.enableTrace(False)
 
 class STTWebSocketClient:
-    def __init__(self, server_url, debug=False, file_output=None, norealtime=False):
-        self.server_url = server_url
-        self.ws = None
+    def __init__(self, control_url, data_url, debug=False, file_output=None, norealtime=False):
+        self.control_url = control_url
+        self.data_url = data_url
+        self.control_ws = None
+        self.data_ws = None
         self.is_running = False
         self.debug = debug
         self.file_output = file_output
         self.last_text = ""
-        self.pbar = None
         self.console_width = shutil.get_terminal_size().columns
         self.recording_indicator = "🔴"
         self.norealtime = norealtime
         self.connection_established = threading.Event()
         self.message_queue = Queue()
+        self.commands = Queue()
+        self.stop_event = threading.Event()
+
+        # Audio attributes
+        self.audio_interface = None
+        self.stream = None
+        self.device_sample_rate = None
+        self.input_device_index = None
+
+        # Threads
+        self.control_ws_thread = None
+        self.data_ws_thread = None
+        self.recording_thread = None
 
     def debug_print(self, message):
         if self.debug:
@@ -172,34 +140,48 @@ class STTWebSocketClient:
             self.debug_print("Cannot start STT server. Exiting.")
             return False
 
-        websocket.enableTrace(self.debug)
         try:
-            
-            self.ws = websocket.WebSocketApp(self.server_url,
-                                             on_message=self.on_message,
-                                             on_error=self.on_error,
-                                             on_close=self.on_close,
-                                             on_open=self.on_open)
-            
-            self.ws_thread = threading.Thread(target=self.ws.run_forever)
-            self.ws_thread.daemon = True
-            self.ws_thread.start()
-
-            # Wait for the connection to be established
+            # Connect to control WebSocket
+            self.control_ws = websocket.WebSocketApp(self.control_url,
+                                                     on_message=self.on_control_message,
+                                                     on_error=self.on_error,
+                                                     on_close=self.on_close,
+                                                     on_open=self.on_control_open)
+
+            self.control_ws_thread = threading.Thread(target=self.control_ws.run_forever)
+            self.control_ws_thread.daemon = False  # Set to False to ensure proper shutdown
+            self.control_ws_thread.start()
+
+            # Connect to data WebSocket
+            self.data_ws = websocket.WebSocketApp(self.data_url,
+                                                  on_message=self.on_data_message,
+                                                  on_error=self.on_error,
+                                                  on_close=self.on_close,
+                                                  on_open=self.on_data_open)
+
+            self.data_ws_thread = threading.Thread(target=self.data_ws.run_forever)
+            self.data_ws_thread.daemon = False  # Set to False to ensure proper shutdown
+            self.data_ws_thread.start()
+
+            # Wait for the connections to be established
             if not self.connection_established.wait(timeout=10):
                 self.debug_print("Timeout while connecting to the server.")
                 return False
-            
-            self.debug_print("WebSocket connection established successfully.")
+
+            self.debug_print("WebSocket connections established successfully.")
             return True
         except Exception as e:
             self.debug_print(f"Error while connecting to the server: {e}")
             return False
 
-    def on_open(self, ws):
-        self.debug_print("WebSocket connection opened.")
-        self.is_running = True
+    def on_control_open(self, ws):
+        self.debug_print("Control WebSocket connection opened.")
         self.connection_established.set()
+        self.start_command_processor()
+
+    def on_data_open(self, ws):
+        self.debug_print("Data WebSocket connection opened.")
+        self.is_running = True
         self.start_recording()
 
     def on_error(self, ws, error):
@@ -208,9 +190,10 @@ class STTWebSocketClient:
     def on_close(self, ws, close_status_code, close_msg):
         self.debug_print(f"WebSocket connection closed: {close_status_code} - {close_msg}")
         self.is_running = False
+        self.stop_event.set()
 
     def is_server_running(self):
-        parsed_url = urlparse(self.server_url)
+        parsed_url = urlparse(self.control_url)
         host = parsed_url.hostname
         port = parsed_url.port or 80
         with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
@@ -227,7 +210,6 @@ class STTWebSocketClient:
             subprocess.Popen(['stt-server'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
         print("STT server start command issued. Please wait a moment for it to initialize.", file=sys.stderr)
 
-
     def ensure_server_running(self):
         if not self.is_server_running():
             print("STT server is not running.", file=sys.stderr)
@@ -247,15 +229,38 @@ class STTWebSocketClient:
                 return False
         return True
 
-    def on_message(self, ws, message):
+    # Handle control messages like set_parameter, get_parameter, etc.
+    def on_control_message(self, ws, message):
+        try:
+            data = json.loads(message)
+            # Handle server response with status
+            if 'status' in data:
+                if data['status'] == 'success':
+                    # print(f"Server Response: {data.get('message', '')}")
+                    if 'parameter' in data and 'value' in data:
+                        print(f"Parameter {data['parameter']} = {data['value']}")
+                elif data['status'] == 'error':
+                    print(f"Server Error: {data.get('message', '')}")
+            else:
+                self.debug_print(f"Unknown control message format: {data}")
+        except json.JSONDecodeError:
+            self.debug_print(f"Received non-JSON control message: {message}")
+        except Exception as e:
+            self.debug_print(f"Error processing control message: {e}")
+
+    # Handle real-time transcription and full sentence updates
+    def on_data_message(self, ws, message):
         try:
             data = json.loads(message)
-            if data['type'] == 'realtime':
+            # Handle real-time transcription updates
+            if data.get('type') == 'realtime':
                 if data['text'] != self.last_text:
                     self.last_text = data['text']
                     if not self.norealtime:
-                        self.update_progress_bar(self.last_text) 
-            elif data['type'] == 'fullSentence':
+                        self.update_progress_bar(self.last_text)
+
+            # Handle full sentences
+            elif data.get('type') == 'fullSentence':
                 if self.file_output:
                     sys.stderr.write('\r\033[K')
                     sys.stderr.write(data['text'])
@@ -265,96 +270,214 @@ class STTWebSocketClient:
                     self.file_output.flush()  # Ensure it's written immediately
                 else:
                     self.finish_progress_bar()
-                    print(f"{data['text']}")        
+                    print(f"{data['text']}")
                 self.stop()
-                
+
+            else:
+                self.debug_print(f"Unknown data message format: {data}")
+
         except json.JSONDecodeError:
-            self.debug_print(f"\nReceived non-JSON message: {message}")
+            self.debug_print(f"Received non-JSON data message: {message}")
+        except Exception as e:
+            self.debug_print(f"Error processing data message: {e}")
 
     def show_initial_indicator(self):
         if self.norealtime:
             return
-
         initial_text = f"{self.recording_indicator}\b\b"
         sys.stderr.write(initial_text)
         sys.stderr.flush()
 
     def update_progress_bar(self, text):
-        # Reserve some space for the progress bar decorations
-        available_width = self.console_width - 5
-        
-        # Clear the current line
-        sys.stderr.write('\r\033[K')  # Move to the beginning of the line and clear it
-
-        # Get the last 'available_width' characters, but don't cut words
-        words = text.split()
-        last_chars = ""
-        for word in reversed(words):
-            if len(last_chars) + len(word) + 1 > available_width:
-                break
-            last_chars = word + " " + last_chars
-
-        last_chars = last_chars.strip()
-
-        # Color the text yellow and add recording indicator
-        colored_text = f"{Fore.YELLOW}{last_chars}{Style.RESET_ALL}{self.recording_indicator}\b\b"
-
-        sys.stderr.write(colored_text)
-        sys.stderr.flush()
+        try:
+            available_width = self.console_width - 5  # Adjust for progress bar decorations
+            sys.stderr.write('\r\033[K')  # Clear the current line
+            words = text.split()
+            last_chars = ""
+            for word in reversed(words):
+                if len(last_chars) + len(word) + 1 > available_width:
+                    break
+                last_chars = word + " " + last_chars
+            last_chars = last_chars.strip()
+            colored_text = f"{Fore.YELLOW}{last_chars}{Style.RESET_ALL}{self.recording_indicator}\b\b"
+            sys.stderr.write(colored_text)
+            sys.stderr.flush()
+        except Exception as e:
+            self.debug_print(f"Error updating progress bar: {e}")
 
     def finish_progress_bar(self):
-        # Clear the current line
-        sys.stderr.write('\r\033[K')
-        sys.stderr.flush()
+        try:
+            sys.stderr.write('\r\033[K')
+            sys.stderr.flush()
+        except Exception as e:
+            self.debug_print(f"Error finishing progress bar: {e}")
 
     def stop(self):
         self.finish_progress_bar()
         self.is_running = False
-        if self.ws:
-            self.ws.close()
-        #if hasattr(self, 'ws_thread'):
-        #    self.ws_thread.join(timeout=2)
+        self.stop_event.set()
+        if self.control_ws:
+            self.control_ws.close()
+        if self.data_ws:
+            self.data_ws.close()
+
+        # Join threads to ensure they finish before exiting
+        if self.control_ws_thread:
+            self.control_ws_thread.join()
+        if self.data_ws_thread:
+            self.data_ws_thread.join()
+        if self.recording_thread:
+            self.recording_thread.join()
+
+        # Clean up audio resources
+        if self.stream:
+            self.stream.stop_stream()
+            self.stream.close()
+        if self.audio_interface:
+            self.audio_interface.terminate()
 
     def start_recording(self):
-        threading.Thread(target=self.record_and_send_audio).start()
+        self.recording_thread = threading.Thread(target=self.record_and_send_audio)
+        self.recording_thread.daemon = False  # Set to False to ensure proper shutdown
+        self.recording_thread.start()
 
     def record_and_send_audio(self):
-        if not setup_audio():
-            raise Exception("Failed to set up audio recording.")
+        try:
+            if not self.setup_audio():
+                raise Exception("Failed to set up audio recording.")
+
+            self.debug_print("Recording and sending audio...")
+            self.show_initial_indicator()
+
+            while self.is_running:
+                try:
+                    audio_data = self.stream.read(CHUNK)
+                    metadata = {"sampleRate": self.device_sample_rate}
+                    metadata_json = json.dumps(metadata)
+                    metadata_length = len(metadata_json)
+                    message = struct.pack('<I', metadata_length) + metadata_json.encode('utf-8') + audio_data
+                    self.data_ws.send(message, opcode=websocket.ABNF.OPCODE_BINARY)
+                except Exception as e:
+                    self.debug_print(f"Error sending audio data: {e}")
+                    break  # Exit the recording loop
 
-        self.debug_print("Recording and sending audio...")
-        self.show_initial_indicator()
+        except Exception as e:
+            self.debug_print(f"Error in record_and_send_audio: {e}")
+        finally:
+            self.cleanup_audio()
 
-        while self.is_running:
+    def setup_audio(self):
+        try:
+            self.audio_interface = pyaudio.PyAudio()
+            self.input_device_index = None
             try:
-                audio_data = stream.read(CHUNK)
+                default_device = self.audio_interface.get_default_input_device_info()
+                self.input_device_index = default_device['index']
+            except OSError as e:
+                self.debug_print(f"No default input device found: {e}")
+                return False
 
-                # Prepare metadata
-                metadata = {
-                    "sampleRate": device_sample_rate
-                }
-                metadata_json = json.dumps(metadata)
-                metadata_length = len(metadata_json)
+            self.device_sample_rate = 16000  # Try 16000 Hz first
 
-                # Construct the message
-                message = struct.pack('<I', metadata_length) + metadata_json.encode('utf-8') + audio_data
+            try:
+                self.stream = self.audio_interface.open(
+                    format=FORMAT,
+                    channels=CHANNELS,
+                    rate=self.device_sample_rate,
+                    input=True,
+                    frames_per_buffer=CHUNK,
+                    input_device_index=self.input_device_index,
+                )
+                self.debug_print(f"Audio recording initialized successfully at {self.device_sample_rate} Hz")
+                return True
+            except Exception as e:
+                self.debug_print(f"Failed to initialize audio stream at {self.device_sample_rate} Hz: {e}")
+                return False
 
-                self.ws.send(message, opcode=websocket.ABNF.OPCODE_BINARY)
+        except Exception as e:
+            self.debug_print(f"Error initializing audio recording: {e}")
+            if self.audio_interface:
+                self.audio_interface.terminate()
+            return False
+
+    def cleanup_audio(self):
+        try:
+            if self.stream:
+                self.stream.stop_stream()
+                self.stream.close()
+                self.stream = None
+            if self.audio_interface:
+                self.audio_interface.terminate()
+                self.audio_interface = None
+        except Exception as e:
+            self.debug_print(f"Error cleaning up audio resources: {e}")
+
+    def set_parameter(self, parameter, value):
+        command = {
+            "command": "set_parameter",
+            "parameter": parameter,
+            "value": value
+        }
+        self.control_ws.send(json.dumps(command))
+
+    def get_parameter(self, parameter):
+        command = {
+            "command": "get_parameter",
+            "parameter": parameter
+        }
+        self.control_ws.send(json.dumps(command))
+
+    def call_method(self, method, args=None, kwargs=None):
+        command = {
+            "command": "call_method",
+            "method": method,
+            "args": args or [],
+            "kwargs": kwargs or {}
+        }
+        self.control_ws.send(json.dumps(command))
+
+    def start_command_processor(self):
+        self.command_thread = threading.Thread(target=self.command_processor)
+        self.command_thread.daemon = False  # Ensure it is not a daemon thread
+        self.command_thread.start()
+
+    def command_processor(self):
+        self.debug_print("Starting command processor")
+        while not self.stop_event.is_set():
+            try:
+                command = self.commands.get(timeout=0.1)
+                if command['type'] == 'set_parameter':
+                    self.set_parameter(command['parameter'], command['value'])
+                elif command['type'] == 'get_parameter':
+                    self.get_parameter(command['parameter'])
+                elif command['type'] == 'call_method':
+                    self.call_method(command['method'], command.get('args'), command.get('kwargs'))
+            except queue.Empty:
+                continue  # Queue was empty; keep polling
             except Exception as e:
-                self.debug_print(f"Error sending audio data: {e}")
-                break
+                self.debug_print(f"Error in command processor: {e}")
+        self.debug_print("Leaving command processor")
 
-        self.debug_print("Stopped recording.")
-        stream.stop_stream()
-        stream.close()
-        audio_interface.terminate()
 
+    def add_command(self, command):
+        self.commands.put(command)
 
 def main():
     parser = argparse.ArgumentParser(description="STT Client")
-    parser.add_argument("--server", default=DEFAULT_SERVER_URL, help="STT WebSocket server URL")
+    parser.add_argument("--control-url", default=DEFAULT_CONTROL_URL, help="STT Control WebSocket URL")
+    parser.add_argument("--data-url", default=DEFAULT_DATA_URL, help="STT Data WebSocket URL")
     parser.add_argument("--debug", action="store_true", help="Enable debug mode")
-    parser.add_argument("-nort", "--norealtime", action="store_true", help="Disable real-time output")    
+    parser.add_argument("-nort", "--norealtime", action="store_true", help="Disable real-time output")
+    parser.add_argument("--set-param", nargs=2, metavar=('PARAM', 'VALUE'), action='append',
+                        help="Set a recorder parameter. Can be used multiple times.")
+    parser.add_argument("--call-method", nargs='+', metavar='METHOD', action='append',
+                        help="Call a recorder method with optional arguments.")
+    parser.add_argument("--get-param", nargs=1, metavar='PARAM', action='append',
+                        help="Get the value of a recorder parameter. Can be used multiple times.")
     args = parser.parse_args()
 
     # Check if output is being redirected
@@ -362,22 +485,63 @@ def main():
         file_output = sys.stdout
     else:
         file_output = None
-    
-    client = STTWebSocketClient(args.server, args.debug, file_output, args.norealtime)
-  
+
+    client = STTWebSocketClient(args.control_url, args.data_url, args.debug, file_output, args.norealtime)
+
     def signal_handler(sig, frame):
-        # print("\nInterrupted by user, shutting down...")
         client.stop()
         sys.exit(0)
 
     import signal
     signal.signal(signal.SIGINT, signal_handler)
-    
+
     try:
         if client.connect():
-            # print("Connection established. Recording... (Press Ctrl+C to stop)", file=sys.stderr)
+            # Process command-line parameters
+            if args.set_param:
+                for param, value in args.set_param:
+                    try:
+                        # Attempt to parse the value to the appropriate type
+                        if '.' in value:
+                            value = float(value)
+                        else:
+                            value = int(value)
+                    except ValueError:
+                        pass  # Keep as string if not a number
+
+                    client.add_command({
+                        'type': 'set_parameter',
+                        'parameter': param,
+                        'value': value
+                    })
+
+            if args.get_param:
+                for param_list in args.get_param:
+                    param = param_list[0]
+                    client.add_command({
+                        'type': 'get_parameter',
+                        'parameter': param
+                    })
+
+            if args.call_method:
+                for method_call in args.call_method:
+                    method = method_call[0]
+                    args_list = method_call[1:] if len(method_call) > 1 else []
+                    client.add_command({
+                        'type': 'call_method',
+                        'method': method,
+                        'args': args_list
+                    })
+
+            # If command-line parameters were used (like --get-param), wait for them to be processed
+            if args.set_param or args.get_param or args.call_method:
+                while not client.commands.empty():
+                    time.sleep(0.1)
+
+            # Keep the client running while audio is recorded and streamed
             while client.is_running:
                 time.sleep(0.1)
+
         else:
             print("Failed to connect to the server.", file=sys.stderr)
     except Exception as e:

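Both sides of the data connection share a simple length-prefixed framing for audio messages: a 4-byte little-endian metadata length, UTF-8 JSON metadata (currently just the sample rate), then the raw PCM bytes. A minimal sketch of the pack/unpack pair — the function names are illustrative, not part of the patch; `unpack_audio_message` mirrors what the server's `data_handler` does:

```python
import json
import struct

def pack_audio_message(audio_chunk: bytes, sample_rate: int) -> bytes:
    # 4-byte little-endian metadata length, then JSON metadata, then raw PCM.
    metadata = json.dumps({"sampleRate": sample_rate}).encode("utf-8")
    return struct.pack("<I", len(metadata)) + metadata + audio_chunk

def unpack_audio_message(message: bytes):
    # Inverse of pack_audio_message: read the length prefix, split metadata from PCM.
    (metadata_length,) = struct.unpack("<I", message[:4])
    metadata = json.loads(message[4:4 + metadata_length].decode("utf-8"))
    return metadata["sampleRate"], message[4 + metadata_length:]
```

Because the sample rate travels with every chunk, the server can resample each chunk to 16 kHz regardless of what rate the client's input device actually delivered.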
+ 286 - 60
server/stt_server.py

@@ -1,3 +1,77 @@
+"""
+Speech-to-Text (STT) Server with Real-Time Transcription and WebSocket Interface
+
+This server provides real-time speech-to-text (STT) transcription using the RealtimeSTT library. It allows clients to connect via WebSocket to send audio data and receive real-time transcription updates. The server supports configurable audio recording parameters, voice activity detection (VAD), and wake word detection. It is designed to handle continuous transcription as well as post-recording processing, enabling real-time feedback with the option to improve final transcription quality after the complete sentence is recognized.
+
+### Features:
+- Real-time transcription using pre-configured or user-defined STT models.
+- WebSocket-based communication for control and data handling.
+- Flexible recording and transcription options, including configurable pauses for sentence detection.
+- Supports Silero and WebRTC VAD for robust voice activity detection.
+
+### Starting the Server:
+You can start the server using the command-line interface (CLI) command `stt-server`, passing the desired configuration options.
+
+```bash
+stt-server [OPTIONS]
+```
+
+### Available Parameters:
+- `--model` (str, default: 'medium.en'): Path to the STT model or model size. Options: tiny, tiny.en, base, base.en, small, small.en, medium, medium.en, large-v1, large-v2, or any Hugging Face CTranslate2 STT model such as `deepdml/faster-whisper-large-v3-turbo-ct2`.
+  
+- `--realtime_model_type` (str, default: 'tiny.en'): Model size for real-time transcription. Same options as `--model`.
+
+- `--language` (str, default: 'en'): Language code for the STT model. Leave empty for auto-detection.
+
+- `--input_device_index` (int, default: 1): Index of the audio input device to use.
+
+- `--silero_sensitivity` (float, default: 0.05): Sensitivity for Silero Voice Activity Detection (VAD). Lower values are less sensitive.
+
+- `--webrtc_sensitivity` (float, default: 3): Sensitivity for WebRTC VAD. Higher values are less sensitive.
+
+- `--min_length_of_recording` (float, default: 1.1): Minimum duration (in seconds) for a valid recording. Prevents short recordings.
+
+- `--min_gap_between_recordings` (float, default: 0): Minimum time (in seconds) between consecutive recordings.
+
+- `--enable_realtime_transcription` (flag, default: True): Enable real-time transcription of audio.
+
+- `--realtime_processing_pause` (float, default: 0.02): Time interval (in seconds) between processing audio chunks for real-time transcription. Lower values increase responsiveness.
+
+- `--silero_deactivity_detection` (flag, default: True): Use Silero model for end-of-speech detection.
+
+- `--early_transcription_on_silence` (float, default: 0.2): Start transcription after specified seconds of silence.
+
+- `--beam_size` (int, default: 5): Beam size for the main transcription model.
+
+- `--beam_size_realtime` (int, default: 3): Beam size for the real-time transcription model.
+
+- `--initial_prompt` (str, default: 'Add periods only for complete sentences...'): Initial prompt for the transcription model to guide its output format and style.
+
+- `--end_of_sentence_detection_pause` (float, default: 0.45): Duration of pause (in seconds) to consider as the end of a sentence.
+
+- `--unknown_sentence_detection_pause` (float, default: 0.7): Duration of pause (in seconds) to consider as an unknown or incomplete sentence.
+
+- `--mid_sentence_detection_pause` (float, default: 2.0): Duration of pause (in seconds) to consider as a mid-sentence break.
+
+- `--control_port` (int, default: 8011): Port for the control WebSocket connection.
+
+- `--data_port` (int, default: 8012): Port for the data WebSocket connection.
+
+### WebSocket Interface:
+The server supports two WebSocket connections:
+1. **Control WebSocket**: Used to send and receive commands, such as setting parameters or calling recorder methods.
+2. **Data WebSocket**: Used to send audio data for transcription and receive real-time transcription updates.
+
+The server will broadcast real-time transcription updates to all connected clients on the data WebSocket.
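+
+### Example Control Commands:
+Commands on the control WebSocket are JSON objects. The parameter and method names below are illustrative; only whitelisted parameters and methods are accepted:
+
+```json
+{"command": "set_parameter", "parameter": "silero_sensitivity", "value": 0.1}
+{"command": "get_parameter", "parameter": "silero_sensitivity"}
+{"command": "call_method", "method": "stop", "args": [], "kwargs": {}}
+```
+
+The server replies with a JSON status object such as `{"status": "success", ...}` or `{"status": "error", "message": "..."}`.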
+"""
+
+
+import asyncio
+import sys
+
+if sys.platform == 'win32':
+    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+
 from .install_packages import check_and_install_packages
 
 check_and_install_packages([
@@ -23,7 +97,6 @@ check_and_install_packages([
 
 print("Starting server, please wait...")
 
-import asyncio
 import threading
 import json
 import websockets
@@ -35,28 +108,40 @@ global_args = None
 recorder = None
 recorder_config = {}
 recorder_ready = threading.Event()
-client_websocket = None
 stop_recorder = False
 prev_text = ""
 
-
-async def send_to_client(message):
-    global client_websocket
-    if client_websocket and client_websocket.open:
-        try:
-            await client_websocket.send(message)
-        except websockets.exceptions.ConnectionClosed:
-            print("Client websocket is closed, resetting client_websocket")
-            client_websocket = None
-    else:
-        print("No client connected or connection is closed.")
-        client_websocket = None  # Ensure it resets
+# Define allowed methods and parameters for security
+allowed_methods = [
+    'set_microphone',
+    'abort',
+    'stop',
+    'clear_audio_queue',
+    'wakeup',
+    'shutdown',
+    'text',  # Allow 'text' method to initiate transcription
+]
+allowed_parameters = [
+    'silero_sensitivity',
+    'wake_word_activation_delay',
+    'post_speech_silence_duration',
+    'listen_start',
+    'recording_stop_time',
+    'recorderActive',
+    # Add other parameters as needed
+]
+
+# Queues and connections for control and data
+control_connections = set()
+data_connections = set()
+control_queue = asyncio.Queue()
+audio_queue = asyncio.Queue()
 
 def preprocess_text(text):
     # Remove leading whitespaces
     text = text.lstrip()
 
-    #  Remove starting ellipses if present
+    # Remove starting ellipses if present
     if text.startswith("..."):
         text = text[3:]
 
@@ -69,7 +154,7 @@ def preprocess_text(text):
     
     return text
 
-def text_detected(text):
+def text_detected(text, loop):
     global prev_text
 
     text = preprocess_text(text)
@@ -84,19 +169,68 @@ def text_detected(text):
 
     prev_text = text
 
-    try:
-        asyncio.new_event_loop().run_until_complete(
-            send_to_client(
-                json.dumps({
-                    'type': 'realtime',
-                    'text': text
-                })
-            )
-        )
-    except Exception as e:
-        print(f"Error in text_detected while sending to client: {e}")
+    # Put the message in the audio queue to be sent to clients
+    message = json.dumps({
+        'type': 'realtime',
+        'text': text
+    })
+    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)
     print(f"\r{text}", flush=True, end='')
 
+def on_recording_start(loop):
+    # Send a message to the client indicating recording has started
+    message = json.dumps({
+        'type': 'recording_start'
+    })
+    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)
+
+def on_recording_stop(loop):
+    # Send a message to the client indicating recording has stopped
+    message = json.dumps({
+        'type': 'recording_stop'
+    })
+    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)
+
+def on_vad_detect_start(loop):
+    message = json.dumps({
+        'type': 'vad_detect_start'
+    })
+    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)
+
+def on_wakeword_detection_start(loop):
+    # Send a message to the client when wake word detection starts
+    message = json.dumps({
+        'type': 'wakeword_detection_start'
+    })
+    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)
+
+def on_wakeword_detection_end(loop):
+    # Send a message to the client when wake word detection ends
+    message = json.dumps({
+        'type': 'wakeword_detection_end'
+    })
+    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)
+
+def on_transcription_start(loop):
+    # Send a message to the client when transcription starts
+    message = json.dumps({
+        'type': 'transcription_start'
+    })
+    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)
+
+def on_realtime_transcription_update(text, loop):
+    # Send real-time transcription updates to the client
+    text = preprocess_text(text)
+    message = json.dumps({
+        'type': 'realtime_update',
+        'text': text
+    })
+    asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)
+
+def on_recorded_chunk(chunk):
+    # Process each recorded audio chunk (optional implementation)
+    pass
+
 # Define the server's arguments
 def parse_arguments():
     import argparse
@@ -157,11 +291,16 @@ def parse_arguments():
     parser.add_argument('--mid_sentence_detection_pause', type=float, default=2.0,
                         help='Duration of pause (in seconds) to consider as a mid-sentence break. Default: 2.0')
     
+    parser.add_argument('--control_port', type=int, default=8011,
+                        help='Port for the control WebSocket connection. Default: 8011')
+    
+    parser.add_argument('--data_port', type=int, default=8012,
+                        help='Port for the data WebSocket connection. Default: 8012')
+    
     return parser.parse_args()
 
-def _recorder_thread():
+def _recorder_thread(loop):
     global recorder, prev_text, stop_recorder
-    # print("Initializing RealtimeSTT...")
     print(f"Initializing RealtimeSTT server with parameters {recorder_config}")
     recorder = AudioToTextRecorder(**recorder_config)
     print("RealtimeSTT initialized")
@@ -170,17 +309,12 @@ def _recorder_thread():
     def process_text(full_sentence):
         full_sentence = preprocess_text(full_sentence)
         prev_text = ""
-        try:
-            asyncio.new_event_loop().run_until_complete(
-                send_to_client(
-                    json.dumps({
-                        'type': 'fullSentence',
-                        'text': full_sentence
-                    })
-                )
-            )
-        except Exception as e:
-            print(f"Error in _recorder_thread while sending to client: {e}")
+        message = json.dumps({
+            'type': 'fullSentence',
+            'text': full_sentence
+        })
+        # Use the passed event loop here
+        asyncio.run_coroutine_threadsafe(audio_queue.put(message), loop)
         print(f"\rSentence: {full_sentence}")
 
     try:
@@ -210,35 +344,114 @@ def decode_and_resample(
 
     return resampled_audio.astype(np.int16).tobytes()
 
-async def echo(websocket, path):
-    print("Client connected")
-    global client_websocket
-    client_websocket = websocket
-    recorder.post_speech_silence_duration = global_args.unknown_sentence_detection_pause
+async def control_handler(websocket, path):
+    print("Control client connected")
+    global recorder
+    control_connections.add(websocket)
     try:
         async for message in websocket:
             if not recorder_ready.is_set():
                 print("Recorder not ready")
                 continue
+            if isinstance(message, str):
+                # Handle text message (command)
+                try:
+                    command_data = json.loads(message)
+                    command = command_data.get("command")
+                    if command == "set_parameter":
+                        parameter = command_data.get("parameter")
+                        value = command_data.get("value")
+                        if parameter in allowed_parameters and hasattr(recorder, parameter):
+                            setattr(recorder, parameter, value)
+                            print(f"Set recorder.{parameter} to {value}")
+                            # Optionally send a response back to the client
+                            await websocket.send(json.dumps({"status": "success", "message": f"Parameter {parameter} set to {value}"}))
+                        else:
+                            print(f"Parameter {parameter} is not allowed or does not exist")
+                            await websocket.send(json.dumps({"status": "error", "message": f"Parameter {parameter} is not allowed or does not exist"}))
+                    elif command == "get_parameter":
+                        parameter = command_data.get("parameter")
+                        if parameter in allowed_parameters and hasattr(recorder, parameter):
+                            value = getattr(recorder, parameter)
+                            await websocket.send(json.dumps({"status": "success", "parameter": parameter, "value": value}))
+                        else:
+                            print(f"Parameter {parameter} is not allowed or does not exist")
+                            await websocket.send(json.dumps({"status": "error", "message": f"Parameter {parameter} is not allowed or does not exist"}))
+                    elif command == "call_method":
+                        method_name = command_data.get("method")
+                        if method_name in allowed_methods:
+                            method = getattr(recorder, method_name, None)
+                            if method and callable(method):
+                                args = command_data.get("args", [])
+                                kwargs = command_data.get("kwargs", {})
+                                method(*args, **kwargs)
+                                print(f"Called method recorder.{method_name}")
+                                await websocket.send(json.dumps({"status": "success", "message": f"Method {method_name} called"}))
+                            else:
+                                print(f"Recorder does not have method {method_name}")
+                                await websocket.send(json.dumps({"status": "error", "message": f"Recorder does not have method {method_name}"}))
+                        else:
+                            print(f"Method {method_name} is not allowed")
+                            await websocket.send(json.dumps({"status": "error", "message": f"Method {method_name} is not allowed"}))
+                    else:
+                        print(f"Unknown command: {command}")
+                        await websocket.send(json.dumps({"status": "error", "message": f"Unknown command {command}"}))
+                except json.JSONDecodeError:
+                    print("Received invalid JSON command")
+                    await websocket.send(json.dumps({"status": "error", "message": "Invalid JSON command"}))
+            else:
+                print("Received unknown message type on control connection")
+    except websockets.exceptions.ConnectionClosed as e:
+        print(f"Control client disconnected: {e}")
+    finally:
+        control_connections.remove(websocket)
 
-            metadata_length = int.from_bytes(message[:4], byteorder='little')
-            metadata_json = message[4:4+metadata_length].decode('utf-8')
-            metadata = json.loads(metadata_json)
-            sample_rate = metadata['sampleRate']
-            chunk = message[4+metadata_length:]
-            resampled_chunk = decode_and_resample(chunk, sample_rate, 16000)
-            recorder.feed_audio(resampled_chunk)
+async def data_handler(websocket, path):
+    print("Data client connected")
+    data_connections.add(websocket)
+    try:
+        while True:
+            message = await websocket.recv()
+            if isinstance(message, bytes):
+                # Handle binary message (audio data)
+                metadata_length = int.from_bytes(message[:4], byteorder='little')
+                metadata_json = message[4:4+metadata_length].decode('utf-8')
+                metadata = json.loads(metadata_json)
+                sample_rate = metadata['sampleRate']
+                chunk = message[4+metadata_length:]
+                resampled_chunk = decode_and_resample(chunk, sample_rate, 16000)
+                recorder.feed_audio(resampled_chunk)
+            else:
+                print("Received non-binary message on data connection")
     except websockets.exceptions.ConnectionClosed as e:
-        print(f"Client disconnected: {e}")
+        print(f"Data client disconnected: {e}")
     finally:
-        print("Resetting client_websocket after disconnect")
-        client_websocket = None  # Reset websocket reference
+        data_connections.remove(websocket)
+        recorder.clear_audio_queue()  # Ensure audio queue is cleared if client disconnects
+
+async def broadcast_audio_messages():
+    while True:
+        message = await audio_queue.get()
+        for conn in list(data_connections):
+            try:
+                await conn.send(message)
+            except websockets.exceptions.ConnectionClosed:
+                data_connections.discard(conn)  # discard: the handler's finally block may have removed it already
+
+# Helper function to create event loop bound closures for callbacks
+def make_callback(loop, callback):
+    def inner_callback(*args, **kwargs):
+        callback(*args, **kwargs, loop=loop)
+    return inner_callback
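
`make_callback` binds the server's event loop into each recorder callback so that code running on the recorder's worker thread can hand messages to asyncio safely via `asyncio.run_coroutine_threadsafe`. A self-contained sketch of that thread-to-loop pattern, with illustrative names, is:

```python
import asyncio
import threading

async def consumer(queue, results, n):
    # Drain n messages that the worker thread scheduled onto the loop.
    for _ in range(n):
        results.append(await queue.get())

def worker(loop, queue, n):
    # Plain thread: it must not touch the loop directly, so it schedules
    # coroutines onto it thread-safely instead.
    for i in range(n):
        future = asyncio.run_coroutine_threadsafe(queue.put(f"msg-{i}"), loop)
        future.result()  # block until the loop has enqueued the item

async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    results = []
    thread = threading.Thread(target=worker, args=(loop, queue, 3))
    thread.start()
    await consumer(queue, results, 3)
    thread.join()
    return results

print(asyncio.run(main()))  # prints ['msg-0', 'msg-1', 'msg-2']
```

This is why `_recorder_thread` and every callback receive `loop` explicitly: `asyncio.Queue` is not thread-safe, and `run_coroutine_threadsafe` is the supported bridge from a plain thread into a running event loop.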
 
 async def main_async():            
     global stop_recorder, recorder_config, global_args
     args = parse_arguments()
     global_args = args
 
+    # Get the running event loop here and pass it to the recorder thread
+    loop = asyncio.get_running_loop()
+
     recorder_config = {
         'model': args.model,
         'realtime_model_type': args.realtime_model_type,
@@ -259,20 +472,33 @@ async def main_async():
 
         'spinner': False,
         'use_microphone': False,
-        'on_realtime_transcription_update': text_detected,
+
+        'on_realtime_transcription_update': make_callback(loop, text_detected),
+        'on_recording_start': make_callback(loop, on_recording_start),
+        'on_recording_stop': make_callback(loop, on_recording_stop),
+        'on_vad_detect_start': make_callback(loop, on_vad_detect_start),
+        'on_wakeword_detection_start': make_callback(loop, on_wakeword_detection_start),
+        'on_wakeword_detection_end': make_callback(loop, on_wakeword_detection_end),
+        'on_transcription_start': make_callback(loop, on_transcription_start),
         'no_log_file': True,
     }
 
-    start_server = await websockets.serve(echo, "localhost", 8011)
+    control_server = await websockets.serve(control_handler, "localhost", args.control_port)
+    data_server = await websockets.serve(data_handler, "localhost", args.data_port)
+    print(f"Control server started on ws://localhost:{args.control_port}")
+    print(f"Data server started on ws://localhost:{args.data_port}")
+
+    # Task to broadcast audio messages
+    broadcast_task = asyncio.create_task(broadcast_audio_messages())
 
-    recorder_thread = threading.Thread(target=_recorder_thread)
+    recorder_thread = threading.Thread(target=_recorder_thread, args=(loop,))
     recorder_thread.start()
     recorder_ready.wait()
 
     print("Server started. Press Ctrl+C to stop the server.")
-    
+
     try:
-        await start_server.wait_closed()  # This will keep the server running
+        await asyncio.gather(control_server.wait_closed(), data_server.wait_closed(), broadcast_task)
     except KeyboardInterrupt:
         print("Shutting down gracefully...")
     finally: