from RealtimeTTS import TextToAudioStream, AzureEngine, ElevenlabsEngine, SystemEngine
from RealtimeSTT import AudioToTextRecorder

from PyQt5.QtCore import Qt, QTimer, QEvent, pyqtSignal, QThread
from PyQt5.QtGui import QColor, QPainter, QFontMetrics, QFont, QMouseEvent
from PyQt5.QtWidgets import QApplication, QWidget, QDesktopWidget, QMenu, QAction

import os
import openai
import sys
import time
import sounddevice as sd
import numpy as np
import wavio
import keyboard

# --- Configuration ---
max_history_messages = 6
return_to_wakewords_after_silence = 12
start_with_wakeword = False
recorder_model = "large-v2"
language = "de"
engine = "azure"  # alternatives: "elevenlabs", "system"
azure_speech_region = "germanywestcentral"
openai.api_key = os.environ.get("OPENAI_API_KEY")

user_font_size = 22
user_color = QColor(208, 208, 208)  # gray
assistant_font_size = 24
assistant_color = QColor(240, 240, 240)  # white

voice = "en-GB-SoniaNeural"
prompt = "Respond helpfully, concisely, and when appropriate, with the subtle, polite irony of a butler."

if language == "de":
    voice = "de-DE-MajaNeural"
    prompt = "Antworte hilfreich, knapp und bei Gelegenheit mit der feinen, hΓΆflichen Ironie eines Butlers."

system_prompt_message = {
    'role': 'system',
    'content': prompt
}


def generate_response(messages):
    """Generate the assistant's response as a stream of text chunks using OpenAI."""
    # Note: this uses the legacy ChatCompletion interface of the pre-1.0
    # openai package. The logit_bias entries pin two token ids to -100,
    # effectively banning those tokens from the completion.
    for chunk in openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
            stream=True,
            logit_bias={35309: -100, 36661: -100}):
        text_chunk = chunk["choices"][0]["delta"].get("content")
        if text_chunk:
            yield text_chunk


history = []
MAX_WINDOW_WIDTH = 1600
MAX_WIDTH_ASSISTANT = 1200
MAX_WIDTH_USER = 1500
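
# The two QThread helpers below keep blocking audio and speech-to-text work
# off the GUI thread: AudioPlayer plays a notification wav file to completion,
# and TextRetrievalThread waits on the recorder for transcribed speech and
# hands it to the window through a Qt signal, so all UI updates happen on the
# main thread.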
class AudioPlayer(QThread):
    """Plays a wav file on a background thread."""

    def __init__(self, file_path):
        super(AudioPlayer, self).__init__()
        self.file_path = file_path

    def run(self):
        wav = wavio.read(self.file_path)
        # Convert 16-bit integer samples to float32 in [-1.0, 1.0].
        sound = wav.data.astype(np.float32) / np.iinfo(np.int16).max
        sd.play(sound, wav.rate)
        sd.wait()


class TextRetrievalThread(QThread):
    """Polls the recorder for transcribed text and emits it as a Qt signal."""

    textRetrieved = pyqtSignal(str)

    def __init__(self, recorder):
        super().__init__()
        self.recorder = recorder
        self.active = False

    def run(self):
        while True:
            if self.active:
                text = self.recorder.text()
                self.recorder.wake_word_activation_delay = return_to_wakewords_after_silence
                self.textRetrieved.emit(text)
                self.active = False
            time.sleep(0.1)

    def activate(self):
        self.active = True
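
# TransparentWindow is a frameless, always-on-top, translucent overlay pinned
# to the top-right corner of the primary screen. It displays a pair of status
# symbols (wake word, recording, transcribing, thinking, speaking), the last
# recognized user utterance, and the assistant's streamed reply; both text
# blocks fade out ten seconds after their last update.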
class TransparentWindow(QWidget):
    updateUI = pyqtSignal()
    clearAssistantTextSignal = pyqtSignal()
    clearUserTextSignal = pyqtSignal()

    def __init__(self):
        super().__init__()

        self.setGeometry(1, 1, 1, 1)
        self.setWindowTitle("Transparent Window")
        self.setAttribute(Qt.WA_TranslucentBackground)
        self.setWindowFlags(Qt.FramelessWindowHint | Qt.WindowStaysOnTopHint)

        self.big_symbol_font = QFont('Arial', 32)
        self.small_symbol_font = QFont('Arial', 17)
        self.user_font = QFont('Arial', user_font_size)
        self.assistant_font = QFont('Arial', assistant_font_size)
        self.assistant_font.setItalic(True)

        self.big_symbol_text = ""
        self.small_symbol_text = ""
        self.user_text = ""
        self.assistant_text = ""
        self.displayed_user_text = ""
        self.displayed_assistant_text = ""
        self.stream = None
        self.text_retrieval_thread = None

        self.user_text_timer = QTimer(self)
        self.assistant_text_timer = QTimer(self)
        self.user_text_timer.timeout.connect(self.clear_user_text)
        self.assistant_text_timer.timeout.connect(self.clear_assistant_text)

        self.clearUserTextSignal.connect(self.init_clear_user_text)
        self.clearAssistantTextSignal.connect(self.init_clear_assistant_text)
        self.user_text_opacity = 255
        self.assistant_text_opacity = 255
        self.updateUI.connect(self.update_self)
        self.audio_player = None
        self.run_fade_user = False
        self.run_fade_assistant = False

        self.menu = QMenu()
        self.menu.setStyleSheet("""
            QMenu {
                background-color: black;
                color: white;
                border-radius: 10px;
            }
            QMenu::item:selected {
                background-color: #555555;
            }
        """)

        self.elevenlabs_action = QAction("Elevenlabs", self)
        self.azure_action = QAction("Azure", self)
        self.system_action = QAction("System", self)
        self.menu.addAction(self.elevenlabs_action)
        self.menu.addAction(self.azure_action)
        self.menu.addAction(self.system_action)

        self.elevenlabs_action.triggered.connect(lambda: self.select_engine("elevenlabs"))
        self.azure_action.triggered.connect(lambda: self.select_engine("azure"))
        self.system_action.triggered.connect(lambda: self.select_engine("system"))

    def mousePressEvent(self, event: QMouseEvent):
        # A left click in the top-right 100x100 px corner opens the engine menu.
        if event.button() == Qt.LeftButton:
            if event.pos().x() >= self.width() - 100 and event.pos().y() <= 100:
                self.menu.exec_(self.mapToGlobal(event.pos()))

    def init(self):
        self.select_engine("azure")
        self.recorder = AudioToTextRecorder(
            model=recorder_model,
            language=language,
            wake_words="Jarvis",
            spinner=True,
            silero_sensitivity=0.2,
            webrtc_sensitivity=3,
            on_recording_start=self.on_recording_start,
            on_vad_detect_start=self.on_vad_detect_start,
            on_wakeword_detection_start=self.on_wakeword_detection_start,
            on_transcription_start=self.on_transcription_start,
        )
        if not start_with_wakeword:
            self.recorder.wake_word_activation_delay = return_to_wakewords_after_silence

        self.text_retrieval_thread = TextRetrievalThread(self.recorder)
        self.text_retrieval_thread.textRetrieved.connect(self.process_user_text)
        self.text_retrieval_thread.start()
        self.text_retrieval_thread.activate()

        keyboard.on_press_key('esc', self.on_escape)

    def select_engine(self, engine_name):
        if self.stream:
            if self.stream.is_playing():
                self.stream.stop()
            self.stream = None

        engine = None

        if engine_name == "azure":
            engine = AzureEngine(
                os.environ.get("AZURE_SPEECH_KEY"),
                azure_speech_region,
                voice,
                rate=34,
                pitch=10,
            )
        elif engine_name == "elevenlabs":
            engine = ElevenlabsEngine(
                os.environ.get("ELEVENLABS_API_KEY")
            )
        else:
            engine = SystemEngine(
                voice="Stefan",
                print_installed_voices=True
            )

        self.stream = TextToAudioStream(
            engine,
            on_character=self.on_character,
            on_text_stream_stop=self.on_text_stream_stop,
            on_text_stream_start=self.on_text_stream_start,
            on_audio_stream_stop=self.on_audio_stream_stop,
            log_characters=True
        )

    def on_escape(self, e):
        if self.stream and self.stream.is_playing():
            self.stream.stop()

    def showEvent(self, event: QEvent):
        super().showEvent(event)
        if event.type() == QEvent.Show:
            self.set_symbols("βŒ›", "πŸš€")
            QTimer.singleShot(1000, self.init)

    def on_character(self, char):
        if self.stream:
            self.assistant_text += char
            self.updateUI.emit()

    def on_text_stream_stop(self):
        print("\"", end="", flush=True)
        if self.stream:
            assistant_response = self.stream.text()
            self.assistant_text = assistant_response
            history.append({'role': 'assistant', 'content': assistant_response})

    def on_audio_stream_stop(self):
        if self.stream:
            self.clearAssistantTextSignal.emit()
            self.text_retrieval_thread.activate()

    def generate_answer(self):
        self.run_fade_assistant = False
        if self.assistant_text_timer.isActive():
            self.assistant_text_timer.stop()

        history.append({'role': 'user', 'content': self.user_text})
        self.remove_assistant_text()
        # Feed the response generator into the TTS stream; play_async starts
        # speaking as soon as complete sentences arrive.
        assistant_response = generate_response([system_prompt_message] + history[-max_history_messages:])
        self.stream.feed(assistant_response)
        self.stream.play_async(minimum_sentence_length=7, buffer_threshold_seconds=3)

    def set_symbols(self, big_symbol, small_symbol):
        self.big_symbol_text = big_symbol
        self.small_symbol_text = small_symbol
        self.updateUI.emit()

    def on_text_stream_start(self):
        self.set_symbols("βŒ›", "πŸ‘„")

    def process_user_text(self, user_text):
        user_text = user_text.strip()
        if user_text:
            self.run_fade_user = False
            if self.user_text_timer.isActive():
                self.user_text_timer.stop()

            self.user_text_opacity = 255
            self.user_text = user_text
            self.clearUserTextSignal.emit()
            print(f"Me: \"{user_text}\"\nAI: \"", end="", flush=True)
            self.set_symbols("βŒ›", "🧠")
            QTimer.singleShot(100, self.generate_answer)

    def on_transcription_start(self):
        self.set_symbols("βŒ›", "πŸ“")

    def on_recording_start(self):
        self.set_symbols("πŸŽ™οΈ", "πŸ”΄")

    def on_vad_detect_start(self):
        if self.small_symbol_text == "πŸ’€" or self.small_symbol_text == "πŸš€":
            self.audio_player = AudioPlayer("active.wav")
            self.audio_player.start()
        self.set_symbols("πŸŽ™οΈ", "βšͺ")

    def on_wakeword_detection_start(self):
        self.audio_player = AudioPlayer("inactive.wav")
        self.audio_player.start()
        self.set_symbols("", "πŸ’€")

    def init_clear_user_text(self):
        if self.user_text_timer.isActive():
            self.user_text_timer.stop()
        self.user_text_timer.start(10000)

    def remove_user_text(self):
        self.user_text = ""
        self.user_text_opacity = 255
        self.updateUI.emit()

    def fade_out_user_text(self):
        if not self.run_fade_user:
            return
        if self.user_text_opacity > 0:
            self.user_text_opacity -= 5
            self.updateUI.emit()
            QTimer.singleShot(50, self.fade_out_user_text)
        else:
            self.run_fade_user = False
            self.remove_user_text()

    def clear_user_text(self):
        self.user_text_timer.stop()
        if not self.user_text:
            return
        self.user_text_opacity = 255
        self.run_fade_user = True
        self.fade_out_user_text()

    def init_clear_assistant_text(self):
        if self.assistant_text_timer.isActive():
            self.assistant_text_timer.stop()
        self.assistant_text_timer.start(10000)

    def remove_assistant_text(self):
        self.assistant_text = ""
        self.assistant_text_opacity = 255
        self.updateUI.emit()

    def fade_out_assistant_text(self):
        if not self.run_fade_assistant:
            return
        if self.assistant_text_opacity > 0:
            self.assistant_text_opacity -= 5
            self.updateUI.emit()
            QTimer.singleShot(50, self.fade_out_assistant_text)
        else:
            self.run_fade_assistant = False
            self.remove_assistant_text()

    def clear_assistant_text(self):
        self.assistant_text_timer.stop()
        if not self.assistant_text:
            return
        self.assistant_text_opacity = 255
        self.run_fade_assistant = True
        self.fade_out_assistant_text()

    # def keyPressEvent(self, event):
    #     if event.key() == Qt.Key_Escape:
    #         self.stream.stop()
    #     super().keyPressEvent(event)
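
    # Layout and painting: update_self() word-wraps both text blocks, derives
    # the window size from the measured text heights, and re-anchors the
    # window at the top-right of the primary screen; paintEvent() then draws
    # the symbols and both text blocks, faking a text outline by repeating
    # each string with small offsets in the outline color.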
    def update_self(self):
        self.blockSignals(True)

        self.displayed_user_text, self.user_width = self.return_text_adjusted_to_width(self.user_text, self.user_font, MAX_WIDTH_USER)
        self.displayed_assistant_text, self.assistant_width = self.return_text_adjusted_to_width(self.assistant_text, self.assistant_font, MAX_WIDTH_ASSISTANT)

        fm_symbol = QFontMetrics(self.big_symbol_font)
        self.symbol_width = fm_symbol.width(self.big_symbol_text) + 3
        self.symbol_height = fm_symbol.height() + 8

        self.total_width = MAX_WINDOW_WIDTH

        fm_user = QFontMetrics(self.user_font)
        user_text_lines = (self.displayed_user_text.count("\n") + 1)
        self.user_height = fm_user.height() * user_text_lines + 7

        fm_assistant = QFontMetrics(self.assistant_font)
        assistant_text_lines = (self.displayed_assistant_text.count("\n") + 1)
        self.assistant_height = fm_assistant.height() * assistant_text_lines + 18

        self.total_height = sum([self.symbol_height, self.user_height, self.assistant_height])

        desktop = QDesktopWidget()
        screen_rect = desktop.availableGeometry(desktop.primaryScreen())
        self.setGeometry(screen_rect.right() - self.total_width - 50, 0, self.total_width + 50, self.total_height + 50)

        self.blockSignals(False)
        self.update()

    def drawTextWithOutline(self, painter, x, y, width, height, alignment, text, textColor, outlineColor, outline_size):
        # Draw the text eight times, offset by outline_size in every direction,
        # in the outline color, then once more on top in the text color.
        painter.setPen(outlineColor)
        for dx, dy in [(-outline_size, 0), (outline_size, 0), (0, -outline_size), (0, outline_size),
                       (-outline_size, -outline_size), (outline_size, -outline_size),
                       (-outline_size, outline_size), (outline_size, outline_size)]:
            painter.drawText(x + dx, y + dy, width, height, alignment, text)

        painter.setPen(textColor)
        painter.drawText(x, y, width, height, alignment, text)

    def paintEvent(self, event):
        painter = QPainter(self)

        offsetX = 4
        offsetY = 5

        painter.setPen(QColor(255, 255, 255))

        # Draw symbols
        painter.setFont(self.big_symbol_font)
        if self.big_symbol_text:
            painter.drawText(self.total_width - self.symbol_width + 5 + offsetX, offsetY, self.symbol_width, self.symbol_height, Qt.AlignRight | Qt.AlignTop, self.big_symbol_text)
            painter.setFont(self.small_symbol_font)
            painter.drawText(self.total_width - self.symbol_width + 17 + offsetX, offsetY + 10, self.symbol_width, self.symbol_height, Qt.AlignRight | Qt.AlignBottom, self.small_symbol_text)
        else:
            painter.setFont(self.small_symbol_font)
            painter.drawText(self.total_width - 43 + offsetX, offsetY + 2, 50, 50, Qt.AlignRight | Qt.AlignBottom, self.small_symbol_text)

        # Draw user text
        painter.setFont(self.user_font)
        user_x = self.total_width - self.user_width - 45 + offsetX
        user_y = offsetY + 15
        user_color_with_opacity = QColor(user_color.red(), user_color.green(), user_color.blue(), self.user_text_opacity)
        outline_color_with_opacity = QColor(0, 0, 0, self.user_text_opacity)
        self.drawTextWithOutline(painter, user_x, user_y, self.user_width, self.user_height, Qt.AlignRight | Qt.AlignTop, self.displayed_user_text, user_color_with_opacity, outline_color_with_opacity, 2)

        # Draw assistant text
        painter.setFont(self.assistant_font)
        assistant_x = self.total_width - self.assistant_width - 5 + offsetX
        assistant_y = self.user_height + offsetY + 15
        assistant_color_with_opacity = QColor(assistant_color.red(), assistant_color.green(), assistant_color.blue(), self.assistant_text_opacity)
        outline_color_with_opacity = QColor(0, 0, 0, self.assistant_text_opacity)
        self.drawTextWithOutline(painter, assistant_x, assistant_y, self.assistant_width, self.assistant_height, Qt.AlignRight | Qt.AlignTop, self.displayed_assistant_text, assistant_color_with_opacity, outline_color_with_opacity, 2)
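
    # return_text_adjusted_to_width implements a simple greedy word wrap:
    # words are accumulated into the current line until the next word would
    # push the rendered width past the limit, at which point a line feed is
    # inserted. It returns the wrapped text together with the widest rendered
    # line, which update_self uses to right-align the block.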
""" fm = QFontMetrics(font) words = text.split(' ') adjusted_text = '' current_line = '' max_width_used = 0 for word in words: current_width = fm.width(current_line + word) if current_width <= max_width_allowed: current_line += word + ' ' else: line_width = fm.width(current_line) if line_width > max_width_used: max_width_used = line_width adjusted_text += current_line + '\n' current_line = word + ' ' line_width = fm.width(current_line) if line_width > max_width_used: max_width_used = line_width adjusted_text += current_line return adjusted_text.rstrip(), max_width_used if __name__ == '__main__': app = QApplication(sys.argv) window = TransparentWindow() window.show() sys.exit(app.exec_())