diff --git a/week2/community-contributions/salah/.env.example b/week2/community-contributions/salah/.env.example new file mode 100644 index 0000000..bbaf1a0 --- /dev/null +++ b/week2/community-contributions/salah/.env.example @@ -0,0 +1,2 @@ +OPENAI_API_KEY=sk-or-v1-openai-api-key +GEMINI_API_KEY=AI-gemini-api-key diff --git a/week2/community-contributions/salah/app.py b/week2/community-contributions/salah/app.py new file mode 100644 index 0000000..0f856d9 --- /dev/null +++ b/week2/community-contributions/salah/app.py @@ -0,0 +1,213 @@ +import gradio as gr +from simple_assistant import Assistant + +class SimpleUI: + def __init__(self): + print("\n" + "="*60) + print("Starting up...") + print("="*60) + self.assistant = Assistant() + self.history = [] # Text history for API + self.display_history = [] # Display history with audio for chat UI + self.audio_enabled = True + print("UI initialized") + print("Audio features: Gemini STT + TTS") + print("="*60 + "\n") + + def add_message(self, msg): + print("\n" + ">"*60) + print(f"[UI] New message: {msg[:50]}...") + + if not msg.strip(): + print("[UI] Empty message, ignoring") + print(">"*60 + "\n") + return self.display_history, "" + + print(f"[UI] Adding to history (current: {len(self.history)} messages)") + # Add to API history (text only) + self.history.append({"role": "user", "content": msg}) + # Add to display history + self.display_history.append({"role": "user", "content": msg}) + + print("[UI] Getting AI response...") + response = self.assistant.chat(msg, self.history) + + print(f"[UI] Adding response to history") + # Add to API history (text only) + self.history.append({"role": "assistant", "content": response}) + # Add to display history + self.display_history.append({"role": "assistant", "content": response}) + print(f"[UI] Total history: {len(self.history)} messages") + + print(f"[UI] Returning {len(self.display_history)} messages to display") + print(">"*60 + "\n") + return self.display_history, "" + + def handle_voice_input(self, audio_file): + print("\n" + ">"*60) + print("[UI] Voice input received") + print(f"[UI] Audio file: {audio_file}") + + if not audio_file: + print("[UI] No audio file") + print(">"*60 + "\n") + return self.display_history, None + + # Transcribe + print("[UI] Transcribing with Gemini...") + text = self.assistant.speech_to_text(audio_file) + + if not text: + print("[UI] Transcription failed") + print(">"*60 + "\n") + error_msg = "Sorry, couldn't transcribe audio" + self.history.append({"role": "assistant", "content": error_msg}) + self.display_history.append({"role": "assistant", "content": error_msg}) + return self.display_history, None + + print(f"[UI] Transcribed: {text}") + + # Add to API history (text only) + self.history.append({"role": "user", "content": text}) + + # Add voice message to display history with audio file + self.display_history.append({ + "role": "user", + "content": { + "path": audio_file, + "alt_text": f"🎤 {text}" + } + }) + + # Get response + print("[UI] Getting AI response...") + response = self.assistant.chat(text, self.history) + + # Add text response to API history + self.history.append({"role": "assistant", "content": response}) + + # Generate audio response + print("[UI] Generating audio with Gemini TTS...") + audio_response = self.assistant.text_to_speech(response) + + if audio_response: + print(f"[UI] ✓ Audio response generated") + # Add response with audio to display history + self.display_history.append({ + "role": "assistant", + "content": { + "path": audio_response, + "alt_text": f"🔊 {response[:100]}..." + } + }) + else: + print(f"[UI] ⚠ No audio, text only") + self.display_history.append({"role": "assistant", "content": response}) + + print(f"[UI] Returning {len(self.display_history)} messages") + print(">"*60 + "\n") + + return self.display_history, None + + def analyze(self, code, lang): + print("\n" + ">"*60) + print(f"[UI] Code analysis request") + print(f"[UI] Language: {lang}") + print(f"[UI] Code length: {len(code)} chars") + + if not code.strip(): + print("[UI] Empty code, ignoring") + print(">"*60 + "\n") + return self.display_history + + print("[UI] Calling analyze_code...") + result = self.assistant.analyze_code(code, lang) + + print("[UI] Adding to history") + # Add to API history + self.history.append({"role": "user", "content": f"Analyze {lang} code"}) + self.history.append({"role": "assistant", "content": result}) + + # Add to display history + self.display_history.append({"role": "user", "content": f"Analyze {lang} code"}) + self.display_history.append({"role": "assistant", "content": result}) + + print(f"[UI] Returning {len(self.display_history)} messages") + print(">"*60 + "\n") + return self.display_history + + def create_ui(self): + print("\n" + "="*60) + print("Creating Gradio UI...") + print("="*60) + + with gr.Blocks() as app: + + gr.Markdown("# Tech Assistant") + gr.Markdown("**Voice-enabled**: Type or record audio messages") + + # Chat panel - shows all messages including audio + chat = gr.Chatbot(type="messages", height=500) + print("✓ Chatbot created") + + # Input area at bottom (like ChatGPT) + with gr.Row(): + msg = gr.Textbox( + label="Message", + placeholder="Type a message or record audio...", + scale=9, + container=False + ) + mic = gr.Audio( + sources=["microphone"], + type="filepath", + label="🎤 Record", + scale=1, + waveform_options={"show_controls": False} + ) + print("✓ Message and record inputs created") + + # Wire events + msg.submit(self.add_message, msg, [chat, msg]) + print("✓ Message submit event wired") + + mic.stop_recording(self.handle_voice_input, mic, [chat, mic]) + print("✓ Voice input event wired") + + # Tools section + with gr.Accordion("Tools", open=False): + + gr.Markdown("### Code Analysis") + code = gr.Textbox(label="Code", lines=8) + lang = gr.Dropdown( + choices=["python", "javascript", "java"], + value="python", + label="Language" + ) + analyze_btn = gr.Button("Analyze") + print("✓ Code analysis tools created") + + analyze_btn.click(self.analyze, [code, lang], chat) + print("✓ Analyze button event wired") + + print("✓ UI creation complete") + print("="*60 + "\n") + return app + + def launch(self): + print("\n" + "="*60) + print("Launching Gradio app...") + print("="*60) + app = self.create_ui() + print("Starting server on port 7862...") + print("="*60 + "\n") + app.launch(server_port=7862) + + +if __name__ == "__main__": + print("\n" + "#"*60) + print("# TECH ASSISTANT - SIMPLE UI") + print("#"*60 + "\n") + + ui = SimpleUI() + ui.launch() diff --git a/week2/community-contributions/salah/assistant.py b/week2/community-contributions/salah/assistant.py new file mode 100644 index 0000000..4862fac --- /dev/null +++ b/week2/community-contributions/salah/assistant.py @@ -0,0 +1,259 @@ +import os +import json +from google import genai +from google.genai import types +from dotenv import load_dotenv +from openai import OpenAI +from pathlib import Path +import tempfile +import wave + +load_dotenv() + +class Assistant: + def __init__(self): + print("\n" + "="*60) + print("Initializing Assistant...") + print("="*60) + + openrouter_key = os.getenv('OPENAI_API_KEY') + gemini_key = os.getenv('GEMINI_API_KEY') + + print(f"OpenRouter API Key: {openrouter_key[:20]}..." if openrouter_key else "OpenRouter API Key: NOT FOUND") + print(f"Gemini API Key: {gemini_key[:20]}..." if gemini_key else "Gemini API Key: NOT FOUND") + + # OpenRouter client for text (GPT-4o-mini) + print("Setting up OpenRouter client...") + self.openrouter = OpenAI( + api_key=openrouter_key, + base_url="https://openrouter.ai/api/v1" + ) + print("OpenRouter client ready") + + # Gemini client for audio and images + print("Setting up Gemini client...") + self.gemini_client = genai.Client(api_key=gemini_key) + print("Gemini client ready (audio + images)") + + self.text_model = "openai/gpt-4o-mini" + self.system_prompt = "You are a helpful technical assistant. Keep answers clear and practical." + self.stt_model = "gemini-2.0-flash-exp" + self.tts_model = "gemini-2.5-flash-preview-tts" + + print(f"Text Model: {self.text_model}") + print(f"STT Model: {self.stt_model}") + print(f"TTS Model: {self.tts_model}") + + def chat(self, message, history=[]): + print(f"[Chat] User: {message[:50]}...") + print(f"[Chat] History messages: {len(history)}") + print(f"[Chat] Model: {self.text_model}") + + messages = [{"role": "system", "content": self.system_prompt}] + messages.extend(history) + messages.append({"role": "user", "content": message}) + + print(f"[Chat] Total messages to send: {len(messages)}") + print("[Chat] Calling OpenRouter API...") + + try: + response = self.openrouter.chat.completions.create( + model=self.text_model, + messages=messages, + extra_body={ + "usage": { + "include": True + } + } + ) + reply = response.choices[0].message.content + print(f"[Chat] Response received") + print(f"[Chat] GPT-4o-mini: {len(reply)} chars") + print(f"[Chat] Preview: {reply[:100]}...") + + # Print usage and cost + if hasattr(response, 'usage') and response.usage: + usage = response.usage + print(f"[Chat] Usage:") + print(f" - Prompt tokens: {usage.prompt_tokens}") + print(f" - Completion tokens: {usage.completion_tokens}") + print(f" - Total tokens: {usage.total_tokens}") + if hasattr(usage, 'cost') and usage.cost: + print(f" - Cost: ${usage.cost:.6f}") + + print("-"*60 + "\n") + return reply + except Exception as e: + print(f"[Error] ✗ API call failed: {e}") + print("-"*60 + "\n") + return f"Error: {str(e)}" + + def analyze_code(self, code, language="python"): + print("\n" + "="*60) + print(f"[Code] Analyzing {language} code...") + print(f"[Code] Code length: {len(code)} characters") + print(f"[Code] Lines: {len(code.splitlines())}") + print("="*60) + + prompt = f"Analyze this {language} code for bugs and improvements:\n\n```{language}\n{code}\n```" + result = self.chat(prompt) + + print("[Code] Analysis complete\n") + return result + + def generate_image(self, description): + print("\n" + "="*60) + print(f"[Image] Gemini generating: {description[:50]}...") + print(f"[Image] Model: gemini-2.0-flash-exp") + + try: + prompt = f"Generate an image of: {description}. Make it clear and professional." + print("[Image] Calling Gemini API...") + response = self.gemini_client.models.generate_content( + model='gemini-2.0-flash-exp', + contents=prompt + ) + print("[Image] Response received") + print(f"[Image] Result length: {len(response.text)} chars") + + # Print usage and cost (Gemini 2.0 Flash: $0.30/1M input, $2.50/1M output) + if hasattr(response, 'usage_metadata'): + usage = response.usage_metadata + input_tokens = usage.prompt_token_count + output_tokens = usage.candidates_token_count + total_tokens = usage.total_token_count + cost = (input_tokens * 0.30 + output_tokens * 2.50) / 1_000_000 + print(f"[Image] Usage:") + print(f" - Input tokens: {input_tokens}") + print(f" - Output tokens: {output_tokens}") + print(f" - Total tokens: {total_tokens}") + print(f" - Cost: ${cost:.6f}") + + print("="*60 + "\n") + return response.text + except Exception as e: + print(f"[Error] ✗ Image generation failed: {e}") + print("="*60 + "\n") + return None + + def speech_to_text(self, audio_file_path): + print("\n" + "="*60) + print("[STT] Gemini speech-to-text...") + print(f"[STT] Audio file: {audio_file_path}") + + try: + print("[STT] Uploading audio file to Gemini...") + audio_file = self.gemini_client.files.upload(file=audio_file_path) + print(f"[STT] File uploaded: {audio_file.name}") + + print("[STT] Transcribing with Gemini...") + prompt = "Generate a transcript of the speech." + + response = self.gemini_client.models.generate_content( + model=self.stt_model, + contents=[prompt, audio_file] + ) + text = response.text.strip() + + print(f"[STT] Transcribed: {text[:100]}...") + print(f"[STT] Length: {len(text)} chars") + + # Print usage and cost (Flash Native Audio Input: $3.00/1M tokens) + if hasattr(response, 'usage_metadata'): + usage = response.usage_metadata + input_tokens = usage.prompt_token_count + output_tokens = usage.candidates_token_count + total_tokens = usage.total_token_count + # Audio input is $3.00/1M, text output is $2.50/1M + cost = (input_tokens * 3.00 + output_tokens * 2.50) / 1_000_000 + print(f"[STT] Usage:") + print(f" - Input tokens (audio): {input_tokens}") + print(f" - Output tokens (text): {output_tokens}") + print(f" - Total tokens: {total_tokens}") + print(f" - Cost: ${cost:.6f}") + + print("="*60 + "\n") + + return text + + except Exception as e: + print(f"[Error] ✗ STT failed: {e}") + print(f"[Error] Full error: {type(e).__name__}: {str(e)}") + print("="*60 + "\n") + return None + + def text_to_speech(self, text): + print("\n" + "="*60) + print(f"[TTS] Gemini text-to-speech...") + print(f"[TTS] Text: {text[:50]}...") + print(f"[TTS] Length: {len(text)} chars") + + try: + # Limit text length for TTS + text_to_speak = text[:500] if len(text) > 500 else text + + print("[TTS] Generating audio with Gemini TTS model...") + response = self.gemini_client.models.generate_content( + model=self.tts_model, + contents=f"Say cheerfully: {text_to_speak}", + config=types.GenerateContentConfig( + response_modalities=["AUDIO"], + speech_config=types.SpeechConfig( + voice_config=types.VoiceConfig( + prebuilt_voice_config=types.PrebuiltVoiceConfig( + voice_name='Kore', + ) + ) + ), + ) + ) + + print("[TTS] Audio generated, converting to WAV...") + + # Extract raw PCM audio data + pcm_data = response.candidates[0].content.parts[0].inline_data.data + print(f"[TTS] Raw PCM size: {len(pcm_data)} bytes") + + # Print usage and cost (2.5 Flash Preview TTS: $10.00/1M audio output tokens) + if hasattr(response, 'usage_metadata'): + usage = response.usage_metadata + input_tokens = usage.prompt_token_count + output_tokens = usage.candidates_token_count + total_tokens = usage.total_token_count + # Text input is $0.30/1M, audio output is $10.00/1M + cost = (input_tokens * 0.30 + output_tokens * 10.00) / 1_000_000 + print(f"[TTS] Usage:") + print(f" - Input tokens (text): {input_tokens}") + print(f" - Output tokens (audio): {output_tokens}") + print(f" - Total tokens: {total_tokens}") + print(f" - Cost: ${cost:.6f}") + + # Create WAV file with proper headers + # Gemini TTS outputs: 24kHz sample rate, mono, 16-bit PCM + temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") + + with wave.open(temp_file.name, 'wb') as wav_file: + wav_file.setnchannels(1) # Mono + wav_file.setsampwidth(2) # 16-bit = 2 bytes + wav_file.setframerate(24000) # 24kHz + wav_file.writeframes(pcm_data) + + temp_file.close() + + print(f"[TTS] WAV file saved: {temp_file.name}") + print("="*60 + "\n") + return temp_file.name + + except Exception as e: + print(f"[Error] ✗ TTS failed: {e}") + print(f"[Error] Full error: {type(e).__name__}: {str(e)}") + print("="*60 + "\n") + return None + + +if __name__ == "__main__": + assistant = Assistant() + + # Test it + response = assistant.chat("What is Python?") + print(f"\nResponse: {response}") diff --git a/week2/community-contributions/salah/requirements.txt b/week2/community-contributions/salah/requirements.txt new file mode 100644 index 0000000..6557225 --- /dev/null +++ b/week2/community-contributions/salah/requirements.txt @@ -0,0 +1,4 @@ +openai>=1.3.0 +gradio>=4.0.0 +python-dotenv>=1.0.0 +google-genai>=0.3.0