Week 2: Technical Assistant - Salah (Bootcamp)
This commit is contained in:
2
week2/community-contributions/salah/.env.example
Normal file
2
week2/community-contributions/salah/.env.example
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
OPENAI_API_KEY=sk-or-v1-openai-api-key
|
||||||
|
GEMINI_API_KEY=AI-gemini-api-key
|
||||||
213
week2/community-contributions/salah/app.py
Normal file
213
week2/community-contributions/salah/app.py
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
import gradio as gr
|
||||||
|
from simple_assistant import Assistant
|
||||||
|
|
||||||
|
class SimpleUI:
    """Gradio chat front end for the technical assistant.

    Two parallel transcripts are kept:

    * ``history`` - text-only turns that are sent to the model.
    * ``display_history`` - what the Chatbot widget renders; voice turns are
      stored as ``{"path": ..., "alt_text": ...}`` dicts instead of plain text.
    """

    def __init__(self):
        """Create the backing ``Assistant`` and start with empty transcripts."""
        print("\n" + "="*60)
        print("Starting up...")
        print("="*60)
        self.assistant = Assistant()
        self.history = []  # Text history for API
        self.display_history = []  # Display history with audio for chat UI
        self.audio_enabled = True
        print("UI initialized")
        print("Audio features: Gemini STT + TTS")
        print("="*60 + "\n")

    def _ask(self, text):
        """Send one user turn to the model and record both turns in ``history``.

        ``Assistant.chat`` appends ``text`` to the message list itself, so only
        the turns that came *before* it are passed in. Appending the user turn
        first (as the code used to) made every request carry the user message
        twice.

        Args:
            text: The user's message as plain text.

        Returns:
            Whatever ``self.assistant.chat`` returned for this turn.
        """
        response = self.assistant.chat(text, list(self.history))
        self.history.append({"role": "user", "content": text})
        self.history.append({"role": "assistant", "content": response})
        return response

    def add_message(self, msg):
        """Handle a typed message from the textbox.

        Args:
            msg: Text submitted by the user; ``None`` or blank input is ignored.

        Returns:
            A ``(display_history, "")`` tuple: the updated chat transcript and
            an empty string used to clear the textbox.
        """
        print("\n" + ">"*60)
        print(f"[UI] New message: {(msg or '')[:50]}...")

        # Guard against None as well as whitespace-only input
        if not msg or not msg.strip():
            print("[UI] Empty message, ignoring")
            print(">"*60 + "\n")
            return self.display_history, ""

        print(f"[UI] Adding to history (current: {len(self.history)} messages)")
        # Add to display history
        self.display_history.append({"role": "user", "content": msg})

        print("[UI] Getting AI response...")
        response = self._ask(msg)

        print("[UI] Adding response to history")
        # Add to display history
        self.display_history.append({"role": "assistant", "content": response})
        print(f"[UI] Total history: {len(self.history)} messages")

        print(f"[UI] Returning {len(self.display_history)} messages to display")
        print(">"*60 + "\n")
        return self.display_history, ""

    def handle_voice_input(self, audio_file):
        """Handle a finished microphone recording.

        The recording is transcribed, the transcript is sent to the model, and
        the reply is converted to speech when possible.

        Args:
            audio_file: Path of the recorded audio file, or a falsy value when
                nothing was recorded.

        Returns:
            A ``(display_history, None)`` tuple; ``None`` resets the audio
            component.
        """
        print("\n" + ">"*60)
        print("[UI] Voice input received")
        print(f"[UI] Audio file: {audio_file}")

        if not audio_file:
            print("[UI] No audio file")
            print(">"*60 + "\n")
            return self.display_history, None

        # Transcribe
        print("[UI] Transcribing with Gemini...")
        text = self.assistant.speech_to_text(audio_file)

        if not text:
            print("[UI] Transcription failed")
            print(">"*60 + "\n")
            error_msg = "Sorry, couldn't transcribe audio"
            self.history.append({"role": "assistant", "content": error_msg})
            self.display_history.append({"role": "assistant", "content": error_msg})
            return self.display_history, None

        print(f"[UI] Transcribed: {text}")

        # Add voice message to display history with audio file
        self.display_history.append({
            "role": "user",
            "content": {
                "path": audio_file,
                "alt_text": f"🎤 {text}",
            },
        })

        # Get response (also records both text turns in the API history)
        print("[UI] Getting AI response...")
        response = self._ask(text)

        # Generate audio response
        print("[UI] Generating audio with Gemini TTS...")
        audio_response = self.assistant.text_to_speech(response)

        if audio_response:
            print("[UI] ✓ Audio response generated")
            # Add response with audio to display history
            self.display_history.append({
                "role": "assistant",
                "content": {
                    "path": audio_response,
                    "alt_text": f"🔊 {response[:100]}...",
                },
            })
        else:
            # Fall back to showing the reply as plain text
            print("[UI] ⚠ No audio, text only")
            self.display_history.append({"role": "assistant", "content": response})

        print(f"[UI] Returning {len(self.display_history)} messages")
        print(">"*60 + "\n")

        return self.display_history, None

    def analyze(self, code, lang):
        """Run the code-analysis tool and add the result to both transcripts.

        Args:
            code: Source code to analyze; ``None`` or blank input is ignored.
            lang: Language name chosen in the dropdown.

        Returns:
            The updated ``display_history``.
        """
        code = code or ""  # treat a missing value like an empty textbox
        print("\n" + ">"*60)
        print("[UI] Code analysis request")
        print(f"[UI] Language: {lang}")
        print(f"[UI] Code length: {len(code)} chars")

        if not code.strip():
            print("[UI] Empty code, ignoring")
            print(">"*60 + "\n")
            return self.display_history

        print("[UI] Calling analyze_code...")
        result = self.assistant.analyze_code(code, lang)

        print("[UI] Adding to history")
        # Only a short summary of the request is recorded, not the code itself
        request_summary = f"Analyze {lang} code"

        # Add to API history
        self.history.append({"role": "user", "content": request_summary})
        self.history.append({"role": "assistant", "content": result})

        # Add to display history
        self.display_history.append({"role": "user", "content": request_summary})
        self.display_history.append({"role": "assistant", "content": result})

        print(f"[UI] Returning {len(self.display_history)} messages")
        print(">"*60 + "\n")
        return self.display_history

    def create_ui(self):
        """Build the Gradio Blocks layout and wire its events.

        Returns:
            The ``gr.Blocks`` app, not yet launched.
        """
        print("\n" + "="*60)
        print("Creating Gradio UI...")
        print("="*60)

        with gr.Blocks() as app:

            gr.Markdown("# Tech Assistant")
            gr.Markdown("**Voice-enabled**: Type or record audio messages")

            # Chat panel - shows all messages including audio
            chat = gr.Chatbot(type="messages", height=500)
            print("✓ Chatbot created")

            # Input area at bottom (like ChatGPT)
            with gr.Row():
                msg = gr.Textbox(
                    label="Message",
                    placeholder="Type a message or record audio...",
                    scale=9,
                    container=False
                )
                mic = gr.Audio(
                    sources=["microphone"],
                    type="filepath",
                    label="🎤 Record",
                    scale=1,
                    waveform_options={"show_controls": False}
                )
            print("✓ Message and record inputs created")

            # Wire events
            msg.submit(self.add_message, msg, [chat, msg])
            print("✓ Message submit event wired")

            mic.stop_recording(self.handle_voice_input, mic, [chat, mic])
            print("✓ Voice input event wired")

            # Tools section
            with gr.Accordion("Tools", open=False):

                gr.Markdown("### Code Analysis")
                code = gr.Textbox(label="Code", lines=8)
                lang = gr.Dropdown(
                    choices=["python", "javascript", "java"],
                    value="python",
                    label="Language"
                )
                analyze_btn = gr.Button("Analyze")
                print("✓ Code analysis tools created")

            analyze_btn.click(self.analyze, [code, lang], chat)
            print("✓ Analyze button event wired")

        print("✓ UI creation complete")
        print("="*60 + "\n")
        return app

    def launch(self, server_port=7862):
        """Build the UI and start the Gradio server.

        Args:
            server_port: TCP port to serve on; defaults to 7862, the value that
                was previously hard-coded.
        """
        print("\n" + "="*60)
        print("Launching Gradio app...")
        print("="*60)
        app = self.create_ui()
        print(f"Starting server on port {server_port}...")
        print("="*60 + "\n")
        app.launch(server_port=server_port)
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point: print a banner, then build and serve the UI.
if __name__ == "__main__":
    banner_rule = "#"*60
    print("\n" + banner_rule)
    print("# TECH ASSISTANT - SIMPLE UI")
    print(banner_rule + "\n")

    SimpleUI().launch()
|
||||||
259
week2/community-contributions/salah/assistant.py
Normal file
259
week2/community-contributions/salah/assistant.py
Normal file
@@ -0,0 +1,259 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
from google import genai
|
||||||
|
from google.genai import types
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from openai import OpenAI
|
||||||
|
from pathlib import Path
|
||||||
|
import tempfile
|
||||||
|
import wave
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
class Assistant:
    """Technical assistant using OpenRouter for text and Gemini for audio.

    Every method logs its progress with ``print``. The network-facing methods
    catch exceptions at their boundary and return an error value (an
    ``"Error: ..."`` string from ``chat``, ``None`` from the Gemini helpers)
    instead of raising, so the UI keeps running when a call fails.
    """

    def __init__(self):
        """Read API keys from the environment and create both API clients."""
        print("\n" + "="*60)
        print("Initializing Assistant...")
        print("="*60)

        openrouter_key = os.getenv('OPENAI_API_KEY')
        gemini_key = os.getenv('GEMINI_API_KEY')

        # Report only whether each key is present; key material must not be
        # echoed into logs, not even as a prefix.
        print(f"OpenRouter API Key: {'loaded' if openrouter_key else 'NOT FOUND'}")
        print(f"Gemini API Key: {'loaded' if gemini_key else 'NOT FOUND'}")

        # OpenRouter client for text (GPT-4o-mini)
        print("Setting up OpenRouter client...")
        self.openrouter = OpenAI(
            api_key=openrouter_key,
            base_url="https://openrouter.ai/api/v1"
        )
        print("OpenRouter client ready")

        # Gemini client for audio and images
        print("Setting up Gemini client...")
        self.gemini_client = genai.Client(api_key=gemini_key)
        print("Gemini client ready (audio + images)")

        self.text_model = "openai/gpt-4o-mini"
        self.system_prompt = "You are a helpful technical assistant. Keep answers clear and practical."
        self.stt_model = "gemini-2.0-flash-exp"
        self.tts_model = "gemini-2.5-flash-preview-tts"

        print(f"Text Model: {self.text_model}")
        print(f"STT Model: {self.stt_model}")
        print(f"TTS Model: {self.tts_model}")

    @staticmethod
    def _print_gemini_usage(tag, response, input_rate, output_rate,
                            input_label="", output_label=""):
        """Print token counts and an estimated cost for a Gemini response.

        Logging is best-effort: a missing ``usage_metadata`` or missing counts
        are tolerated so that a successful API result is never discarded just
        because the usage report could not be printed.

        Args:
            tag: Log prefix such as ``"STT"``.
            response: Object returned by ``generate_content``.
            input_rate: Assumed price in USD per 1M input tokens.
            output_rate: Assumed price in USD per 1M output tokens.
            input_label: Optional suffix for the input line, e.g. ``" (audio)"``.
            output_label: Optional suffix for the output line.
        """
        usage = getattr(response, 'usage_metadata', None)
        if not usage:
            return
        input_tokens = getattr(usage, 'prompt_token_count', None) or 0
        output_tokens = getattr(usage, 'candidates_token_count', None) or 0
        total_tokens = getattr(usage, 'total_token_count', None) or 0
        cost = (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000
        print(f"[{tag}] Usage:")
        print(f" - Input tokens{input_label}: {input_tokens}")
        print(f" - Output tokens{output_label}: {output_tokens}")
        print(f" - Total tokens: {total_tokens}")
        print(f" - Cost: ${cost:.6f}")

    def chat(self, message, history=None):
        """Send ``message`` to the text model and return its reply.

        Args:
            message: The new user message. It is appended to the request after
                ``history``, so callers must not include it in ``history``.
            history: Earlier turns as ``{"role": ..., "content": ...}`` dicts.
                The list is copied, never mutated. Defaults to no history.

        Returns:
            The reply text (an empty string if the API returned no content),
            or ``"Error: <details>"`` when the API call fails.
        """
        prior_turns = list(history) if history else []

        print(f"[Chat] User: {message[:50]}...")
        print(f"[Chat] History messages: {len(prior_turns)}")
        print(f"[Chat] Model: {self.text_model}")

        messages = [
            {"role": "system", "content": self.system_prompt},
            *prior_turns,
            {"role": "user", "content": message},
        ]

        print(f"[Chat] Total messages to send: {len(messages)}")
        print("[Chat] Calling OpenRouter API...")

        try:
            response = self.openrouter.chat.completions.create(
                model=self.text_model,
                messages=messages,
                # Ask for usage accounting to be included in the response
                extra_body={
                    "usage": {
                        "include": True
                    }
                }
            )
            # content may be None; normalise so len() and slicing are safe
            reply = response.choices[0].message.content or ""
            print("[Chat] Response received")
            print(f"[Chat] GPT-4o-mini: {len(reply)} chars")
            print(f"[Chat] Preview: {reply[:100]}...")

            # Print usage and cost
            usage = getattr(response, 'usage', None)
            if usage:
                print("[Chat] Usage:")
                print(f" - Prompt tokens: {usage.prompt_tokens}")
                print(f" - Completion tokens: {usage.completion_tokens}")
                print(f" - Total tokens: {usage.total_tokens}")
                cost = getattr(usage, 'cost', None)
                if cost:
                    print(f" - Cost: ${cost:.6f}")

            print("-"*60 + "\n")
            return reply
        except Exception as e:
            # Boundary handler: surface the failure as chat text, don't crash the UI
            print(f"[Error] ✗ API call failed: {e}")
            print("-"*60 + "\n")
            return f"Error: {str(e)}"

    def analyze_code(self, code, language="python"):
        """Ask the text model to review ``code`` for bugs and improvements.

        Args:
            code: Source code to analyze.
            language: Language name used in the prompt and the code fence.

        Returns:
            The result of ``chat`` for the analysis prompt (no history is sent).
        """
        print("\n" + "="*60)
        print(f"[Code] Analyzing {language} code...")
        print(f"[Code] Code length: {len(code)} characters")
        print(f"[Code] Lines: {len(code.splitlines())}")
        print("="*60)

        prompt = f"Analyze this {language} code for bugs and improvements:\n\n```{language}\n{code}\n```"
        result = self.chat(prompt)

        print("[Code] Analysis complete\n")
        return result

    def generate_image(self, description):
        """Send an image-generation prompt to Gemini and return the text reply.

        Note that, despite the name, this returns ``response.text``; no image
        bytes are extracted from the response.

        Args:
            description: What the image should show.

        Returns:
            ``response.text`` from Gemini, or ``None`` when the call fails.
        """
        print("\n" + "="*60)
        print(f"[Image] Gemini generating: {description[:50]}...")
        print("[Image] Model: gemini-2.0-flash-exp")

        try:
            prompt = f"Generate an image of: {description}. Make it clear and professional."
            print("[Image] Calling Gemini API...")
            response = self.gemini_client.models.generate_content(
                model='gemini-2.0-flash-exp',
                contents=prompt
            )
            print("[Image] Response received")
            result_text = response.text
            print(f"[Image] Result length: {len(result_text or '')} chars")

            # Rates assumed by the author: $0.30/1M input, $2.50/1M output
            self._print_gemini_usage("Image", response, 0.30, 2.50)

            print("="*60 + "\n")
            return result_text
        except Exception as e:
            print(f"[Error] ✗ Image generation failed: {e}")
            print("="*60 + "\n")
            return None

    def speech_to_text(self, audio_file_path):
        """Transcribe an audio file with Gemini.

        Args:
            audio_file_path: Path of the audio file to upload.

        Returns:
            The stripped transcript (possibly empty), or ``None`` when the
            upload or transcription fails.
        """
        print("\n" + "="*60)
        print("[STT] Gemini speech-to-text...")
        print(f"[STT] Audio file: {audio_file_path}")

        try:
            print("[STT] Uploading audio file to Gemini...")
            audio_file = self.gemini_client.files.upload(file=audio_file_path)
            print(f"[STT] File uploaded: {audio_file.name}")

            print("[STT] Transcribing with Gemini...")
            prompt = "Generate a transcript of the speech."

            response = self.gemini_client.models.generate_content(
                model=self.stt_model,
                contents=[prompt, audio_file]
            )
            text = (response.text or "").strip()

            print(f"[STT] Transcribed: {text[:100]}...")
            print(f"[STT] Length: {len(text)} chars")

            # Rates assumed by the author: audio input $3.00/1M, text output $2.50/1M
            self._print_gemini_usage("STT", response, 3.00, 2.50,
                                     " (audio)", " (text)")

            print("="*60 + "\n")

            return text

        except Exception as e:
            print(f"[Error] ✗ STT failed: {e}")
            print(f"[Error] Full error: {type(e).__name__}: {str(e)}")
            print("="*60 + "\n")
            return None

    def text_to_speech(self, text):
        """Convert ``text`` to speech with Gemini TTS and save it as a WAV file.

        Only the first 500 characters are spoken.

        Args:
            text: Text to speak; empty or ``None`` yields no audio.

        Returns:
            Path of the generated ``.wav`` file (the caller owns the file), or
            ``None`` when there is nothing to speak or generation fails.
        """
        print("\n" + "="*60)
        print("[TTS] Gemini text-to-speech...")

        if not text:
            print("[TTS] No text to speak")
            print("="*60 + "\n")
            return None

        print(f"[TTS] Text: {text[:50]}...")
        print(f"[TTS] Length: {len(text)} chars")

        try:
            # Limit text length for TTS (slicing is a no-op on shorter text)
            text_to_speak = text[:500]

            print("[TTS] Generating audio with Gemini TTS model...")
            response = self.gemini_client.models.generate_content(
                model=self.tts_model,
                contents=f"Say cheerfully: {text_to_speak}",
                config=types.GenerateContentConfig(
                    response_modalities=["AUDIO"],
                    speech_config=types.SpeechConfig(
                        voice_config=types.VoiceConfig(
                            prebuilt_voice_config=types.PrebuiltVoiceConfig(
                                voice_name='Kore',
                            )
                        )
                    ),
                )
            )

            print("[TTS] Audio generated, converting to WAV...")

            # Extract raw PCM audio data
            pcm_data = response.candidates[0].content.parts[0].inline_data.data
            print(f"[TTS] Raw PCM size: {len(pcm_data)} bytes")

            # Rates assumed by the author: text input $0.30/1M, audio output $10.00/1M
            self._print_gemini_usage("TTS", response, 0.30, 10.00,
                                     " (text)", " (audio)")

            # Reserve a unique path and release the handle before the wave
            # module reopens the file by name.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
                wav_path = temp_file.name

            # Create WAV file with proper headers
            # Gemini TTS outputs: 24kHz sample rate, mono, 16-bit PCM
            with wave.open(wav_path, 'wb') as wav_file:
                wav_file.setnchannels(1)  # Mono
                wav_file.setsampwidth(2)  # 16-bit = 2 bytes
                wav_file.setframerate(24000)  # 24kHz
                wav_file.writeframes(pcm_data)

            print(f"[TTS] WAV file saved: {wav_path}")
            print("="*60 + "\n")
            return wav_path

        except Exception as e:
            print(f"[Error] ✗ TTS failed: {e}")
            print(f"[Error] Full error: {type(e).__name__}: {str(e)}")
            print("="*60 + "\n")
            return None
|
||||||
|
|
||||||
|
|
||||||
|
# Manual smoke test: ask one question and print the reply.
if __name__ == "__main__":
    bot = Assistant()
    answer = bot.chat("What is Python?")
    print(f"\nResponse: {answer}")
|
||||||
4
week2/community-contributions/salah/requirements.txt
Normal file
4
week2/community-contributions/salah/requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
openai>=1.3.0
|
||||||
|
gradio>=4.0.0
|
||||||
|
python-dotenv>=1.0.0
|
||||||
|
google-genai>=0.3.0
|
||||||
Reference in New Issue
Block a user