Merge pull request #786 from msrashed2018/salah/bootcamp-week-2

Bootcamp | Salah: Week 2 Technical Assessment
Ed Donner
2025-10-22 08:12:51 -04:00
committed by GitHub
22 changed files with 1048 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
OPENAI_API_KEY=sk-or-v1-openai-api-key
GEMINI_API_KEY=AI-gemini-api-key
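
One naming quirk worth flagging: despite the OPENAI_API_KEY name, the sk-or-v1 prefix marks an OpenRouter key (the clients below point base_url at openrouter.ai). A minimal sketch of how both values are consumed, using python-dotenv from the requirements that follow:

import os
from dotenv import load_dotenv

load_dotenv()  # pulls this .env into the process environment
openrouter_key = os.getenv("OPENAI_API_KEY")  # OpenRouter key, despite the name
gemini_key = os.getenv("GEMINI_API_KEY")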

View File

@@ -0,0 +1,4 @@
openai>=1.3.0
gradio>=4.0.0
python-dotenv>=1.0.0
google-genai>=0.3.0

View File

@@ -0,0 +1,2 @@
OPENAI_API_KEY=sk-or-v1-your-openrouter-api-key-here
GEMINI_API_KEY=your-gemini-api-key-here

View File

@@ -0,0 +1,213 @@
import gradio as gr

from simple_assistant import Assistant


class SimpleUI:
    def __init__(self):
        print("\n" + "="*60)
        print("Starting up...")
        print("="*60)
        self.assistant = Assistant()
        self.history = []          # Text history for API
        self.display_history = []  # Display history with audio for chat UI
        self.audio_enabled = True
        print("UI initialized")
        print("Audio features: Gemini STT + TTS")
        print("="*60 + "\n")
    def add_message(self, msg):
        print("\n" + ">"*60)
        print(f"[UI] New message: {msg[:50]}...")

        if not msg.strip():
            print("[UI] Empty message, ignoring")
            print(">"*60 + "\n")
            return self.display_history, ""

        print(f"[UI] Getting AI response (current history: {len(self.history)} messages)")
        # Get the response *before* appending msg to self.history:
        # Assistant.chat() appends the user message itself, so pre-appending
        # would send it to the model twice.
        response = self.assistant.chat(msg, self.history)

        # Record the exchange in the text-only API history
        self.history.append({"role": "user", "content": msg})
        self.history.append({"role": "assistant", "content": response})
        # Record it in the display history
        self.display_history.append({"role": "user", "content": msg})
        self.display_history.append({"role": "assistant", "content": response})

        print(f"[UI] Total history: {len(self.history)} messages")
        print(f"[UI] Returning {len(self.display_history)} messages to display")
        print(">"*60 + "\n")
        return self.display_history, ""
    def handle_voice_input(self, audio_file):
        print("\n" + ">"*60)
        print("[UI] Voice input received")
        print(f"[UI] Audio file: {audio_file}")

        if not audio_file:
            print("[UI] No audio file")
            print(">"*60 + "\n")
            return self.display_history, None

        # Transcribe
        print("[UI] Transcribing with Gemini...")
        text = self.assistant.speech_to_text(audio_file)
        if not text:
            print("[UI] Transcription failed")
            print(">"*60 + "\n")
            error_msg = "Sorry, couldn't transcribe audio"
            self.history.append({"role": "assistant", "content": error_msg})
            self.display_history.append({"role": "assistant", "content": error_msg})
            return self.display_history, None
        print(f"[UI] Transcribed: {text}")

        # Add the voice message to the display history with its audio file
        self.display_history.append({
            "role": "user",
            "content": {
                "path": audio_file,
                "alt_text": f"🎤 {text}"
            }
        })

        # Get the response first; Assistant.chat() appends the user message
        # itself, so appending to self.history beforehand would send it twice.
        print("[UI] Getting AI response...")
        response = self.assistant.chat(text, self.history)

        # Record the exchange in the text-only API history
        self.history.append({"role": "user", "content": text})
        self.history.append({"role": "assistant", "content": response})

        # Generate audio response
        print("[UI] Generating audio with Gemini TTS...")
        audio_response = self.assistant.text_to_speech(response)
        if audio_response:
            print("[UI] ✓ Audio response generated")
            # Add the response with audio to the display history
            self.display_history.append({
                "role": "assistant",
                "content": {
                    "path": audio_response,
                    "alt_text": f"🔊 {response[:100]}..."
                }
            })
        else:
            print("[UI] ⚠ No audio, text only")
            self.display_history.append({"role": "assistant", "content": response})

        print(f"[UI] Returning {len(self.display_history)} messages")
        print(">"*60 + "\n")
        return self.display_history, None
    def analyze(self, code, lang):
        print("\n" + ">"*60)
        print("[UI] Code analysis request")
        print(f"[UI] Language: {lang}")
        print(f"[UI] Code length: {len(code)} chars")

        if not code.strip():
            print("[UI] Empty code, ignoring")
            print(">"*60 + "\n")
            return self.display_history

        print("[UI] Calling analyze_code...")
        result = self.assistant.analyze_code(code, lang)

        print("[UI] Adding to history")
        # Add to API history
        self.history.append({"role": "user", "content": f"Analyze {lang} code"})
        self.history.append({"role": "assistant", "content": result})
        # Add to display history
        self.display_history.append({"role": "user", "content": f"Analyze {lang} code"})
        self.display_history.append({"role": "assistant", "content": result})

        print(f"[UI] Returning {len(self.display_history)} messages")
        print(">"*60 + "\n")
        return self.display_history
    def create_ui(self):
        print("\n" + "="*60)
        print("Creating Gradio UI...")
        print("="*60)

        with gr.Blocks() as app:
            gr.Markdown("# Tech Assistant")
            gr.Markdown("**Voice-enabled**: Type or record audio messages")

            # Chat panel - shows all messages including audio
            chat = gr.Chatbot(type="messages", height=500)
            print("✓ Chatbot created")

            # Input area at bottom (like ChatGPT)
            with gr.Row():
                msg = gr.Textbox(
                    label="Message",
                    placeholder="Type a message or record audio...",
                    scale=9,
                    container=False
                )
                mic = gr.Audio(
                    sources=["microphone"],
                    type="filepath",
                    label="🎤 Record",
                    scale=1,
                    waveform_options={"show_controls": False}
                )
            print("✓ Message and record inputs created")

            # Wire events
            msg.submit(self.add_message, msg, [chat, msg])
            print("✓ Message submit event wired")
            mic.stop_recording(self.handle_voice_input, mic, [chat, mic])
            print("✓ Voice input event wired")

            # Tools section
            with gr.Accordion("Tools", open=False):
                gr.Markdown("### Code Analysis")
                code = gr.Textbox(label="Code", lines=8)
                lang = gr.Dropdown(
                    choices=["python", "javascript", "java"],
                    value="python",
                    label="Language"
                )
                analyze_btn = gr.Button("Analyze")
                print("✓ Code analysis tools created")

                analyze_btn.click(self.analyze, [code, lang], chat)
                print("✓ Analyze button event wired")

        print("✓ UI creation complete")
        print("="*60 + "\n")
        return app
    def launch(self):
        print("\n" + "="*60)
        print("Launching Gradio app...")
        print("="*60)
        app = self.create_ui()
        print("Starting server on port 7862...")
        print("="*60 + "\n")
        app.launch(server_port=7862)


if __name__ == "__main__":
    print("\n" + "#"*60)
    print("# TECH ASSISTANT - SIMPLE UI")
    print("#"*60 + "\n")
    ui = SimpleUI()
    ui.launch()
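
One detail worth flagging for readers new to Gradio: with gr.Chatbot(type="messages"), an entry's content can be either a plain string or a file dict, which is why display_history above mixes both shapes. Schematically (the path here is a placeholder):

display_history = [
    {"role": "user", "content": "plain text renders as a chat bubble"},
    {"role": "assistant",
     "content": {"path": "/tmp/reply.wav",             # placeholder path
                 "alt_text": "🔊 shows an inline audio player"}},
]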

View File

@@ -0,0 +1,259 @@
import os
import tempfile
import wave

from dotenv import load_dotenv
from google import genai
from google.genai import types
from openai import OpenAI

load_dotenv()


class Assistant:
    def __init__(self):
        print("\n" + "="*60)
        print("Initializing Assistant...")
        print("="*60)

        openrouter_key = os.getenv('OPENAI_API_KEY')
        gemini_key = os.getenv('GEMINI_API_KEY')
        print(f"OpenRouter API Key: {openrouter_key[:20]}..." if openrouter_key else "OpenRouter API Key: NOT FOUND")
        print(f"Gemini API Key: {gemini_key[:20]}..." if gemini_key else "Gemini API Key: NOT FOUND")

        # OpenRouter client for text (GPT-4o-mini)
        print("Setting up OpenRouter client...")
        self.openrouter = OpenAI(
            api_key=openrouter_key,
            base_url="https://openrouter.ai/api/v1"
        )
        print("OpenRouter client ready")

        # Gemini client for audio and images
        print("Setting up Gemini client...")
        self.gemini_client = genai.Client(api_key=gemini_key)
        print("Gemini client ready (audio + images)")

        self.text_model = "openai/gpt-4o-mini"
        self.system_prompt = "You are a helpful technical assistant. Keep answers clear and practical."
        self.stt_model = "gemini-2.0-flash-exp"
        self.tts_model = "gemini-2.5-flash-preview-tts"
        print(f"Text Model: {self.text_model}")
        print(f"STT Model: {self.stt_model}")
        print(f"TTS Model: {self.tts_model}")
    def chat(self, message, history=None):
        # Avoid a mutable default argument; a shared [] would outlive the call
        history = history or []
        print(f"[Chat] User: {message[:50]}...")
        print(f"[Chat] History messages: {len(history)}")
        print(f"[Chat] Model: {self.text_model}")

        messages = [{"role": "system", "content": self.system_prompt}]
        messages.extend(history)
        messages.append({"role": "user", "content": message})
        print(f"[Chat] Total messages to send: {len(messages)}")

        print("[Chat] Calling OpenRouter API...")
        try:
            response = self.openrouter.chat.completions.create(
                model=self.text_model,
                messages=messages,
                extra_body={
                    "usage": {
                        "include": True
                    }
                }
            )
            reply = response.choices[0].message.content
            print("[Chat] Response received")
            print(f"[Chat] GPT-4o-mini: {len(reply)} chars")
            print(f"[Chat] Preview: {reply[:100]}...")

            # Print usage and cost
            if hasattr(response, 'usage') and response.usage:
                usage = response.usage
                print("[Chat] Usage:")
                print(f" - Prompt tokens: {usage.prompt_tokens}")
                print(f" - Completion tokens: {usage.completion_tokens}")
                print(f" - Total tokens: {usage.total_tokens}")
                if hasattr(usage, 'cost') and usage.cost:
                    print(f" - Cost: ${usage.cost:.6f}")

            print("-"*60 + "\n")
            return reply
        except Exception as e:
            print(f"[Error] ✗ API call failed: {e}")
            print("-"*60 + "\n")
            return f"Error: {str(e)}"
    def analyze_code(self, code, language="python"):
        print("\n" + "="*60)
        print(f"[Code] Analyzing {language} code...")
        print(f"[Code] Code length: {len(code)} characters")
        print(f"[Code] Lines: {len(code.splitlines())}")
        print("="*60)
        prompt = f"Analyze this {language} code for bugs and improvements:\n\n```{language}\n{code}\n```"
        result = self.chat(prompt)
        print("[Code] Analysis complete\n")
        return result
    def generate_image(self, description):
        # NOTE: despite the name, this sends a plain text prompt to a
        # text-capable model and returns response.text, so the result is a
        # textual description rather than image bytes.
        print("\n" + "="*60)
        print(f"[Image] Gemini generating: {description[:50]}...")
        print("[Image] Model: gemini-2.0-flash-exp")
        try:
            prompt = f"Generate an image of: {description}. Make it clear and professional."
            print("[Image] Calling Gemini API...")
            response = self.gemini_client.models.generate_content(
                model='gemini-2.0-flash-exp',
                contents=prompt
            )
            print("[Image] Response received")
            print(f"[Image] Result length: {len(response.text)} chars")

            # Print usage and cost (Gemini 2.0 Flash: $0.30/1M input, $2.50/1M output)
            if hasattr(response, 'usage_metadata'):
                usage = response.usage_metadata
                input_tokens = usage.prompt_token_count
                output_tokens = usage.candidates_token_count
                total_tokens = usage.total_token_count
                cost = (input_tokens * 0.30 + output_tokens * 2.50) / 1_000_000
                print("[Image] Usage:")
                print(f" - Input tokens: {input_tokens}")
                print(f" - Output tokens: {output_tokens}")
                print(f" - Total tokens: {total_tokens}")
                print(f" - Cost: ${cost:.6f}")

            print("="*60 + "\n")
            return response.text
        except Exception as e:
            print(f"[Error] ✗ Image generation failed: {e}")
            print("="*60 + "\n")
            return None
    def speech_to_text(self, audio_file_path):
        print("\n" + "="*60)
        print("[STT] Gemini speech-to-text...")
        print(f"[STT] Audio file: {audio_file_path}")
        try:
            print("[STT] Uploading audio file to Gemini...")
            audio_file = self.gemini_client.files.upload(file=audio_file_path)
            print(f"[STT] File uploaded: {audio_file.name}")

            print("[STT] Transcribing with Gemini...")
            prompt = "Generate a transcript of the speech."
            response = self.gemini_client.models.generate_content(
                model=self.stt_model,
                contents=[prompt, audio_file]
            )
            text = response.text.strip()
            print(f"[STT] Transcribed: {text[:100]}...")
            print(f"[STT] Length: {len(text)} chars")

            # Print usage and cost (Flash native audio input: $3.00/1M tokens)
            if hasattr(response, 'usage_metadata'):
                usage = response.usage_metadata
                input_tokens = usage.prompt_token_count
                output_tokens = usage.candidates_token_count
                total_tokens = usage.total_token_count
                # Audio input is $3.00/1M, text output is $2.50/1M
                cost = (input_tokens * 3.00 + output_tokens * 2.50) / 1_000_000
                print("[STT] Usage:")
                print(f" - Input tokens (audio): {input_tokens}")
                print(f" - Output tokens (text): {output_tokens}")
                print(f" - Total tokens: {total_tokens}")
                print(f" - Cost: ${cost:.6f}")

            print("="*60 + "\n")
            return text
        except Exception as e:
            print(f"[Error] ✗ STT failed: {e}")
            print(f"[Error] Full error: {type(e).__name__}: {str(e)}")
            print("="*60 + "\n")
            return None
    def text_to_speech(self, text):
        print("\n" + "="*60)
        print("[TTS] Gemini text-to-speech...")
        print(f"[TTS] Text: {text[:50]}...")
        print(f"[TTS] Length: {len(text)} chars")
        try:
            # Limit text length for TTS
            text_to_speak = text[:500]

            print("[TTS] Generating audio with Gemini TTS model...")
            response = self.gemini_client.models.generate_content(
                model=self.tts_model,
                contents=f"Say cheerfully: {text_to_speak}",
                config=types.GenerateContentConfig(
                    response_modalities=["AUDIO"],
                    speech_config=types.SpeechConfig(
                        voice_config=types.VoiceConfig(
                            prebuilt_voice_config=types.PrebuiltVoiceConfig(
                                voice_name='Kore',
                            )
                        )
                    ),
                )
            )
            print("[TTS] Audio generated, converting to WAV...")

            # Extract raw PCM audio data
            pcm_data = response.candidates[0].content.parts[0].inline_data.data
            print(f"[TTS] Raw PCM size: {len(pcm_data)} bytes")

            # Print usage and cost (2.5 Flash Preview TTS: $10.00/1M audio output tokens)
            if hasattr(response, 'usage_metadata'):
                usage = response.usage_metadata
                input_tokens = usage.prompt_token_count
                output_tokens = usage.candidates_token_count
                total_tokens = usage.total_token_count
                # Text input is $0.30/1M, audio output is $10.00/1M
                cost = (input_tokens * 0.30 + output_tokens * 10.00) / 1_000_000
                print("[TTS] Usage:")
                print(f" - Input tokens (text): {input_tokens}")
                print(f" - Output tokens (audio): {output_tokens}")
                print(f" - Total tokens: {total_tokens}")
                print(f" - Cost: ${cost:.6f}")

            # Create WAV file with proper headers
            # Gemini TTS outputs: 24kHz sample rate, mono, 16-bit PCM
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
            with wave.open(temp_file.name, 'wb') as wav_file:
                wav_file.setnchannels(1)      # Mono
                wav_file.setsampwidth(2)      # 16-bit = 2 bytes
                wav_file.setframerate(24000)  # 24kHz
                wav_file.writeframes(pcm_data)
            temp_file.close()

            print(f"[TTS] WAV file saved: {temp_file.name}")
            print("="*60 + "\n")
            return temp_file.name
        except Exception as e:
            print(f"[Error] ✗ TTS failed: {e}")
            print(f"[Error] Full error: {type(e).__name__}: {str(e)}")
            print("="*60 + "\n")
            return None
if __name__ == "__main__":
    assistant = Assistant()
    # Test it
    response = assistant.chat("What is Python?")
    print(f"\nResponse: {response}")

View File

@@ -0,0 +1,20 @@
# API Keys - Required
OPENAI_API_KEY=sk-or-v1-your-openrouter-api-key-here
GEMINI_API_KEY=your-gemini-api-key-here

# Models - Optional (defaults provided)
TEXT_MODEL=openai/gpt-4o-mini
STT_MODEL=gemini-2.0-flash-exp
TTS_MODEL=gemini-2.5-flash-preview-tts
VOICE_NAME=Kore

# App Settings - Optional
PORT=7862
SYSTEM_PROMPT=You are a helpful assistant. Keep it simple and practical.

# Alternative Models You Can Try:
# TEXT_MODEL=anthropic/claude-3.5-sonnet
# TEXT_MODEL=google/gemini-pro-1.5
# TEXT_MODEL=meta-llama/llama-3.1-8b-instruct
# VOICE_NAME=Aoede
# VOICE_NAME=Fenrir

View File

@@ -0,0 +1,4 @@
openai>=1.3.0
gradio>=4.0.0
python-dotenv>=1.0.0
google-genai>=0.3.0

View File

@@ -0,0 +1,13 @@
#!/usr/bin/env python3
import os
import sys

# Add src to the Python path so the flat imports inside it resolve
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))

# Now import and run
from main import main

if __name__ == "__main__":
    main()

View File

@@ -0,0 +1 @@
# Marks this directory as a proper Python package (one of several __init__.py files)

View File

@@ -0,0 +1,25 @@
import os

from dotenv import load_dotenv

load_dotenv()


class Config:
    def __init__(self):
        self.openrouter_key = os.getenv('OPENAI_API_KEY')
        self.gemini_key = os.getenv('GEMINI_API_KEY')

        # Models - all configurable via env
        self.text_model = os.getenv('TEXT_MODEL', "openai/gpt-4o-mini")
        self.stt_model = os.getenv('STT_MODEL', "gemini-2.0-flash-exp")
        self.tts_model = os.getenv('TTS_MODEL', "gemini-2.5-flash-preview-tts")
        self.voice_name = os.getenv('VOICE_NAME', 'Kore')

        # App settings
        self.port = int(os.getenv('PORT', '7862'))
        self.system_prompt = os.getenv('SYSTEM_PROMPT', "You are a helpful assistant. Keep it simple and practical.")

    def validate(self):
        if not self.openrouter_key:
            raise Exception("Missing OPENAI_API_KEY")
        if not self.gemini_key:
            raise Exception("Missing GEMINI_API_KEY")
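
Every non-key setting falls back to a default, so overrides are purely additive; a quick sketch of the precedence (the override value here is just illustrative):

import os
os.environ["TEXT_MODEL"] = "anthropic/claude-3.5-sonnet"  # illustrative override
cfg = Config()
print(cfg.text_model)   # -> anthropic/claude-3.5-sonnet (environment wins)
print(cfg.voice_name)   # -> Kore (nothing set, default applies)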

View File

@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod


class AIClient(ABC):
    @abstractmethod
    def chat(self, messages):
        pass

    @abstractmethod
    def analyze_code(self, code, language):
        pass

    @abstractmethod
    def generate_linkedin_post(self, topic, tone="professional"):
        pass


class AudioService(ABC):
    @abstractmethod
    def speech_to_text(self, audio_file):
        pass

    @abstractmethod
    def text_to_speech(self, text):
        pass
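
These ABCs are what make the services swappable in main.py below: anything satisfying AIClient can stand in for the real client. A minimal hypothetical stub (not part of this PR) that would let the UI run without burning tokens:

class EchoClient(AIClient):
    """Hypothetical test double - echoes inputs instead of calling an API."""

    def chat(self, messages):
        return f"(echo) {messages[-1]['content']}"

    def analyze_code(self, code, language):
        return f"(echo) {language} analysis of {len(code)} chars"

    def generate_linkedin_post(self, topic, tone="professional"):
        return f"(echo) {tone} post about {topic}"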

View File

@@ -0,0 +1,32 @@
from config.settings import Config
from services.openrouter_client import OpenRouterClient
from services.gemini_audio_service import GeminiAudioService
from services.conversation_manager import ConversationManager
from ui.gradio_interface import AssistantUI


def main():
    print("Starting AI Assistant...")

    # Load config
    config = Config()
    config.validate()

    # Set up services
    ai_client = OpenRouterClient(config.openrouter_key, config.text_model)
    audio_service = GeminiAudioService(
        config.gemini_key,
        config.stt_model,
        config.tts_model,
        config.voice_name
    )
    conversation = ConversationManager(config.system_prompt)

    # Create UI
    ui = AssistantUI(ai_client, audio_service, conversation)
    app = ui.create_interface()

    print(f"Launching on port {config.port}...")
    app.launch(server_port=config.port)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,6 @@
from dataclasses import dataclass


@dataclass
class Message:
    role: str
    content: str
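
The dataclass keeps the message shape explicit and gives a readable repr for free; for illustration:

m = Message("user", "hello")
print(m.role, m.content)  # -> user hello
print(m)                  # -> Message(role='user', content='hello')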

View File

@@ -0,0 +1,35 @@
from models.message import Message


class ConversationManager:
    def __init__(self, system_prompt):
        self.system_prompt = system_prompt
        self.messages = []

    def add_user_message(self, content):
        print(f"[Conversation] Adding user message: {content[:100]}...")
        print(f"[Conversation] Message length: {len(content)} chars")
        self.messages.append(Message("user", content))
        print(f"[Conversation] Total messages: {len(self.messages)}")

    def add_assistant_message(self, content):
        print(f"[Conversation] Adding assistant message: {content[:100]}...")
        print(f"[Conversation] Message length: {len(content)} chars")
        self.messages.append(Message("assistant", content))
        print(f"[Conversation] Total messages: {len(self.messages)}")

    def get_api_messages(self):
        # Convert to the format expected by the APIs
        api_messages = [{"role": "system", "content": self.system_prompt}]
        for msg in self.messages:
            api_messages.append({"role": msg.role, "content": msg.content})

        # Calculate total context size
        total_chars = sum(len(msg["content"]) for msg in api_messages)
        estimated_tokens = total_chars // 4  # Rough estimate (~4 chars/token)
        print("[Conversation] API messages prepared:")
        print(f" - Total messages: {len(api_messages)} (including system)")
        print(f" - Total characters: {total_chars}")
        print(f" - Estimated tokens: {estimated_tokens}")
        return api_messages

View File

@@ -0,0 +1,124 @@
import os
import tempfile
import wave

from google import genai
from google.genai import types

from interfaces.ai_client import AudioService


class GeminiAudioService(AudioService):
    def __init__(self, api_key, stt_model, tts_model, voice_name):
        self.client = genai.Client(api_key=api_key)
        self.stt_model = stt_model
        self.tts_model = tts_model
        self.voice_name = voice_name

    def speech_to_text(self, audio_file):
        print(f"[Gemini STT] Processing audio file: {audio_file}")
        print(f"[Gemini STT] Model: {self.stt_model}")
        try:
            # Get file size for logging
            file_size = os.path.getsize(audio_file)
            print(f"[Gemini STT] Audio file size: {file_size} bytes")

            print("[Gemini STT] Uploading to Gemini...")
            uploaded_file = self.client.files.upload(file=audio_file)
            print(f"[Gemini STT] File uploaded: {uploaded_file.name}")

            print("[Gemini STT] Transcribing...")
            response = self.client.models.generate_content(
                model=self.stt_model,
                contents=["Transcribe the speech in this audio file. Return only the spoken words, nothing else.", uploaded_file]
            )
            text = response.text.strip()
            print(f"[Gemini STT] Transcription length: {len(text)} chars")
            print(f"[Gemini STT] Transcription: {text[:100]}...")

            # Print usage information if available
            if hasattr(response, 'usage_metadata'):
                usage = response.usage_metadata
                input_tokens = usage.prompt_token_count
                output_tokens = usage.candidates_token_count
                total_tokens = usage.total_token_count
                # Audio input cost: $3.00/1M tokens, text output: $2.50/1M tokens
                cost = (input_tokens * 3.00 + output_tokens * 2.50) / 1_000_000
                print("[Gemini STT] Token usage:")
                print(f" - Input tokens (audio): {input_tokens}")
                print(f" - Output tokens (text): {output_tokens}")
                print(f" - Total tokens: {total_tokens}")
                print(f" - Estimated cost: ${cost:.6f}")

            print("[Gemini STT] Success")
            return text
        except Exception as e:
            print(f"[Gemini STT] Error: {e}")
            return None

    def text_to_speech(self, text):
        print("[Gemini TTS] Converting text to speech")
        print(f"[Gemini TTS] Model: {self.tts_model}, Voice: {self.voice_name}")
        print(f"[Gemini TTS] Input text length: {len(text)} chars")
        try:
            # Keep it short for TTS
            text_to_speak = text[:500]
            if len(text) > 500:
                print("[Gemini TTS] Text truncated to 500 chars")
            print(f"[Gemini TTS] Text preview: {text_to_speak[:100]}...")

            print("[Gemini TTS] Generating audio...")
            response = self.client.models.generate_content(
                model=self.tts_model,
                contents=f"Say: {text_to_speak}",
                config=types.GenerateContentConfig(
                    response_modalities=["AUDIO"],
                    speech_config=types.SpeechConfig(
                        voice_config=types.VoiceConfig(
                            prebuilt_voice_config=types.PrebuiltVoiceConfig(
                                voice_name=self.voice_name,
                            )
                        )
                    ),
                )
            )
            pcm_data = response.candidates[0].content.parts[0].inline_data.data
            print(f"[Gemini TTS] Raw PCM data size: {len(pcm_data)} bytes")

            # Print usage information if available
            if hasattr(response, 'usage_metadata'):
                usage = response.usage_metadata
                input_tokens = usage.prompt_token_count
                output_tokens = usage.candidates_token_count
                total_tokens = usage.total_token_count
                # Text input: $0.30/1M tokens, audio output: $10.00/1M tokens
                cost = (input_tokens * 0.30 + output_tokens * 10.00) / 1_000_000
                print("[Gemini TTS] Token usage:")
                print(f" - Input tokens (text): {input_tokens}")
                print(f" - Output tokens (audio): {output_tokens}")
                print(f" - Total tokens: {total_tokens}")
                print(f" - Estimated cost: ${cost:.6f}")

            # Wrap the raw 24kHz mono 16-bit PCM in a WAV container
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
            with wave.open(temp_file.name, 'wb') as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)
                wav_file.setframerate(24000)
                wav_file.writeframes(pcm_data)
            temp_file.close()

            print(f"[Gemini TTS] WAV file created: {temp_file.name}")
            print("[Gemini TTS] Success")
            return temp_file.name
        except Exception as e:
            print(f"[Gemini TTS] Error: {e}")
            return None
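
For reference, the service also works standalone; a sketch assuming a short recording exists at the placeholder path (the key and file are placeholders, the model names are the defaults from settings.py):

svc = GeminiAudioService(
    api_key="your-gemini-api-key",             # placeholder
    stt_model="gemini-2.0-flash-exp",
    tts_model="gemini-2.5-flash-preview-tts",
    voice_name="Kore",
)
text = svc.speech_to_text("sample.wav")        # placeholder recording
wav_path = svc.text_to_speech(text or "Hello from the assistant")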

View File

@@ -0,0 +1,91 @@
from openai import OpenAI

from interfaces.ai_client import AIClient


class OpenRouterClient(AIClient):
    def __init__(self, api_key, model):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://openrouter.ai/api/v1"
        )
        self.model = model

    def chat(self, messages):
        print(f"[OpenRouter] Calling {self.model}")
        print(f"[OpenRouter] Messages count: {len(messages)}")

        # Rough input-token estimate (~4 chars/token)
        total_chars = sum(len(msg.get('content', '')) for msg in messages)
        estimated_tokens = total_chars // 4
        print(f"[OpenRouter] Estimated input tokens: {estimated_tokens}")

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                extra_body={
                    "usage": {
                        "include": True
                    }
                }
            )
            content = response.choices[0].message.content
            print(f"[OpenRouter] Response length: {len(content)} chars")
            print(f"[OpenRouter] Response preview: {content[:100]}...")

            # Print usage information if available
            if hasattr(response, 'usage') and response.usage:
                usage = response.usage
                print("[OpenRouter] Token usage:")
                print(f" - Prompt tokens: {usage.prompt_tokens}")
                print(f" - Completion tokens: {usage.completion_tokens}")
                print(f" - Total tokens: {usage.total_tokens}")
                # Try to get cost information
                if hasattr(usage, 'cost') and usage.cost:
                    print(f" - Cost: ${usage.cost:.6f}")
                else:
                    # Rough cost estimate for GPT-4o-mini ($0.15/1M input, $0.60/1M output)
                    estimated_cost = (usage.prompt_tokens * 0.15 + usage.completion_tokens * 0.60) / 1_000_000
                    print(f" - Estimated cost: ${estimated_cost:.6f}")

            print("[OpenRouter] Success")
            return content
        except Exception as e:
            print(f"[OpenRouter] Error: {str(e)}")
            return f"Error: {str(e)}"

    def analyze_code(self, code, language):
        print(f"[OpenRouter] Code analysis request - Language: {language}")
        print(f"[OpenRouter] Code length: {len(code)} chars, {len(code.splitlines())} lines")
        prompt = f"Analyze this {language} code for bugs and improvements:\n\n```{language}\n{code}\n```"
        messages = [{"role": "user", "content": prompt}]
        return self.chat(messages)

    def generate_linkedin_post(self, topic, tone="professional"):
        print(f"[OpenRouter] LinkedIn post request - Topic: {topic[:50]}...")
        print(f"[OpenRouter] Tone: {tone}")
        tone_styles = {
            "professional": "formal, informative, and industry-focused",
            "casual": "friendly, approachable, and conversational",
            "inspirational": "motivating, uplifting, and thought-provoking",
            "educational": "informative, teaching-focused, and valuable"
        }
        style = tone_styles.get(tone, "professional and engaging")
        prompt = f"""Create a LinkedIn post about: {topic}

Make it {style}. Include:
- Hook that grabs attention
- 2-3 key insights or takeaways
- Call to action or question for engagement
- Relevant hashtags (3-5)

Keep it under 300 words and format for LinkedIn readability."""
        messages = [{"role": "user", "content": prompt}]
        return self.chat(messages)
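
The client is equally usable outside the Gradio layer; a minimal sketch (the key value is a placeholder):

client = OpenRouterClient(api_key="sk-or-v1-placeholder", model="openai/gpt-4o-mini")
print(client.chat([{"role": "user", "content": "ping"}]))
print(client.analyze_code("def f(x): return x / 0", "python"))
print(client.generate_linkedin_post("shipping a week-2 bootcamp project", tone="casual"))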

View File

@@ -0,0 +1,194 @@
import gradio as gr


class AssistantUI:
    def __init__(self, ai_client, audio_service, conversation_manager):
        self.ai_client = ai_client
        self.audio_service = audio_service
        self.conversation = conversation_manager
        self.display_history = []

    def handle_text_message(self, message):
        if not message.strip():
            return self.display_history, ""

        # Add user message
        self.conversation.add_user_message(message)
        self.display_history.append({"role": "user", "content": message})

        # Get AI response
        api_messages = self.conversation.get_api_messages()
        response = self.ai_client.chat(api_messages)

        # Check if the response is an error
        if response.startswith("Error:"):
            print(f"AI Client Error: {response}")
            # Show the error in chat but don't add it to the conversation history
            self.display_history.append({"role": "assistant", "content": response})
            return self.display_history, ""

        # Add the successful response to the conversation
        self.conversation.add_assistant_message(response)
        self.display_history.append({"role": "assistant", "content": response})
        return self.display_history, ""

    def handle_voice_message(self, audio_file):
        if not audio_file:
            return self.display_history, None

        # Transcribe audio
        text = self.audio_service.speech_to_text(audio_file)
        if not text:
            return self.display_history, None

        # Add the transcribed message to the display
        self.display_history.append({
            "role": "user",
            "content": {"path": audio_file, "alt_text": f"Voice: {text}"}
        })

        # Process as a text message
        self.conversation.add_user_message(text)
        api_messages = self.conversation.get_api_messages()
        response = self.ai_client.chat(api_messages)

        # Check if the response is an error
        if response.startswith("Error:"):
            print(f"AI Client Error: {response}")
            # Show the error in chat but don't convert it to speech
            self.display_history.append({"role": "assistant", "content": response})
            return self.display_history, None

        self.conversation.add_assistant_message(response)

        # Generate an audio response only for successful responses
        audio_response = self.audio_service.text_to_speech(response)
        if audio_response:
            self.display_history.append({
                "role": "assistant",
                "content": {"path": audio_response, "alt_text": response[:100] + "..."}
            })
        else:
            self.display_history.append({"role": "assistant", "content": response})
        return self.display_history, None

    def analyze_code(self, code, language):
        if not code.strip():
            return self.display_history

        result = self.ai_client.analyze_code(code, language)

        # Check for errors
        if result.startswith("Error:"):
            print(f"Code Analysis Error: {result}")
            self.display_history.append({"role": "user", "content": f"Code analysis ({language})"})
            self.display_history.append({"role": "assistant", "content": result})
            return self.display_history

        # Add to the conversation only if successful
        self.conversation.add_user_message(f"Analyze {language} code")
        self.conversation.add_assistant_message(result)

        # Add to the display
        self.display_history.append({"role": "user", "content": f"Code analysis ({language})"})
        self.display_history.append({"role": "assistant", "content": result})
        return self.display_history

    def generate_linkedin_post(self, topic, tone):
        if not topic.strip():
            return self.display_history

        result = self.ai_client.generate_linkedin_post(topic, tone)

        # Check for errors
        if result.startswith("Error:"):
            print(f"LinkedIn Post Generation Error: {result}")
            self.display_history.append({"role": "user", "content": f"LinkedIn post ({tone}): {topic}"})
            self.display_history.append({"role": "assistant", "content": result})
            return self.display_history

        # Add to the conversation only if successful
        self.conversation.add_user_message(f"Generate LinkedIn post about: {topic}")
        self.conversation.add_assistant_message(result)

        # Add to the display
        self.display_history.append({"role": "user", "content": f"LinkedIn post ({tone}): {topic}"})
        self.display_history.append({"role": "assistant", "content": result})
        return self.display_history

    def create_interface(self):
        with gr.Blocks() as app:
            gr.Markdown("# AI Assistant")
            gr.Markdown("Chat with text or voice")

            # Main chat
            chat = gr.Chatbot(type="messages", height=500)

            # Input area
            with gr.Row():
                msg = gr.Textbox(
                    label="Message",
                    placeholder="Type or record...",
                    scale=9,
                    container=False
                )
                mic = gr.Audio(
                    sources=["microphone"],
                    type="filepath",
                    label="Record",
                    scale=1
                )

            # Wire up events
            msg.submit(self.handle_text_message, msg, [chat, msg])
            mic.stop_recording(self.handle_voice_message, mic, [chat, mic])

            # Code analysis tool
            with gr.Accordion("Code Analysis", open=False):
                code_input = gr.Textbox(label="Code", lines=8)
                lang_select = gr.Dropdown(
                    choices=["python", "javascript", "java"],
                    value="python",
                    label="Language"
                )
                analyze_btn = gr.Button("Analyze")
                analyze_btn.click(
                    self.analyze_code,
                    [code_input, lang_select],
                    chat
                )

            # LinkedIn post generator
            with gr.Accordion("LinkedIn Post Generator", open=False):
                topic_input = gr.Textbox(
                    label="Topic",
                    placeholder="What do you want to post about?",
                    lines=2
                )
                tone_select = gr.Dropdown(
                    choices=["professional", "casual", "inspirational", "educational"],
                    value="professional",
                    label="Tone"
                )
                generate_btn = gr.Button("Generate Post")
                generate_btn.click(
                    self.generate_linkedin_post,
                    [topic_input, tone_select],
                    chat
                )

        return app