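# Real-time dictation demo built on RealtimeSTT.
#
# Live speech is transcribed and pasted into the active window via the clipboard
# (pyperclip + Ctrl+V). Hotkeys: F1/F2 mute/unmute the microphone, F3/F4 start/stop
# a manually controlled ("static") recording, F5 resets the transcription.
# Requires: rich, keyboard, pyperclip, pyautogui, pyaudio, numpy, colorama,
# RealtimeSTT and, for static transcription, faster_whisper.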
EXTENDED_LOGGING = False

if __name__ == '__main__':

    import subprocess
    import sys
    import threading
    import time

    def install_rich():
        subprocess.check_call([sys.executable, "-m", "pip", "install", "rich"])

    try:
        import rich
    except ImportError:
        user_input = input("This demo needs the 'rich' library, which is not installed.\nDo you want to install it now? (y/n): ")
        if user_input.lower() == 'y':
            try:
                install_rich()
                import rich
                print("Successfully installed 'rich'.")
            except Exception as e:
                print(f"An error occurred while installing 'rich': {e}")
                sys.exit(1)
        else:
            print("The program requires the 'rich' library to run. Exiting...")
            sys.exit(1)

    import keyboard
    import pyperclip

    if EXTENDED_LOGGING:
        import logging
        logging.basicConfig(level=logging.DEBUG)

    from rich.console import Console
    from rich.live import Live
    from rich.text import Text
    from rich.panel import Panel
    console = Console()
    console.print("System initializing, please wait")

    import os
    from RealtimeSTT import AudioToTextRecorder  # Ensure this module has stop() or close() methods

    import colorama
    colorama.init()

    import pyautogui
    import pyaudio
    import numpy as np

    # Initialize the Rich Live display
    live = Live(console=console, refresh_per_second=10, screen=False)
    live.start()

    # Global variables
    full_sentences = []
    rich_text_stored = ""
    recorder = None
    displayed_text = ""  # Used for tracking text that was already displayed

    # Post-speech silence durations (in seconds): how long the recorder waits
    # before finalizing, depending on whether the partial transcript looks
    # complete, ambiguous, or mid-sentence
    end_of_sentence_detection_pause = 0.45
    unknown_sentence_detection_pause = 0.7
    mid_sentence_detection_pause = 2.0

    prev_text = ""

    # Events to signal threads to exit or reset
    exit_event = threading.Event()
    reset_event = threading.Event()
    def preprocess_text(text):
        # Remove leading whitespace
        text = text.lstrip()

        # Remove a starting ellipsis if present
        if text.startswith("..."):
            text = text[3:]

        # Remove any leading whitespace again after ellipsis removal
        text = text.lstrip()

        # Uppercase the first letter
        if text:
            text = text[0].upper() + text[1:]

        return text
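    # Illustrative example (not part of the original script):
    #     preprocess_text("  ...and then we left")  ->  "And then we left"
    # Leading whitespace and a leading ellipsis are stripped and the first
    # letter is uppercased; the rest of the text is left untouched.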
    def text_detected(text):
        global prev_text, displayed_text, rich_text_stored

        text = preprocess_text(text)

        # Adjust how long the recorder waits after silence before finalizing:
        # a trailing ellipsis suggests a mid-sentence pause (wait longest), two
        # consecutive sentence-ending updates suggest the sentence is complete
        # (wait shortest), anything else gets the default pause.
        sentence_end_marks = ['.', '!', '?', '。']
        if text.endswith("..."):
            recorder.post_speech_silence_duration = mid_sentence_detection_pause
        elif text and text[-1] in sentence_end_marks and prev_text and prev_text[-1] in sentence_end_marks:
            recorder.post_speech_silence_duration = end_of_sentence_detection_pause
        else:
            recorder.post_speech_silence_duration = unknown_sentence_detection_pause

        prev_text = text

        # Build Rich Text with alternating colors for the finished sentences
        rich_text = Text()
        for i, sentence in enumerate(full_sentences):
            if i % 2 == 0:
                rich_text += Text(sentence, style="yellow") + Text(" ")
            else:
                rich_text += Text(sentence, style="cyan") + Text(" ")

        # Append the current (still unfinished) text so it is shown in real time
        if text:
            rich_text += Text(text, style="bold yellow")

        new_displayed_text = rich_text.plain

        if new_displayed_text != displayed_text:
            displayed_text = new_displayed_text
            panel = Panel(rich_text, title="[bold green]Live Transcription[/bold green]", border_style="bold green")
            live.update(panel)
            rich_text_stored = rich_text
    def process_text(text):
        global recorder, full_sentences, prev_text, displayed_text
        recorder.post_speech_silence_duration = unknown_sentence_detection_pause
        text = preprocess_text(text)
        text = text.rstrip()
        if text.endswith("..."):
            text = text[:-2]  # trim the trailing ellipsis down to a single period

        full_sentences.append(text)
        prev_text = ""
        text_detected("")

        # If a reset was requested, clear the buffers instead of typing the text
        if reset_event.is_set():
            full_sentences.clear()
            displayed_text = ""
            reset_event.clear()
            console.print("[bold magenta]Transcription buffer reset.[/bold magenta]")
            return

        # Type the finalized sentence into the active window
        try:
            # Release modifier keys to prevent stuck keys
            for key in ['ctrl', 'shift', 'alt', 'win']:
                keyboard.release(key)
                pyautogui.keyUp(key)

            # Paste the text via the clipboard
            pyperclip.copy(text + ' ')
            pyautogui.hotkey('ctrl', 'v')

        except Exception as e:
            console.print(f"[bold red]Failed to type the text: {e}[/bold red]")
    # Recorder configuration
    recorder_config = {
        'spinner': False,
        'model': 'Systran/faster-distil-whisper-large-v3',  # e.g. distil-medium.en, large-v2 or deepdml/faster-whisper-large-v3-turbo-ct2
        'input_device_index': 1,  # microphone device index; adjust for your system
        'realtime_model_type': 'Systran/faster-distil-whisper-large-v3',  # use the same model for realtime updates
        'language': 'en',
        'silero_sensitivity': 0.05,
        'webrtc_sensitivity': 3,
        'post_speech_silence_duration': unknown_sentence_detection_pause,
        'min_length_of_recording': 1.1,
        'min_gap_between_recordings': 0,
        'enable_realtime_transcription': True,
        'realtime_processing_pause': 0.02,
        'on_realtime_transcription_update': text_detected,
        # 'on_realtime_transcription_stabilized': text_detected,
        'silero_deactivity_detection': True,
        'early_transcription_on_silence': 0,
        'beam_size': 5,
        'beam_size_realtime': 5,  # match beam_size for consistency
        'no_log_file': True,
        'initial_prompt': "Use ellipses for incomplete sentences like: I went to the...",
        'device': 'cuda',
        'compute_type': 'float16'
    }
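    # If no CUDA-capable GPU is available, a CPU configuration along these lines
    # should also work (a sketch, assuming faster-whisper's standard options):
    #     recorder_config['device'] = 'cpu'
    #     recorder_config['compute_type'] = 'int8'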
    if EXTENDED_LOGGING:
        recorder_config['level'] = logging.DEBUG

    recorder = AudioToTextRecorder(**recorder_config)

    initial_text = Panel(Text("Say something...", style="cyan bold"), title="[bold yellow]Waiting for Input[/bold yellow]", border_style="bold yellow")
    live.update(initial_text)

    # Print available hotkeys
    console.print("[bold green]Available Hotkeys:[/bold green]")
    console.print("[bold cyan]F1[/bold cyan]: Mute Microphone")
    console.print("[bold cyan]F2[/bold cyan]: Unmute Microphone")
    console.print("[bold cyan]F3[/bold cyan]: Start Static Recording")
    console.print("[bold cyan]F4[/bold cyan]: Stop Static Recording")
    console.print("[bold cyan]F5[/bold cyan]: Reset Transcription")

    # Global variables for static recording
    static_recording_active = False
    static_recording_thread = None
    static_audio_frames = []
    live_recording_enabled = True  # Track whether live recording was enabled before static recording

    # Audio settings for static recording
    audio_settings = {
        'FORMAT': pyaudio.paInt16,  # PyAudio format (16-bit PCM)
        'CHANNELS': 1,              # Mono audio
        'RATE': 16000,              # Sample rate in Hz
        'CHUNK': 1024               # Buffer size in frames
    }

    # Note: The maximum recommended length of a static recording is about 5 minutes.
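    # For reference: at 16 kHz, 16-bit mono audio, a 5-minute recording holds
    # roughly 16000 * 2 * 300 ≈ 9.6 MB of raw PCM; static_audio_frames keeps
    # all of it in memory until it is transcribed.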
    def static_recording_worker():
        """
        Worker function to record audio statically.
        """
        global static_audio_frames, static_recording_active
        # Set up PyAudio
        p = pyaudio.PyAudio()
        # Use the same audio format as defined in audio_settings
        FORMAT = audio_settings['FORMAT']
        CHANNELS = audio_settings['CHANNELS']
        RATE = audio_settings['RATE']    # Sample rate
        CHUNK = audio_settings['CHUNK']  # Buffer size
        # Open the audio stream
        try:
            stream = p.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            frames_per_buffer=CHUNK)
        except Exception as e:
            console.print(f"[bold red]Failed to open audio stream for static recording: {e}[/bold red]")
            static_recording_active = False
            p.terminate()
            return

        while static_recording_active and not exit_event.is_set():
            try:
                data = stream.read(CHUNK)
                static_audio_frames.append(data)
            except Exception as e:
                console.print(f"[bold red]Error during static recording: {e}[/bold red]")
                break

        # Stop and close the stream
        stream.stop_stream()
        stream.close()
        p.terminate()
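    # For debugging it can help to dump the captured frames to a WAV file before
    # transcription. A minimal sketch using the standard-library wave module
    # (file name is illustrative, not part of this demo):
    #
    #     import wave
    #     with wave.open("static_recording_debug.wav", "wb") as wf:
    #         wf.setnchannels(audio_settings['CHANNELS'])
    #         wf.setsampwidth(pyaudio.PyAudio().get_sample_size(audio_settings['FORMAT']))
    #         wf.setframerate(audio_settings['RATE'])
    #         wf.writeframes(b''.join(static_audio_frames))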
    def start_static_recording():
        """
        Starts the static audio recording.
        """
        global static_recording_active, static_recording_thread, static_audio_frames, live_recording_enabled
        if static_recording_active:
            console.print("[bold yellow]Static recording is already in progress.[/bold yellow]")
            return

        # Mute the live recording microphone
        live_recording_enabled = recorder.use_microphone.value
        if live_recording_enabled:
            recorder.set_microphone(False)
            console.print("[bold yellow]Live microphone muted during static recording.[/bold yellow]")

        console.print("[bold green]Starting static recording... Press F4 or F5 to stop/reset.[/bold green]")
        static_audio_frames = []
        static_recording_active = True
        static_recording_thread = threading.Thread(target=static_recording_worker, daemon=True)
        static_recording_thread.start()
    def stop_static_recording():
        """
        Stops the static audio recording and processes the transcription.
        """
        global static_recording_active, static_recording_thread
        if not static_recording_active:
            console.print("[bold yellow]No static recording is in progress.[/bold yellow]")
            return

        console.print("[bold green]Stopping static recording...[/bold green]")
        static_recording_active = False
        if static_recording_thread is not None:
            static_recording_thread.join()
            static_recording_thread = None

        # Start a new thread to process the transcription
        processing_thread = threading.Thread(target=process_static_transcription, daemon=True)
        processing_thread.start()
    def process_static_transcription():
        global static_audio_frames, live_recording_enabled
        if exit_event.is_set():
            return

        # Process the recorded audio
        console.print("[bold green]Processing static recording...[/bold green]")

        # Convert the int16 PCM frames to float32 samples in [-1.0, 1.0)
        audio_data = b''.join(static_audio_frames)
        audio_array = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0

        # Transcribe the audio data
        try:
            from faster_whisper import WhisperModel
        except ImportError:
            console.print("[bold red]faster_whisper is not installed. Please install it to use static transcription.[/bold red]")
            return

        # Load the model using recorder_config
        model_size = recorder_config['model']
        device = recorder_config['device']
        compute_type = recorder_config['compute_type']

        console.print("Loading transcription model... This may take a moment.")
        try:
            model = WhisperModel(model_size, device=device, compute_type=compute_type)
        except Exception as e:
            console.print(f"[bold red]Failed to load transcription model: {e}[/bold red]")
            return

        # Transcribe the audio
        try:
            segments, info = model.transcribe(audio_array, beam_size=recorder_config['beam_size'])
            transcription = ' '.join([segment.text for segment in segments]).strip()
        except Exception as e:
            console.print(f"[bold red]Error during transcription: {e}[/bold red]")
            return

        # Display the transcription
        console.print("Static Recording Transcription:")
        console.print(f"[bold cyan]{transcription}[/bold cyan]")

        # Type the transcription into the active window
        try:
            # Release modifier keys to prevent stuck keys
            for key in ['ctrl', 'shift', 'alt', 'win']:
                keyboard.release(key)
                pyautogui.keyUp(key)

            # Paste the text via the clipboard
            pyperclip.copy(transcription + ' ')
            pyautogui.hotkey('ctrl', 'v')

        except Exception as e:
            console.print(f"[bold red]Failed to type the static transcription: {e}[/bold red]")

        # Unmute the live recording microphone if it was enabled before
        if live_recording_enabled and not exit_event.is_set():
            recorder.set_microphone(True)
            console.print("[bold yellow]Live microphone unmuted.[/bold yellow]")
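    # Note: a fresh WhisperModel is loaded for every static transcription, which
    # keeps GPU memory free between uses but adds load time each run. If static
    # recordings are frequent, caching the model instance after the first load
    # would likely be faster (an optional optimization, not implemented here).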
    def reset_transcription():
        """
        Resets the transcription by flushing ongoing recordings or buffers.
        """
        global static_recording_active, static_recording_thread, static_audio_frames
        console.print("[bold magenta]Resetting transcription...[/bold magenta]")
        if static_recording_active:
            console.print("[bold magenta]Flushing static recording...[/bold magenta]")
            # Stop static recording
            static_recording_active = False
            if static_recording_thread is not None:
                static_recording_thread.join()
                static_recording_thread = None

            # Clear static audio frames
            static_audio_frames = []

            # Unmute microphone if it was muted during static recording
            if live_recording_enabled:
                recorder.set_microphone(True)
                console.print("[bold yellow]Live microphone unmuted after reset.[/bold yellow]")

        elif recorder.use_microphone.value:
            # Live transcription is active and the microphone is not muted
            console.print("[bold magenta]Resetting live transcription buffer...[/bold magenta]")
            reset_event.set()

        else:
            # Microphone is muted; nothing to reset
            console.print("[bold yellow]Microphone is muted. Nothing to reset.[/bold yellow]")
    # Hotkey Callback Functions
    def mute_microphone():
        recorder.set_microphone(False)
        console.print("[bold red]Microphone muted.[/bold red]")

    def unmute_microphone():
        recorder.set_microphone(True)
        console.print("[bold green]Microphone unmuted.[/bold green]")
    # Transcription loop, run in a separate thread
    def transcription_loop():
        try:
            while not exit_event.is_set():
                recorder.text(process_text)
        except Exception as e:
            console.print(f"[bold red]Error in transcription loop: {e}[/bold red]")
        finally:
            # Do not call sys.exit() here
            pass

    # Start the transcription loop thread
    transcription_thread = threading.Thread(target=transcription_loop, daemon=True)
    transcription_thread.start()
    # Define the hotkey combinations and their corresponding functions
    keyboard.add_hotkey('F1', mute_microphone, suppress=True)
    keyboard.add_hotkey('F2', unmute_microphone, suppress=True)
    keyboard.add_hotkey('F3', start_static_recording, suppress=True)
    keyboard.add_hotkey('F4', stop_static_recording, suppress=True)
    keyboard.add_hotkey('F5', reset_transcription, suppress=True)
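    # Note: the 'keyboard' library hooks global key events; on Linux it typically
    # needs to run with root privileges, and suppress=True prevents the hotkey
    # presses from also reaching the focused application.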
    # Keep the main thread running and handle graceful exit
    try:
        keyboard.wait()  # Blocks indefinitely; exit with Ctrl+C (KeyboardInterrupt)
    except KeyboardInterrupt:
        console.print("[bold yellow]KeyboardInterrupt received. Exiting...[/bold yellow]")
    finally:
        # Signal threads to exit
        exit_event.set()

        # Reset transcription if needed
        reset_transcription()

        # Stop the recorder
        try:
            if hasattr(recorder, 'stop'):
                recorder.stop()
            elif hasattr(recorder, 'close'):
                recorder.close()
        except Exception as e:
            console.print(f"[bold red]Error stopping recorder: {e}[/bold red]")

        # Allow some time for threads to finish
        time.sleep(1)

        # Wait for the transcription thread to finish
        if transcription_thread.is_alive():
            transcription_thread.join(timeout=5)

        # Stop the Live console
        live.stop()

        console.print("[bold red]Exiting gracefully...[/bold red]")
        sys.exit(0)