From 54607433028348c993883c6807dca5a8afcce46e Mon Sep 17 00:00:00 2001 From: Umar Javed Date: Mon, 20 Oct 2025 16:50:57 +0500 Subject: [PATCH] Fixed the file path --- .../community-contributions/week2_day5.ipynb | 360 ++++++++++++++++++ 1 file changed, 360 insertions(+) create mode 100644 week2/community-contributions/week2_day5.ipynb diff --git a/week2/community-contributions/week2_day5.ipynb b/week2/community-contributions/week2_day5.ipynb new file mode 100644 index 0000000..d6fd12d --- /dev/null +++ b/week2/community-contributions/week2_day5.ipynb @@ -0,0 +1,360 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "* Running on local URL: http://127.0.0.1:7860\n", + "* To create a public link, set `share=True` in `launch()`.\n" + ] + }, + { + "data": { + "text/html": [ + "
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import os, io, base64, textwrap, sqlite3\n", + "from dotenv import load_dotenv\n", + "from openai import OpenAI\n", + "from PIL import Image, ImageDraw, ImageFont\n", + "import gradio as gr\n", + "\n", + "load_dotenv(override=True)\n", + "openai = OpenAI()\n", + "\n", + "DB = \"tools.db\"\n", + "\n", + "system_message = \"You are an expert assistant. Only use tools when explicitly requested by the user. Use create_pdf ONLY when the user specifically asks to create, generate, or make a PDF document. Use tts_voice ONLY when the user asks for audio or voice. For general questions and conversations, just respond normally without using any tools. Keep responses concise and well-formatted in markdown without code fences.\"\n", + "\n", + "def ensure_tools_db():\n", + " with sqlite3.connect(DB) as conn:\n", + " c = conn.cursor()\n", + " c.execute(\"CREATE TABLE IF NOT EXISTS tools (name TEXT PRIMARY KEY, description TEXT)\")\n", + " c.execute(\"INSERT OR IGNORE INTO tools(name, description) VALUES(?,?)\", (\"create_pdf\", \"Generate a PDF of the provided markdown text\"))\n", + " c.execute(\"INSERT OR IGNORE INTO tools(name, description) VALUES(?,?)\", (\"tts_voice\", \"Generate voice audio from the provided text\"))\n", + " conn.commit()\n", + " \n", + "tools_schema = [{\n", + " \"type\": \"function\",\n", + " \"function\": {\n", + " \"name\": \"create_pdf\",\n", + " \"description\": \"Generate a PDF from markdown text and return an identifier\",\n", + " \"parameters\": {\n", + " \"type\": \"object\",\n", + " \"properties\": {\n", + " \"title\": {\"type\": \"string\", \"description\": \"Document title\"},\n", + " \"markdown\": {\"type\": \"string\", \"description\": \"Markdown content to render\"}\n", + " },\n", + " \"required\": [\"title\", \"markdown\"],\n", + " \"additionalProperties\": False\n", + " }\n", + " }\n", + "},{\n", + " \"type\": \"function\",\n", + " \"function\": {\n", + " \"name\": \"tts_voice\",\n", + " \"description\": \"Synthesize speech audio from provided text\",\n", + " \"parameters\": {\n", + " \"type\": \"object\",\n", + " \"properties\": {\n", + " \"text\": {\"type\": \"string\", \"description\": \"Text to speak\"}\n", + " },\n", + " \"required\": [\"text\"],\n", + " \"additionalProperties\": False\n", + " }\n", + " }\n", + "}]\n", + "\n", + "def text_to_pdf_file(md_text, title=\"Document\"):\n", + " import tempfile\n", + " try:\n", + " from reportlab.lib.pagesizes import letter\n", + " from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer\n", + " from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle\n", + " from reportlab.lib.units import inch\n", + " \n", + " temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=\".pdf\")\n", + " doc = SimpleDocTemplate(temp_file.name, pagesize=letter)\n", + " styles = getSampleStyleSheet()\n", + " story = []\n", + " \n", + " title_style = ParagraphStyle('CustomTitle', parent=styles['Heading1'], fontSize=18, spaceAfter=30)\n", + " story.append(Paragraph(title, title_style))\n", + " story.append(Spacer(1, 12))\n", + " for line in md_text.split('\\n'):\n", + " if line.strip().startswith('# '):\n", + " story.append(Paragraph(line[2:], styles['Heading1']))\n", + " elif line.strip().startswith('## '):\n", + " story.append(Paragraph(line[3:], styles['Heading2']))\n", + " elif line.strip().startswith('### '):\n", + " story.append(Paragraph(line[4:], styles['Heading3']))\n", + " elif line.strip().startswith('- ') or line.strip().startswith('* '):\n", + " story.append(Paragraph(f\"• {line[2:]}\", styles['Normal']))\n", + " elif line.strip():\n", + " story.append(Paragraph(line, styles['Normal']))\n", + " else:\n", + " story.append(Spacer(1, 6))\n", + " \n", + " doc.build(story)\n", + " return temp_file.name\n", + " except ImportError:\n", + " lines = []\n", + " for paragraph in md_text.splitlines():\n", + " if not paragraph.strip():\n", + " lines.append(\"\")\n", + " continue\n", + " wrapped = textwrap.wrap(paragraph, width=90, replace_whitespace=False, drop_whitespace=False)\n", + " lines.extend(wrapped if wrapped else [\"\"])\n", + " pages = []\n", + " page_w, page_h = 1654, 2339\n", + " margin = 100\n", + " y = margin\n", + " font = ImageFont.load_default()\n", + " page = Image.new(\"RGB\", (page_w, page_h), \"white\")\n", + " draw = ImageDraw.Draw(page)\n", + " draw.text((margin, y-60), title, fill=(0,0,0), font=font)\n", + " for line in lines:\n", + " draw.text((margin, y), line, fill=(0,0,0), font=font)\n", + " y += 22\n", + " if y > page_h - margin:\n", + " pages.append(page)\n", + " page = Image.new(\"RGB\", (page_w, page_h), \"white\")\n", + " draw = ImageDraw.Draw(page)\n", + " y = margin\n", + " pages.append(page)\n", + " temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=\".pdf\")\n", + " pages[0].save(temp_file.name, format=\"PDF\", save_all=True, append_images=pages[1:] if len(pages)>1 else [])\n", + " return temp_file.name\n", + "\n", + "def tts_bytes(text):\n", + " if not text.strip():\n", + " return None\n", + " speech = openai.audio.speech.create(model=\"gpt-4o-mini-tts\", voice=\"alloy\", input=text[:2000])\n", + " import tempfile\n", + " temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=\".mp3\")\n", + " temp_file.write(speech.content)\n", + " temp_file.close()\n", + " return temp_file.name\n", + "\n", + "def build_pdf_data_url(pdf_bytes):\n", + " b64 = base64.b64encode(pdf_bytes).decode(\"utf-8\")\n", + " return f\"data:application/pdf;base64,{b64}\"\n", + "\n", + "state_storage = {\"last_pdf\": None, \"last_audio\": None}\n", + "\n", + "def handle_tool_calls(tool_calls):\n", + " results = []\n", + " pdf_preview_html = None\n", + " audio_tuple = None\n", + " for tc in tool_calls:\n", + " name = tc.function.name\n", + " args = tc.function.arguments\n", + " try:\n", + " import json as _json\n", + " parsed = _json.loads(args) if isinstance(args, str) else args\n", + " except Exception:\n", + " parsed = {}\n", + " if name == \"create_pdf\":\n", + " title = parsed.get(\"title\", \"Document\")\n", + " markdown = parsed.get(\"markdown\", \"\")\n", + " pdf_file = text_to_pdf_file(markdown, title=title)\n", + " state_storage[\"last_pdf\"] = pdf_file\n", + " with open(pdf_file, \"rb\") as f:\n", + " pdf_bytes = f.read()\n", + " pdf_url = build_pdf_data_url(pdf_bytes)\n", + " pdf_preview_html = f\"\"\n", + " results.append({\"role\": \"tool\", \"content\": \"PDF created\", \"tool_call_id\": tc.id})\n", + " elif name == \"tts_voice\":\n", + " text = parsed.get(\"text\", \"\")\n", + " audio_file = tts_bytes(text)\n", + " state_storage[\"last_audio\"] = audio_file\n", + " results.append({\"role\": \"tool\", \"content\": \"Audio generated\", \"tool_call_id\": tc.id})\n", + " return results, pdf_preview_html, None\n", + "\n", + "def build_messages(history, user_text, base_doc_text):\n", + " msgs = [{\"role\": \"system\", \"content\": system_message}]\n", + " \n", + " if base_doc_text:\n", + " msgs.append({\"role\": \"system\", \"content\": f\"Context Document:\\n{base_doc_text}\\n\\nUse this document as reference for answering questions.\"})\n", + " \n", + " msgs.extend([{\"role\": h[\"role\"], \"content\": h[\"content\"]} for h in history])\n", + " msgs.append({\"role\": \"user\", \"content\": user_text})\n", + " return msgs\n", + "\n", + "ensure_tools_db()\n", + "\n", + "with gr.Blocks(theme=gr.themes.Soft(), css=\"\"\"\n", + ".gradio-container{max-width:1200px;margin:auto}\n", + "\"\"\") as demo:\n", + " gr.Markdown(\"# Document Tools: PDF and Voice\")\n", + " \n", + " with gr.Row():\n", + " with gr.Column(scale=2):\n", + " chatbot = gr.Chatbot(height=500, type=\"messages\", value=[{\"role\":\"assistant\",\"content\":\"Hello! How can I assist you today?\"}])\n", + " with gr.Row():\n", + " user_msg = gr.Textbox(placeholder=\"Type your message here...\", show_label=False, scale=4)\n", + " clear_btn = gr.Button(\"Clear\", scale=1)\n", + " \n", + " with gr.Column(scale=1):\n", + " file_input = gr.File(label=\"Upload Document\", file_types=[\".txt\", \".md\", \".docx\", \".pdf\"], type=\"filepath\")\n", + " voice_toggle = gr.Checkbox(label=\"Enable voice\", value=True)\n", + " voice_input = gr.Audio(label=\"Voice Input\", sources=[\"microphone\"], type=\"filepath\")\n", + " audio = gr.Audio(label=\"Voice Output\", autoplay=True)\n", + " file_pdf = gr.File(label=\"Download PDF\")\n", + " \n", + " pdf_iframe = gr.HTML(visible=True)\n", + "\n", + " def put_user(m, h):\n", + " return \"\", h + [{\"role\":\"user\", \"content\": m}]\n", + " \n", + " def process_voice_input(voice_file):\n", + " if voice_file is None:\n", + " return \"\"\n", + " try:\n", + " with open(voice_file, \"rb\") as f:\n", + " transcript = openai.audio.transcriptions.create(\n", + " model=\"whisper-1\",\n", + " file=f\n", + " )\n", + " return transcript.text\n", + " except Exception as e:\n", + " return f\"Error processing voice: {str(e)}\"\n", + "\n", + " def extract_text_from_file(file_path):\n", + " if not file_path:\n", + " return \"\"\n", + " \n", + " try:\n", + " file_ext = file_path.lower().split('.')[-1]\n", + " if file_ext in ['txt', 'md']:\n", + " with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:\n", + " content = f.read()\n", + " return content\n", + " elif file_ext == 'docx':\n", + " from docx import Document\n", + " doc = Document(file_path)\n", + " text = []\n", + " for paragraph in doc.paragraphs:\n", + " text.append(paragraph.text)\n", + " content = '\\n'.join(text)\n", + " return content\n", + " elif file_ext == 'pdf':\n", + " try:\n", + " import PyPDF2\n", + " text = []\n", + " with open(file_path, 'rb') as f:\n", + " pdf_reader = PyPDF2.PdfReader(f)\n", + " for page in pdf_reader.pages:\n", + " page_text = page.extract_text()\n", + " text.append(page_text)\n", + " content = '\\n'.join(text)\n", + " return content\n", + " except Exception:\n", + " try:\n", + " import fitz\n", + " doc = fitz.open(file_path)\n", + " text = []\n", + " for page in doc:\n", + " text.append(page.get_text())\n", + " content = '\\n'.join(text)\n", + " return content\n", + " except Exception:\n", + " return \"\"\n", + " else:\n", + " return \"\"\n", + " except Exception:\n", + " return \"\"\n", + "\n", + " def run_chat(history, m, file_path, allow_voice):\n", + " base_doc = extract_text_from_file(file_path)\n", + " msgs = build_messages(history, m, base_doc)\n", + " tools = tools_schema if allow_voice else [tools_schema[0]]\n", + " resp = openai.chat.completions.create(model=\"gpt-4.1-mini\", messages=msgs, tools=tools, stream=True)\n", + " partial = \"\"\n", + " for chunk in resp:\n", + " delta = (chunk.choices[0].delta.content or \"\") if chunk.choices[0].delta else \"\"\n", + " partial += delta\n", + " yield history + [{\"role\":\"assistant\",\"content\": partial}], None, None, \"\"\n", + "\n", + " msgs.append({\"role\":\"assistant\",\"content\": partial})\n", + " resp2 = openai.chat.completions.create(model=\"gpt-4.1-mini\", messages=msgs, tools=tools)\n", + " pdf_html = None\n", + " audio_out = None\n", + " while resp2.choices[0].finish_reason == \"tool_calls\":\n", + " message = resp2.choices[0].message\n", + " tool_results, pdf_html, audio_out = handle_tool_calls(message.tool_calls)\n", + " msgs.append({\"role\": message.role, \"content\": message.content, \"tool_calls\": message.tool_calls})\n", + " msgs.extend(tool_results)\n", + " resp2 = openai.chat.completions.create(model=\"gpt-4.1-mini\", messages=msgs, tools=tools)\n", + " final_reply = resp2.choices[0].message.content if resp2.choices[0].message.content else partial\n", + " history = history + [{\"role\":\"assistant\",\"content\": final_reply}]\n", + " \n", + " state_storage[\"last_audio\"] = None\n", + " if final_reply and allow_voice:\n", + " audio_file = tts_bytes(final_reply)\n", + " yield history, audio_file, state_storage[\"last_pdf\"], (pdf_html or \"\")\n", + " else:\n", + " yield history, None, state_storage[\"last_pdf\"], (pdf_html or \"\")\n", + "\n", + " user_msg.submit(put_user, inputs=[user_msg, chatbot], outputs=[user_msg, chatbot]).then(\n", + " run_chat, inputs=[chatbot, user_msg, file_input, voice_toggle], outputs=[chatbot, audio, file_pdf, pdf_iframe]\n", + " )\n", + " \n", + " voice_input.change(process_voice_input, inputs=voice_input, outputs=user_msg)\n", + "\n", + "\n", + " def clear_all():\n", + " state_storage[\"last_pdf\"] = None\n", + " state_storage[\"last_audio\"] = None\n", + " return [{\"role\":\"assistant\",\"content\":\"Hello! How can I assist you today?\"}], None, None, \"\"\n", + "\n", + " clear_btn.click(clear_all, outputs=[chatbot, audio, file_pdf, pdf_iframe])\n", + "\n", + "demo.launch(inbrowser=True)\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "base", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}