# Live speech-to-text demo: renders realtime transcription in a Rich panel and asks a local LLM whether the speaker has finished a sentence.
# When True, the sentence-end heuristics print diagnostic output.
IS_DEBUG = False

import os
import sys
import threading
import queue
import time
from collections import deque
from difflib import SequenceMatcher

from install_packages import check_and_install_packages

# Check and install required packages before any of them is imported below.
check_and_install_packages([
    {'import_name': package_name}
    for package_name in (
        'rich',
        'openai',
        'colorama',
        'RealtimeSTT',
        # Add any other required packages here
    )
])

# When True, DEBUG-level logging is configured and passed to the recorder.
EXTENDED_LOGGING = False
if __name__ == '__main__':

    if EXTENDED_LOGGING:
        import logging
        logging.basicConfig(level=logging.DEBUG)

    from rich.console import Console
    from rich.live import Live
    from rich.text import Text
    from rich.panel import Panel
    from rich.spinner import Spinner
    from rich.progress import Progress, SpinnerColumn, TextColumn

    # Show a notice before the remaining imports run.
    console = Console()
    console.print("System initializing, please wait")

    from RealtimeSTT import AudioToTextRecorder
    from colorama import Fore, Style
    import colorama
    from openai import OpenAI
    # import ollama

    # OpenAI-compatible client pointed at a locally hosted server.
    client = OpenAI(
        # base_url='http://127.0.0.1:11434/v1/', # ollama
        base_url='http://127.0.0.1:1234/v1/', # lm_studio
        api_key='ollama', # required but ignored
    )

    # On Windows, let torchaudio register its DLL search path.
    if os.name == "nt":
        if (3, 8) <= sys.version_info < (3, 99):
            from torchaudio._extension.utils import _init_dll_path
            _init_dll_path()

    colorama.init()

    # Rich live display that the queue worker updates with the transcript panel.
    live = Live(console=console, screen=False, refresh_per_second=10)
    live.start()

    # Thread-safe hand-off of realtime texts from the recorder callback to the worker.
    text_queue = queue.Queue()

    # State shared between the worker thread and the final-sentence callback.
    full_sentences = []        # finalized sentences, oldest first
    rich_text_stored = ""      # last rendered rich text
    recorder = None            # replaced by the AudioToTextRecorder instance later
    displayed_text = ""        # plain text currently shown, used to skip redundant redraws
    text_time_deque = deque()  # (timestamp, text) pairs of recent realtime updates

    # Values assigned to recorder.post_speech_silence_duration depending on the
    # state of the realtime text.
    rapid_sentence_end_detection = 0.4       # the LLM judged the text complete
    end_of_sentence_detection_pause = 1.2    # current and previous text end with sentence punctuation
    unknown_sentence_detection_pause = 1.8   # default when nothing else applies
    mid_sentence_detection_pause = 2.4       # text ends with "..."

    # Forced stop when the realtime text stops changing: window length in seconds,
    # minimum number of texts in the window, minimum text length and minimum
    # similarity between the oldest and newest text in the window.
    hard_break_even_on_background_noise = 3.0
    hard_break_even_on_background_noise_min_texts = 3
    hard_break_even_on_background_noise_min_chars = 15
    hard_break_even_on_background_noise_min_similarity = 0.99

    # Whether to resume listening right after a forced stop.
    relisten_on_abrupt_stop = True
    # Set by the worker when it forces a stop; cleared by process_text.
    abrupt_stop = False
def clear_console():
    """Clear the terminal by running the platform's shell command."""
    if os.name == 'posix':
        command = 'clear'
    else:
        command = 'cls'
    os.system(command)
# Most recent realtime text handled by the queue worker ("" after a final sentence).
prev_text = ""
# Memo of LLM verdicts, keyed by the exact text that was classified.
speech_finished_cache = {}


def is_speech_finished(text):
    """Ask the LLM whether *text* is a complete thought.

    Args:
        text: Transcribed text to classify.

    Returns:
        True if the model replied 'c' (complete), False otherwise. Empty or
        whitespace-only text is reported as unfinished without contacting
        the model.

    Raises:
        Whatever the OpenAI client raises when the request fails.
    """
    # Nothing to classify: callers pass "" after every final sentence and
    # whenever punctuation stripping leaves no characters behind.
    if not text.strip():
        return False

    # Check if the result is already in the cache
    cached = speech_finished_cache.get(text)
    if cached is not None:
        if IS_DEBUG:
            print(f"Cache hit for: '{text}'")
        return cached

    user_prompt = (
        "Please reply with only 'c' if the following text is a complete thought (a sentence that stands on its own), "
        "or 'i' if it is not finished. Do not include any additional text in your reply. "
        "Consider a full sentence to have a clear subject, verb, and predicate or express a complete idea. "
        "Examples:\n"
        "- 'The sky is blue.' is complete (reply 'c').\n"
        "- 'When the sky' is incomplete (reply 'i').\n"
        "- 'She walked home.' is complete (reply 'c').\n"
        "- 'Because he' is incomplete (reply 'i').\n"
        f"\nText: {text}"
    )

    response = client.chat.completions.create(
        model="lmstudio-community/Meta-Llama-3.1-8B-Instruct-GGUF/Meta-Llama-3.1-8B-Instruct-Q8_0.gguf",
        messages=[{"role": "user", "content": user_prompt}],
        max_tokens=1,
        temperature=0.0,  # Set temperature to 0 for deterministic output
    )

    # The message content is optional in the API schema, and a server may
    # return no choices at all; treat both as "not finished" instead of
    # raising inside the worker thread.
    choices = response.choices
    content = choices[0].message.content if choices else None
    reply = (content or "").strip().lower()

    if IS_DEBUG:
        print(f"t:'{reply}'", end="", flush=True)

    result = reply == 'c'

    # Cache the result
    speech_finished_cache[text] = result
    return result
def preprocess_text(text):
    """Normalize the start of a transcribed text.

    Leading whitespace is dropped, one leading "..." (plus the whitespace
    after it) is removed, and the first remaining character is upper-cased.

    Args:
        text: Raw transcription text.

    Returns:
        The cleaned text; an empty string if nothing is left.
    """
    cleaned = text.lstrip()

    # Drop a single leading ellipsis, if any.
    if cleaned[:3] == "...":
        cleaned = cleaned[3:]

    # The ellipsis may have been followed by more whitespace.
    cleaned = cleaned.lstrip()

    if not cleaned:
        return cleaned

    # Capitalize only the first character; leave the rest untouched.
    return cleaned[:1].upper() + cleaned[1:]
def text_detected(text):
    """Hand a detected text over to the worker thread.

    The text is only placed on ``text_queue`` here; all processing happens
    in the queue consumer.

    Args:
        text: The text to enqueue.
    """
    text_queue.put(text)
def _set_post_speech_silence(duration, debug_detail=""):
    """Set the recorder's post-speech silence duration if it differs from *duration*."""
    if recorder.post_speech_silence_duration == duration:
        return
    recorder.post_speech_silence_duration = duration
    if IS_DEBUG:
        print(f"RT: {debug_detail}post_speech_silence_duration: {recorder.post_speech_silence_duration}")


def process_queue():
    """Consume realtime texts from ``text_queue`` until a ``None`` sentinel arrives.

    For every text the worker:

    1. picks the recorder's post-speech silence duration from the text's
       ending (ellipsis, sentence punctuation, or unknown), then shortens it
       if the LLM judges the text to be a complete thought;
    2. records the text with a timestamp and stops the recorder when the
       texts inside the ``hard_break_even_on_background_noise`` window have
       stopped changing;
    3. redraws the live panel when the visible text changed.
    """
    global recorder, full_sentences, prev_text, displayed_text, rich_text_stored, text_time_deque, abrupt_stop

    # Function-scope import as in the original, but executed once instead of
    # on every loop iteration.
    import string

    sentence_end_marks = ('.', '!', '?', '。')
    strip_punctuation = str.maketrans('', '', string.punctuation)

    while True:
        try:
            text = text_queue.get(timeout=1)  # Wait for text or timeout after 1 second
        except queue.Empty:
            continue  # No text to process, continue looping

        if text is None:
            # Sentinel value to indicate thread should exit
            text_queue.task_done()
            break

        text = preprocess_text(text)
        current_time = time.time()

        # Choose how long the recorder waits after speech, based on how the
        # current (and previous) realtime text ends.
        if text.endswith("..."):
            _set_post_speech_silence(mid_sentence_detection_pause)
        elif text.endswith(sentence_end_marks) and prev_text.endswith(sentence_end_marks):
            _set_post_speech_silence(end_of_sentence_detection_pause)
        else:
            _set_post_speech_silence(unknown_sentence_detection_pause)

        prev_text = text

        # The LLM sees the text without ASCII punctuation; a "complete"
        # verdict overrides the pause chosen above.
        transtext = text.translate(strip_punctuation)
        if is_speech_finished(transtext):
            _set_post_speech_silence(rapid_sentence_end_detection, f"{transtext} ")

        # Append the new text with its timestamp
        text_time_deque.append((current_time, text))

        # Remove texts that fell out of the observation window
        cutoff = current_time - hard_break_even_on_background_noise
        while text_time_deque and text_time_deque[0][0] < cutoff:
            text_time_deque.popleft()

        # With enough texts inside the window, compare the oldest and newest:
        # if they are (almost) identical and long enough, the transcript has
        # stopped changing, so force the recorder to stop.
        if len(text_time_deque) >= hard_break_even_on_background_noise_min_texts:
            first_text = text_time_deque[0][1]
            last_text = text_time_deque[-1][1]

            similarity = SequenceMatcher(None, first_text, last_text).ratio()

            if (similarity > hard_break_even_on_background_noise_min_similarity
                    and len(first_text) > hard_break_even_on_background_noise_min_chars):
                abrupt_stop = True
                recorder.stop()

        # Finished sentences alternate between yellow and cyan; the text
        # still being spoken is appended in bold yellow.
        rich_text = Text()
        for i, sentence in enumerate(full_sentences):
            style = "yellow" if i % 2 == 0 else "cyan"
            rich_text += Text(sentence, style=style) + Text(" ")

        if text:
            rich_text += Text(text, style="bold yellow")

        # Redraw only when the visible text actually changed.
        new_displayed_text = rich_text.plain
        if new_displayed_text != displayed_text:
            displayed_text = new_displayed_text
            panel = Panel(rich_text, title="[bold green]Live Transcription[/bold green]", border_style="bold green")
            live.update(panel)
            rich_text_stored = rich_text

        # Mark the task as done
        text_queue.task_done()
def process_text(text):
    """Handle a finalized sentence delivered by the recorder.

    Resets the silence duration to the "unknown" default, stores the cleaned
    sentence, clears the realtime-text history and triggers a redraw by
    enqueueing an empty text. If the worker forced the recorder to stop,
    listening is resumed when ``relisten_on_abrupt_stop`` is set.

    Args:
        text: The final transcription of the recording.
    """
    global recorder, full_sentences, prev_text, abrupt_stop

    if IS_DEBUG:
        print(f"SENTENCE: post_speech_silence_duration: {recorder.post_speech_silence_duration}")

    recorder.post_speech_silence_duration = unknown_sentence_detection_pause

    sentence = preprocess_text(text).rstrip()
    text_time_deque.clear()

    # Dropping two of the three trailing dots turns "..." into a single period.
    if sentence.endswith("..."):
        sentence = sentence[:-2]

    full_sentences.append(sentence)
    prev_text = ""
    # An empty realtime text makes the worker redraw with the new sentence list.
    text_detected("")

    if not abrupt_stop:
        return
    abrupt_stop = False

    if not relisten_on_abrupt_stop:
        return

    # Resume recording right away and, when the recorder exposes it, seed the
    # new recording with its buffer of the last words.
    recorder.listen()
    recorder.start()
    if hasattr(recorder, "last_words_buffer"):
        recorder.frames.extend(list(recorder.last_words_buffer))
# Runtime section: build the recorder, start the display worker and run the
# transcription loop. Kept under the entry-point guard so that nothing here
# executes when the module is imported rather than run as a script.
if __name__ == '__main__':
    # Recorder configuration
    recorder_config = {
        'spinner': False,
        'model': 'medium.en',
        #'input_device_index': 1, # mic
        #'input_device_index': 2, # stereomix
        'realtime_model_type': 'tiny.en',
        'language': 'en',
        #'silero_sensitivity': 0.05,
        'silero_sensitivity': 0.4,
        'webrtc_sensitivity': 3,
        'post_speech_silence_duration': unknown_sentence_detection_pause,
        'min_length_of_recording': 1.1,
        'min_gap_between_recordings': 0,
        'enable_realtime_transcription': True,
        'realtime_processing_pause': 0.05,
        # Every realtime update is queued for the worker thread.
        'on_realtime_transcription_update': text_detected,
        'silero_deactivity_detection': False,
        'early_transcription_on_silence': 0,
        'beam_size': 5,
        'beam_size_realtime': 1,
        'no_log_file': True,
        # The worker looks for a trailing "..." in the transcript, so the
        # prompt asks for ellipses on incomplete sentences.
        'initial_prompt': (
            "End incomplete sentences with ellipses.\n"
            "Examples:\n"
            "Complete: The sky is blue.\n"
            "Incomplete: When the sky...\n"
            "Complete: She walked home.\n"
            "Incomplete: Because he...\n"
        )
        #'initial_prompt': "Use ellipses for incomplete sentences like: I went to the..."
    }

    if EXTENDED_LOGGING:
        # Imported here so this section does not rely on an import made elsewhere.
        import logging
        recorder_config['level'] = logging.DEBUG

    recorder = AudioToTextRecorder(**recorder_config)

    initial_text = Panel(Text("Say something...", style="cyan bold"), title="[bold yellow]Waiting for Input[/bold yellow]", border_style="bold yellow")
    live.update(initial_text)

    # Start the worker thread
    worker_thread = threading.Thread(target=process_queue, daemon=True)
    worker_thread.start()

    try:
        # Each call blocks until one sentence is finalized and passed to process_text.
        while True:
            recorder.text(process_text)
    except KeyboardInterrupt:
        # Send sentinel value to worker thread to exit
        text_queue.put(None)
        worker_thread.join()
        live.stop()
        console.print("[bold red]Transcription stopped by user. Exiting...[/bold red]")
        # sys.exit instead of the site-provided exit(), which is meant for
        # interactive sessions and is not guaranteed to exist.
        sys.exit(0)