realtimestt_test_stereomix.py

import os
import sys
import threading
import time
import pyaudio
from rich.console import Console
from rich.live import Live
from rich.text import Text
from rich.panel import Panel
from rich.spinner import Spinner
from rich.progress import Progress, SpinnerColumn, TextColumn
from colorama import Fore, Style, init as colorama_init
from RealtimeSTT import AudioToTextRecorder

# Configuration Constants
LOOPBACK_DEVICE_NAME = "stereomix"
LOOPBACK_DEVICE_HOST_API = 0
BUFFER_SIZE = 512
AUDIO_FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000

EXTENDED_LOGGING = False
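
# This script transcribes whatever the system is playing: it captures audio from
# the Windows "Stereo Mix" loopback device, feeds the raw PCM into a RealtimeSTT
# AudioToTextRecorder, and renders partial and final transcripts in a Rich panel.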


def main():

    if EXTENDED_LOGGING:
        import logging
        logging.basicConfig(level=logging.DEBUG)

    console = Console()
    console.print("System initializing, please wait")

    colorama_init()

    # Initialize Rich Console and Live
    live = Live(console=console, refresh_per_second=10, screen=False)
    live.start()

    full_sentences = []
    rich_text_stored = ""
    recorder = None
    displayed_text = ""  # Used for tracking text that was already displayed
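
    # Silence windows (seconds) applied after speech stops before a segment is
    # finalized; text_detected() switches between them based on how the current
    # partial transcript ends.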
    end_of_sentence_detection_pause = 0.2
    unknown_sentence_detection_pause = 0.5
    mid_sentence_detection_pause = 1

    prev_text = ""

    def clear_console():
        os.system('clear' if os.name == 'posix' else 'cls')

    def preprocess_text(text):
        # Remove leading whitespaces
        text = text.lstrip()

        # Remove starting ellipses if present
        if text.startswith("..."):
            text = text[3:]

        # Remove any leading whitespaces again after ellipses removal
        text = text.lstrip()

        # Uppercase the first letter
        if text:
            text = text[0].upper() + text[1:]

        return text
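
    # Callback for 'on_realtime_transcription_update': refreshes the live panel
    # with the partial transcript and retunes the recorder's post-speech silence
    # window depending on how the text ends.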
    def text_detected(text):
        nonlocal prev_text, displayed_text, rich_text_stored

        text = preprocess_text(text)

        sentence_end_marks = ['.', '!', '?', '。']
        midsentence_marks = ['…', '-', '(']
        if text.endswith("...") or (text and text[-1] in midsentence_marks):
            recorder.post_speech_silence_duration = mid_sentence_detection_pause
        elif text and text[-1] in sentence_end_marks and prev_text and prev_text[-1] in sentence_end_marks:
            recorder.post_speech_silence_duration = end_of_sentence_detection_pause
        else:
            recorder.post_speech_silence_duration = unknown_sentence_detection_pause

        prev_text = text

        # Build Rich Text with alternating colors
        rich_text = Text()
        for i, sentence in enumerate(full_sentences):
            if i % 2 == 0:
                rich_text += Text(sentence, style="yellow") + Text(" ")
            else:
                rich_text += Text(sentence, style="cyan") + Text(" ")

        # If the current text is not a sentence-ending, display it in real-time
        if text:
            rich_text += Text(text, style="bold yellow")

        new_displayed_text = rich_text.plain

        if new_displayed_text != displayed_text:
            displayed_text = new_displayed_text
            panel = Panel(rich_text, title="[bold green]Live Transcription[/bold green]", border_style="bold green")
            live.update(panel)
            rich_text_stored = rich_text

    def process_text(text):
        nonlocal recorder, full_sentences, prev_text
        recorder.post_speech_silence_duration = unknown_sentence_detection_pause
        text = preprocess_text(text)
        text = text.rstrip()
        if text.endswith("..."):
            text = text[:-3]  # Remove trailing ellipsis

        full_sentences.append(text)
        prev_text = ""
        text_detected("")

    # Recorder configuration
    recorder_config = {
        'spinner': False,
        'use_microphone': False,
        'model': 'large-v2',
        'input_device_index': None,  # To be set after finding the device
        'realtime_model_type': 'tiny.en',
        'language': 'en',
        'silero_sensitivity': 0.05,
        'webrtc_sensitivity': 3,
        'post_speech_silence_duration': unknown_sentence_detection_pause,
        'min_length_of_recording': 2.0,
        'min_gap_between_recordings': 0,
        'enable_realtime_transcription': True,
        'realtime_processing_pause': 0.01,
        'on_realtime_transcription_update': text_detected,
        'silero_deactivity_detection': False,
        'early_transcription_on_silence': 0,
        'beam_size': 5,
        'beam_size_realtime': 1,
        'no_log_file': True,
        'initial_prompt': "Use ellipses for incomplete sentences like: I went to the..."
    }
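
    # 'model' (large-v2) produces the final transcription; 'realtime_model_type'
    # (tiny.en) drives the low-latency partial updates shown in the live panel.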

    if EXTENDED_LOGGING:
        recorder_config['level'] = logging.DEBUG

    # Initialize PyAudio
    audio = pyaudio.PyAudio()
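
    # Look up the loopback device by name; on Windows, host API index 0 is
    # typically MME, which is where the "Stereo Mix" device is listed.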
    def find_stereo_mix_index():
        nonlocal audio
        devices_info = ""
        for i in range(audio.get_device_count()):
            dev = audio.get_device_info_by_index(i)
            devices_info += f"{dev['index']}: {dev['name']} (hostApi: {dev['hostApi']})\n"

            if (LOOPBACK_DEVICE_NAME.lower() in dev['name'].lower()
                    and dev['hostApi'] == LOOPBACK_DEVICE_HOST_API):
                return dev['index'], devices_info

        return None, devices_info

    device_index, devices_info = find_stereo_mix_index()
    if device_index is None:
        live.stop()
        console.print("[bold red]Stereo Mix device not found. Available audio devices are:\n[/bold red]")
        console.print(devices_info, style="red")
        audio.terminate()
        sys.exit(1)
    else:
        recorder_config['input_device_index'] = device_index
        console.print(f"Using audio device index {device_index} for Stereo Mix.", style="green")

    # Initialize the recorder
    recorder = AudioToTextRecorder(**recorder_config)

    # Initialize Live Display with waiting message
    initial_text = Panel(Text("Say something...", style="cyan bold"), title="[bold yellow]Waiting for Input[/bold yellow]", border_style="bold yellow")
    live.update(initial_text)
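
    # 'use_microphone' is False, so the recorder gets no audio on its own; a
    # dedicated thread reads raw 16 kHz mono PCM from the loopback device and
    # pushes it into the recorder via feed_audio().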
    # Define the recording thread
    def recording_thread():
        nonlocal recorder
        stream = audio.open(format=AUDIO_FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            frames_per_buffer=BUFFER_SIZE,
                            input_device_index=recorder_config['input_device_index'])
        try:
            while not stop_event.is_set():
                data = stream.read(BUFFER_SIZE, exception_on_overflow=False)
                recorder.feed_audio(data)
        except Exception as e:
            console.print(f"[bold red]Error in recording thread: {e}[/bold red]")
        finally:
            console.print("[bold red]Stopping stream[/bold red]")
            stream.stop_stream()
            stream.close()

    # Define the stop event
    stop_event = threading.Event()

    # Start the recording thread
    thread = threading.Thread(target=recording_thread, daemon=True)
    thread.start()

    try:
        while True:
            recorder.text(process_text)
    except KeyboardInterrupt:
        console.print("[bold red]\nTranscription stopped by user. Exiting...[/bold red]")
    finally:
        print("live stop")
        live.stop()

        print("setting stop event")
        stop_event.set()

        print("thread join")
        thread.join()

        print("recorder stop")
        recorder.stop()

        print("audio terminate")
        audio.terminate()

        print("sys exit ")
        sys.exit(0)


if __name__ == '__main__':
    main()