diff --git a/week4/community-contributions/philip/week4_EXERCISE.ipynb b/week4/community-contributions/philip/week4_EXERCISE.ipynb new file mode 100644 index 0000000..039e2e2 --- /dev/null +++ b/week4/community-contributions/philip/week4_EXERCISE.ipynb @@ -0,0 +1,691 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import json\n", + "import re\n", + "from typing import List, Dict, Optional\n", + "from dotenv import load_dotenv\n", + "from openai import OpenAI\n", + "import gradio as gr\n", + "from IPython.display import Markdown, display\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Groq API Key not set (optional)\n", + "OpenRouter API Key loaded (begins with sk-or-)\n" + ] + } + ], + "source": [ + "load_dotenv(override=True)\n", + "\n", + "# Ollama connection\n", + "ollama_url = \"http://localhost:11434/v1\"\n", + "ollama_client = OpenAI(api_key=\"ollama\", base_url=ollama_url)\n", + "\n", + "# Groq connection\n", + "groq_api_key = os.getenv('GROQ_API_KEY')\n", + "groq_url = \"https://api.groq.com/openai/v1\"\n", + "groq_client = None\n", + "if groq_api_key:\n", + " groq_client = OpenAI(api_key=groq_api_key, base_url=groq_url)\n", + " print(f\"Groq API Key loaded (begins with {groq_api_key[:4]})\")\n", + "else:\n", + " print(\"Groq API Key not set (optional)\")\n", + "\n", + "# OpenRouter connection\n", + "openrouter_api_key = os.getenv('OPENROUTER_API_KEY')\n", + "openrouter_url = \"https://openrouter.ai/api/v1\"\n", + "openrouter_client = None\n", + "if openrouter_api_key:\n", + " openrouter_client = OpenAI(api_key=openrouter_api_key, base_url=openrouter_url)\n", + " print(f\"OpenRouter API Key loaded (begins with {openrouter_api_key[:6]})\")\n", + "else:\n", + " print(\"OpenRouter API Key not set (optional)\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Configured 3 models\n", + "OpenRouter models available (perfect for limited storage demos!)\n", + "Ollama models available (unlimited local usage!)\n" + ] + } + ], + "source": [ + "# Open-source code models configuration\n", + "MODELS = {}\n", + "\n", + "if groq_client:\n", + " MODELS.update({\n", + " \"gpt-oss-20b-groq\": {\n", + " \"name\": \"GPT-OSS-20B (Groq)\",\n", + " \"client\": groq_client,\n", + " \"model\": \"openai/gpt-oss-20b\", # Groq's ID - the Ollama-style tag \"gpt-oss:20b\" would not resolve here\n", + " \"description\": \"Cloud - fast Groq inference\"\n", + " },\n", + " \"gpt-oss-120b-groq\": {\n", + " \"name\": \"GPT-OSS-120B (Groq)\",\n", + " \"client\": groq_client,\n", + " \"model\": \"openai/gpt-oss-120b\",\n", + " \"description\": \"Cloud - Larger GPT-OSS\"\n", + " },\n", + " \"qwen2.5-coder-32b-groq\": {\n", + " \"name\": \"Qwen2.5-Coder 32B (Groq)\",\n", + " \"client\": groq_client,\n", + " \"model\": \"qwen-2.5-coder-32b\", # assumed Groq ID - verify against Groq's current model list\n", + " \"description\": \"Cloud - strong open coder model\"\n", + " },\n", + " })\n", + "\n", + "# OpenRouter models\n", + "if openrouter_client:\n", + " MODELS.update({\n", + " \"qwen-2.5-coder-32b-openrouter\": {\n", + " \"name\": \"Qwen2.5-Coder 32B (OpenRouter)\",\n", + " \"client\": openrouter_client,\n", + " \"model\": \"qwen/qwen-2.5-coder-32b-instruct\",\n", + " \"description\": \"Cloud - Perfect for demos, 50 req/day free\"\n", + " },\n", + " \"gpt-oss-20b-openrouter\": { # key must be unique - reusing \"gpt-oss-20b-groq\" would silently overwrite the Groq entry\n", + " \"name\": \"GPT-OSS-20B (OpenRouter)\",\n", + " \"client\": openrouter_client,\n", + " \"model\": \"openai/gpt-oss-20b\",\n", + " \"description\": \"Cloud - OpenAI's
open model, excellent for code!\"\n", + " },\n", + " })\n", + "\n", + "# Ollama local model (assumes it was pulled beforehand, e.g. `ollama pull qwen2.5-coder:7b`)\n", + "MODELS.update({\n", + " \"qwen2.5-coder:7b\": {\n", + " \"name\": \"Qwen2.5-Coder 7B (Ollama)\",\n", + " \"client\": ollama_client,\n", + " \"model\": \"qwen2.5-coder:7b\",\n", + " \"description\": \"Local - unlimited usage\"\n", + " },\n", + "})\n", + "\n", + "print(f\"Configured {len(MODELS)} models\")\n", + "if openrouter_client:\n", + " print(\"OpenRouter models available (perfect for limited storage demos!)\")\n", + "if groq_client:\n", + " print(\"Groq models available (fast cloud inference!)\")\n", + "if \"qwen2.5-coder:7b\" in MODELS:\n", + " print(\"Ollama models available (unlimited local usage!)\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [], + "source": [ + "BUG_DETECTION_SYSTEM_PROMPT = \"\"\"You are an expert code reviewer specializing in finding bugs, security vulnerabilities, and logic errors.\n", + "\n", + "Your task is to analyze Python code and identify issues. Return ONLY a valid JSON array in this exact format:\n", + "[{\n", + " \"severity\": \"critical|high|medium|low\",\n", + " \"line\": number,\n", + " \"issue\": \"brief description of the problem\",\n", + " \"suggestion\": \"specific fix recommendation\"\n", + "}]\n", + "\n", + "Be thorough but concise. Focus on real bugs and security issues.\"\"\"\n", + "\n", + "IMPROVEMENTS_SYSTEM_PROMPT = \"\"\"You are a senior software engineer specializing in code quality and best practices.\n", + "\n", + "Analyze the Python code and suggest improvements for:\n", + "- Code readability and maintainability\n", + "- Performance optimizations\n", + "- Pythonic idioms and conventions\n", + "- Better error handling\n", + "\n", + "Return ONLY a JSON array:\n", + "[{\n", + " \"category\": \"readability|performance|style|error_handling\",\n", + " \"line\": number,\n", + " \"current\": \"current code snippet\",\n", + " \"improved\": \"improved code snippet\",\n", + " \"explanation\": \"why this is better\"\n", + "}]\n", + "\n", + "Only suggest meaningful improvements.\"\"\"\n", + "\n", + "TEST_GENERATION_SYSTEM_PROMPT = \"\"\"You are an expert in writing comprehensive unit tests.\n", + "\n", + "Generate pytest unit tests for the given Python code. Include:\n", + "- Test cases for normal operation\n", + "- Edge cases and boundary conditions\n", + "- Error handling tests\n", + "- Tests for any bugs that were identified\n", + "\n", + "Return ONLY Python code with pytest tests.
Include the original code at the top if needed.\n", + "Do not include explanations or markdown formatting.\"\"\"\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "def extract_json_from_response(text: str) -> List[Dict]:\n", + " \"\"\"Extract JSON array from model response, handling markdown code blocks.\"\"\"\n", + " # Remove markdown code blocks\n", + " text = re.sub(r'```json\\n?', '', text)\n", + " text = re.sub(r'```\\n?', '', text)\n", + " \n", + " # Try to find JSON array\n", + " json_match = re.search(r'\\[\\s*\\{.*\\}\\s*\\]', text, re.DOTALL)\n", + " if json_match:\n", + " try:\n", + " return json.loads(json_match.group())\n", + " except json.JSONDecodeError:\n", + " pass\n", + " \n", + " # Fallback: try parsing entire response\n", + " try:\n", + " return json.loads(text.strip())\n", + " except json.JSONDecodeError:\n", + " return []\n", + "\n", + "def detect_bugs(code: str, model_key: str) -> Dict:\n", + " \"\"\"Detect bugs and security issues in code.\"\"\"\n", + " model_config = MODELS[model_key]\n", + " client = model_config[\"client\"]\n", + " model_name = model_config[\"model\"]\n", + " \n", + " user_prompt = f\"Analyze this Python code for bugs and security issues:\\n\\n```python\\n{code}\\n```\"\n", + " \n", + " try:\n", + " response = client.chat.completions.create(\n", + " model=model_name,\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": BUG_DETECTION_SYSTEM_PROMPT},\n", + " {\"role\": \"user\", \"content\": user_prompt}\n", + " ],\n", + " temperature=0.1\n", + " )\n", + " \n", + " content = response.choices[0].message.content\n", + " issues = extract_json_from_response(content)\n", + " \n", + " return {\n", + " \"model\": model_config[\"name\"],\n", + " \"issues\": issues,\n", + " \"raw_response\": content,\n", + " \"success\": True\n", + " }\n", + " except Exception as e:\n", + " return {\n", + " \"model\": model_config[\"name\"],\n", + " \"issues\": [],\n", + " \"error\": str(e),\n", + " \"success\": False\n", + " }\n", + "\n", + "def suggest_improvements(code: str, model_key: str) -> Dict:\n", + " \"\"\"Suggest code improvements and best practices.\"\"\"\n", + " model_config = MODELS[model_key]\n", + " client = model_config[\"client\"]\n", + " model_name = model_config[\"model\"]\n", + " \n", + " user_prompt = f\"Suggest improvements for this Python code:\\n\\n```python\\n{code}\\n```\"\n", + " \n", + " try:\n", + " response = client.chat.completions.create(\n", + " model=model_name,\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": IMPROVEMENTS_SYSTEM_PROMPT},\n", + " {\"role\": \"user\", \"content\": user_prompt}\n", + " ],\n", + " temperature=0.2\n", + " )\n", + " \n", + " content = response.choices[0].message.content\n", + " improvements = extract_json_from_response(content)\n", + " \n", + " return {\n", + " \"model\": model_config[\"name\"],\n", + " \"improvements\": improvements,\n", + " \"raw_response\": content,\n", + " \"success\": True\n", + " }\n", + " except Exception as e:\n", + " return {\n", + " \"model\": model_config[\"name\"],\n", + " \"improvements\": [],\n", + " \"error\": str(e),\n", + " \"success\": False\n", + " }\n", + "\n", + "def generate_tests(code: str, bugs: List[Dict], model_key: str) -> Dict:\n", + " \"\"\"Generate unit tests for the code.\"\"\"\n", + " model_config = MODELS[model_key]\n", + " client = model_config[\"client\"]\n", + " model_name = model_config[\"model\"]\n", + " \n", + " bugs_context = \"\"\n", + " if bugs:\n", + " bugs_context 
= \"\\n\\nNote: The following bugs were identified:\\n\" + \"\\n\".join([f\"- Line {b.get('line', '?')}: {b.get('issue', '')}\" for b in bugs])\n", + " \n", + " user_prompt = f\"Generate pytest unit tests for this Python code:{bugs_context}\\n\\n```python\\n{code}\\n```\"\n", + " \n", + " try:\n", + " response = client.chat.completions.create(\n", + " model=model_name,\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": TEST_GENERATION_SYSTEM_PROMPT},\n", + " {\"role\": \"user\", \"content\": user_prompt}\n", + " ],\n", + " temperature=0.3\n", + " )\n", + " \n", + " content = response.choices[0].message.content\n", + " # Remove markdown code blocks if present\n", + " test_code = re.sub(r'```python\\n?', '', content)\n", + " test_code = re.sub(r'```\\n?', '', test_code)\n", + " \n", + " return {\n", + " \"model\": model_config[\"name\"],\n", + " \"test_code\": test_code.strip(),\n", + " \"raw_response\": content,\n", + " \"success\": True\n", + " }\n", + " except Exception as e:\n", + " return {\n", + " \"model\": model_config[\"name\"],\n", + " \"test_code\": \"\",\n", + " \"error\": str(e),\n", + " \"success\": False\n", + " }\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "def format_bugs_output(result: Dict) -> str:\n", + " \"\"\"Format bug detection results for display.\"\"\"\n", + " if not result.get(\"success\"):\n", + " return f\"**Error with {result['model']}:** {result.get('error', 'Unknown error')}\"\n", + " \n", + " issues = result.get(\"issues\", [])\n", + " if not issues:\n", + " return f\"✅ **{result['model']}**: No issues found. Code looks good!\"\n", + " \n", + " output = [f\"**{result['model']}** - Found {len(issues)} issue(s):\\n\"]\n", + " \n", + " severity_order = {\"critical\": 0, \"high\": 1, \"medium\": 2, \"low\": 3}\n", + " sorted_issues = sorted(issues, key=lambda x: severity_order.get(x.get(\"severity\", \"low\"), 3))\n", + " \n", + " for issue in sorted_issues:\n", + " severity = issue.get(\"severity\", \"unknown\").upper()\n", + " line = issue.get(\"line\", \"?\")\n", + " issue_desc = issue.get(\"issue\", \"\")\n", + " suggestion = issue.get(\"suggestion\", \"\")\n", + " \n", + " severity_emoji = {\n", + " \"CRITICAL\": \"🔴\",\n", + " \"HIGH\": \"🟠\",\n", + " \"MEDIUM\": \"🟡\",\n", + " \"LOW\": \"🔵\"\n", + " }.get(severity, \"⚪\")\n", + " \n", + " output.append(f\"{severity_emoji} **{severity}** (Line {line}): {issue_desc}\")\n", + " if suggestion:\n", + " output.append(f\" 💡 *Fix:* {suggestion}\")\n", + " output.append(\"\")\n", + " \n", + " return \"\\n\".join(output)\n", + "\n", + "def format_improvements_output(result: Dict) -> str:\n", + " \"\"\"Format improvement suggestions for display.\"\"\"\n", + " if not result.get(\"success\"):\n", + " return f\"**Error with {result['model']}:** {result.get('error', 'Unknown error')}\"\n", + " \n", + " improvements = result.get(\"improvements\", [])\n", + " if not improvements:\n", + " return f\"✅ **{result['model']}**: Code follows best practices.
No major improvements needed!\"\n", + " \n", + " output = [f\"**{result['model']}** - {len(improvements)} suggestion(s):\\n\"]\n", + " \n", + " for imp in improvements:\n", + " category = imp.get(\"category\", \"general\").replace(\"_\", \" \").title()\n", + " line = imp.get(\"line\", \"?\")\n", + " current = imp.get(\"current\", \"\")\n", + " improved = imp.get(\"improved\", \"\")\n", + " explanation = imp.get(\"explanation\", \"\")\n", + " \n", + " output.append(f\"\\n📝 **{category}** (Line {line}):\")\n", + " if current and improved:\n", + " output.append(f\" Before: `{current[:60]}{'...' if len(current) > 60 else ''}`\")\n", + " output.append(f\" After: `{improved[:60]}{'...' if len(improved) > 60 else ''}`\")\n", + " if explanation:\n", + " output.append(f\" 💡 {explanation}\")\n", + " \n", + " return \"\\n\".join(output)\n", + "\n", + "def format_tests_output(result: Dict) -> str:\n", + " \"\"\"Format test generation results for display.\"\"\"\n", + " if not result.get(\"success\"):\n", + " return f\"**Error with {result['model']}:** {result.get('error', 'Unknown error')}\"\n", + " \n", + " test_code = result.get(\"test_code\", \"\")\n", + " if not test_code:\n", + " return f\"⚠️ **{result['model']}**: No tests generated.\"\n", + " \n", + " return test_code\n" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "def review_code(code: str, model_key: str, include_tests: bool = True) -> tuple:\n", + " \"\"\"Main function to perform a complete code review.\"\"\"\n", + " if not code.strip():\n", + " return \"Please provide code to review.\", \"\", \"\"\n", + " \n", + " # Detect bugs\n", + " bugs_result = detect_bugs(code, model_key)\n", + " bugs_output = format_bugs_output(bugs_result)\n", + " bugs_issues = bugs_result.get(\"issues\", [])\n", + " \n", + " # Suggest improvements\n", + " improvements_result = suggest_improvements(code, model_key)\n", + " improvements_output = format_improvements_output(improvements_result)\n", + " \n", + " # Generate tests\n", + " tests_output = \"\"\n", + " if include_tests:\n", + " tests_result = generate_tests(code, bugs_issues, model_key)\n", + " tests_output = format_tests_output(tests_result)\n", + " \n", + " return bugs_output, improvements_output, tests_output\n" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "def compare_models(code: str, model_keys: List[str]) -> str:\n", + " \"\"\"Compare multiple models on the same code.\"\"\"\n", + " if not code.strip():\n", + " return \"Please provide code to review.\"\n", + " \n", + " results = []\n", + " \n", + " for model_key in model_keys:\n", + " results.append(detect_bugs(code, model_key))\n", + " \n", + " # Build comparison output\n", + " output = [\"# Model Comparison Results\\n\"]\n", + " \n", + " for result in results:\n", + " model_name = result[\"model\"]\n", + " issues = result.get(\"issues\", [])\n", + " success = result.get(\"success\", False)\n", + " \n", + " if success:\n", + " output.append(f\"\\n**{model_name}**: Found {len(issues)} issue(s)\")\n", + " if issues:\n", + " severity_counts = {}\n", + " for issue in issues:\n", + " sev = issue.get(\"severity\", \"low\")\n", + " severity_counts[sev] = severity_counts.get(sev, 0) + 1\n", + " output.append(f\" Breakdown: {severity_counts}\")\n", + " else:\n",
+ " output.append(f\"\\n**{model_name}**: Error - {result.get('error', 'Unknown')}\")\n", + " \n", + " # Find consensus issues (found by multiple models)\n", + " if len(results) > 1:\n", + " issue_signatures = {}\n", + " for result in results:\n", + " if result.get(\"success\"):\n", + " for issue in result.get(\"issues\", []):\n", + " # Create a signature from the line number and issue description\n", + " sig = f\"{issue.get('line')}-{issue.get('issue', '')[:50]}\"\n", + " issue_signatures.setdefault(sig, []).append(result[\"model\"])\n", + " \n", + " consensus = [sig for sig, models in issue_signatures.items() if len(models) > 1]\n", + " if consensus:\n", + " output.append(f\"\\n\\n**Consensus Issues**: {len(consensus)} issue(s) identified by multiple models\")\n", + " \n", + " return \"\\n\".join(output)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Gradio UI\n" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "* Running on local URL: http://127.0.0.1:7883\n", + "* To create a public link, set `share=True` in `launch()`.\n" + ] + }, + { + "data": { + "text/html": [ + "
" + ], + "text/plain": [ + "