{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Document Tools: PDF and Voice\n",
    "\n",
    "A Gradio chat assistant built on the OpenAI API. It answers questions (optionally about an uploaded document), creates a PDF when asked (`create_pdf` tool) and can speak its replies (`tts_voice` tool and the *Enable voice* toggle).\n",
    "\n",
    "**Setup:** the OpenAI client is created with no arguments, so credentials must come from the environment or a `.env` file. Required packages: `openai`, `gradio`, `pillow`, `python-dotenv`. Optional: `reportlab` (formatted PDFs; otherwise a plain-text Pillow fallback is used) and `python-docx`, `PyPDF2` or `PyMuPDF` (reading uploaded documents)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import base64\n",
    "import io\n",
    "import json\n",
    "import os\n",
    "import sqlite3\n",
    "import tempfile\n",
    "import textwrap\n",
    "from contextlib import closing\n",
    "from types import SimpleNamespace\n",
    "from xml.sax.saxutils import escape as xml_escape\n",
    "\n",
    "import gradio as gr\n",
    "from dotenv import load_dotenv\n",
    "from openai import OpenAI\n",
    "from PIL import Image, ImageDraw, ImageFont"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "load_dotenv(override=True)  # .env values win over variables already set in the environment\n",
    "openai = OpenAI()\n",
    "\n",
    "DB = \"tools.db\"\n",
    "CHAT_MODEL = \"gpt-4.1-mini\"\n",
    "TRANSCRIBE_MODEL = \"whisper-1\"\n",
    "TTS_MODEL = \"gpt-4o-mini-tts\"\n",
    "TTS_VOICE = \"alloy\"\n",
    "TTS_MAX_CHARS = 2000  # longer replies are truncated before synthesis\n",
    "MAX_TOOL_ROUNDS = 5  # upper bound on model -> tool -> model round trips per user turn\n",
    "GREETING = \"Hello! How can I assist you today?\"\n",
    "CSS = \".gradio-container{max-width:1200px;margin:auto}\"\n",
    "\n",
    "system_message = \"You are an expert assistant. Only use tools when explicitly requested by the user. Use create_pdf ONLY when the user specifically asks to create, generate, or make a PDF document. Use tts_voice ONLY when the user asks for audio or voice. For general questions and conversations, just respond normally without using any tools. Keep responses concise and well-formatted in markdown without code fences.\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Tool registry\n",
    "\n",
    "The tools are recorded in a small SQLite table and described to the model through `tools_schema`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def ensure_tools_db():\n",
    "    \"\"\"Create the `tools` table in DB if it is missing and register both tools.\n",
    "\n",
    "    Safe to call repeatedly: the statements use IF NOT EXISTS / INSERT OR IGNORE.\n",
    "    \"\"\"\n",
    "    rows = [\n",
    "        (\"create_pdf\", \"Generate a PDF of the provided markdown text\"),\n",
    "        (\"tts_voice\", \"Generate voice audio from the provided text\"),\n",
    "    ]\n",
    "    # `with conn` only commits or rolls back; closing() is what releases the connection.\n",
    "    with closing(sqlite3.connect(DB)) as conn, conn:\n",
    "        conn.execute(\"CREATE TABLE IF NOT EXISTS tools (name TEXT PRIMARY KEY, description TEXT)\")\n",
    "        conn.executemany(\"INSERT OR IGNORE INTO tools(name, description) VALUES(?,?)\", rows)\n",
    "\n",
    "\n",
    "# Function-calling schema sent to the chat model. Index 0 (create_pdf) is always offered;\n",
    "# index 1 (tts_voice) is offered only while voice is enabled in the UI.\n",
    "tools_schema = [\n",
    "    {\n",
    "        \"type\": \"function\",\n",
    "        \"function\": {\n",
    "            \"name\": \"create_pdf\",\n",
    "            \"description\": \"Generate a PDF from markdown text and return an identifier\",\n",
    "            \"parameters\": {\n",
    "                \"type\": \"object\",\n",
    "                \"properties\": {\n",
    "                    \"title\": {\"type\": \"string\", \"description\": \"Document title\"},\n",
    "                    \"markdown\": {\"type\": \"string\", \"description\": \"Markdown content to render\"},\n",
    "                },\n",
    "                \"required\": [\"title\", \"markdown\"],\n",
    "                \"additionalProperties\": False,\n",
    "            },\n",
    "        },\n",
    "    },\n",
    "    {\n",
    "        \"type\": \"function\",\n",
    "        \"function\": {\n",
    "            \"name\": \"tts_voice\",\n",
    "            \"description\": \"Synthesize speech audio from provided text\",\n",
    "            \"parameters\": {\n",
    "                \"type\": \"object\",\n",
    "                \"properties\": {\n",
    "                    \"text\": {\"type\": \"string\", \"description\": \"Text to speak\"},\n",
    "                },\n",
    "                \"required\": [\"text\"],\n",
    "                \"additionalProperties\": False,\n",
    "            },\n",
    "        },\n",
    "    },\n",
    "]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## PDF and speech helpers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def _new_temp_path(suffix):\n",
    "    \"\"\"Reserve a temporary file and return its path with the OS handle already closed.\"\"\"\n",
    "    fd, path = tempfile.mkstemp(suffix=suffix)\n",
    "    os.close(fd)\n",
    "    return path\n",
    "\n",
    "\n",
    "def _pdf_with_reportlab(md_text, title, out_path):\n",
    "    \"\"\"Render a small markdown subset (#, ##, ### headings and -/* bullets) with reportlab.\n",
    "\n",
    "    Raises ImportError when reportlab is not installed.\n",
    "    \"\"\"\n",
    "    # Imported lazily because reportlab is optional; the caller falls back to Pillow.\n",
    "    from reportlab.lib.pagesizes import letter\n",
    "    from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet\n",
    "    from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer\n",
    "\n",
    "    styles = getSampleStyleSheet()\n",
    "    title_style = ParagraphStyle(\"CustomTitle\", parent=styles[\"Heading1\"], fontSize=18, spaceAfter=30)\n",
    "    # Paragraph text is parsed as markup, so user text is escaped before it is inserted.\n",
    "    story = [Paragraph(xml_escape(title), title_style), Spacer(1, 12)]\n",
    "    for raw_line in md_text.split(\"\\n\"):\n",
    "        # Detect and slice on the same stripped string so indented headings/bullets work.\n",
    "        line = raw_line.strip()\n",
    "        if not line:\n",
    "            story.append(Spacer(1, 6))\n",
    "        elif line.startswith(\"# \"):\n",
    "            story.append(Paragraph(xml_escape(line[2:]), styles[\"Heading1\"]))\n",
    "        elif line.startswith(\"## \"):\n",
    "            story.append(Paragraph(xml_escape(line[3:]), styles[\"Heading2\"]))\n",
    "        elif line.startswith(\"### \"):\n",
    "            story.append(Paragraph(xml_escape(line[4:]), styles[\"Heading3\"]))\n",
    "        elif line.startswith((\"- \", \"* \")):\n",
    "            story.append(Paragraph(\"• \" + xml_escape(line[2:]), styles[\"Normal\"]))\n",
    "        else:\n",
    "            story.append(Paragraph(xml_escape(line), styles[\"Normal\"]))\n",
    "    SimpleDocTemplate(out_path, pagesize=letter).build(story)\n",
    "\n",
    "\n",
    "def _pdf_with_pillow(md_text, title, out_path):\n",
    "    \"\"\"Fallback renderer: draw wrapped plain text on white page images and save them as one PDF.\"\"\"\n",
    "    lines = []\n",
    "    for paragraph in md_text.splitlines():\n",
    "        if not paragraph.strip():\n",
    "            lines.append(\"\")\n",
    "            continue\n",
    "        wrapped = textwrap.wrap(paragraph, width=90, replace_whitespace=False, drop_whitespace=False)\n",
    "        lines.extend(wrapped or [\"\"])\n",
    "\n",
    "    page_w, page_h = 1654, 2339  # pixels\n",
    "    margin, line_height = 100, 22\n",
    "    font = ImageFont.load_default()\n",
    "\n",
    "    def new_page():\n",
    "        page = Image.new(\"RGB\", (page_w, page_h), \"white\")\n",
    "        return page, ImageDraw.Draw(page)\n",
    "\n",
    "    page, draw = new_page()\n",
    "    pages = [page]\n",
    "    draw.text((margin, margin - 60), title, fill=(0, 0, 0), font=font)\n",
    "    y = margin\n",
    "    for line in lines:\n",
    "        # Break *before* drawing so a full last page does not leave an empty page behind it.\n",
    "        if y > page_h - margin:\n",
    "            page, draw = new_page()\n",
    "            pages.append(page)\n",
    "            y = margin\n",
    "        draw.text((margin, y), line, fill=(0, 0, 0), font=font)\n",
    "        y += line_height\n",
    "    pages[0].save(out_path, format=\"PDF\", save_all=True, append_images=pages[1:])\n",
    "\n",
    "\n",
    "def text_to_pdf_file(md_text, title=\"Document\"):\n",
    "    \"\"\"Write `md_text` to a new temporary PDF and return the file path.\n",
    "\n",
    "    Uses reportlab when it is installed and a Pillow plain-text rendering otherwise.\n",
    "    The file is created with delete-on-close disabled, so the caller owns its cleanup.\n",
    "    \"\"\"\n",
    "    md_text = md_text or \"\"\n",
    "    title = title or \"Document\"\n",
    "    out_path = _new_temp_path(\".pdf\")\n",
    "    try:\n",
    "        _pdf_with_reportlab(md_text, title, out_path)\n",
    "    except ImportError:\n",
    "        _pdf_with_pillow(md_text, title, out_path)\n",
    "    return out_path\n",
    "\n",
    "\n",
    "def tts_bytes(text):\n",
    "    \"\"\"Synthesize `text` into a temporary MP3 and return its path (None for empty text).\n",
    "\n",
    "    Only the first TTS_MAX_CHARS characters are sent to the speech endpoint.\n",
    "    \"\"\"\n",
    "    if not text or not text.strip():\n",
    "        return None\n",
    "    speech = openai.audio.speech.create(model=TTS_MODEL, voice=TTS_VOICE, input=text[:TTS_MAX_CHARS])\n",
    "    with tempfile.NamedTemporaryFile(delete=False, suffix=\".mp3\") as audio_file:\n",
    "        audio_file.write(speech.content)\n",
    "    return audio_file.name\n",
    "\n",
    "\n",
    "def build_pdf_data_url(pdf_bytes):\n",
    "    \"\"\"Return a base64 `data:application/pdf` URL for the given PDF bytes.\"\"\"\n",
    "    b64 = base64.b64encode(pdf_bytes).decode(\"utf-8\")\n",
    "    return f\"data:application/pdf;base64,{b64}\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Tool dispatch and prompt assembly"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Paths of the most recently generated artifacts, shared between handlers.\n",
    "state_storage = {\"last_pdf\": None, \"last_audio\": None}\n",
    "\n",
    "\n",
    "def _parse_tool_args(raw_args):\n",
    "    \"\"\"Return tool arguments as a dict; malformed JSON or a non-object value yields {}.\"\"\"\n",
    "    if isinstance(raw_args, str):\n",
    "        try:\n",
    "            raw_args = json.loads(raw_args)\n",
    "        except ValueError:\n",
    "            return {}\n",
    "    return raw_args if isinstance(raw_args, dict) else {}\n",
    "\n",
    "\n",
    "def handle_tool_calls(tool_calls):\n",
    "    \"\"\"Execute the model's tool calls.\n",
    "\n",
    "    Each item must expose `.id`, `.function.name` and `.function.arguments`.\n",
    "\n",
    "    Returns a tuple `(results, pdf_preview_html, audio_file)`:\n",
    "    - results: one `tool` role message per call (including unknown tools, so every\n",
    "      tool_call_id gets an answer in the follow-up request),\n",
    "    - pdf_preview_html: iframe markup embedding the last created PDF, or None,\n",
    "    - audio_file: path of the last synthesized audio, or None.\n",
    "    \"\"\"\n",
    "    results = []\n",
    "    pdf_preview_html = None\n",
    "    audio_file = None\n",
    "    for tc in tool_calls or []:\n",
    "        name = tc.function.name\n",
    "        parsed = _parse_tool_args(tc.function.arguments)\n",
    "        if name == \"create_pdf\":\n",
    "            pdf_file = text_to_pdf_file(parsed.get(\"markdown\", \"\"), title=parsed.get(\"title\", \"Document\"))\n",
    "            state_storage[\"last_pdf\"] = pdf_file\n",
    "            with open(pdf_file, \"rb\") as f:\n",
    "                pdf_url = build_pdf_data_url(f.read())\n",
    "            pdf_preview_html = f'<iframe src=\"{pdf_url}\" width=\"100%\" height=\"600\" style=\"border:none\"></iframe>'\n",
    "            content = \"PDF created\"\n",
    "        elif name == \"tts_voice\":\n",
    "            new_audio = tts_bytes(parsed.get(\"text\", \"\"))\n",
    "            if new_audio:\n",
    "                audio_file = new_audio\n",
    "                state_storage[\"last_audio\"] = new_audio\n",
    "                content = \"Audio generated\"\n",
    "            else:\n",
    "                content = \"No audio generated: text was empty\"\n",
    "        else:\n",
    "            content = f\"Unknown tool: {name}\"\n",
    "        results.append({\"role\": \"tool\", \"content\": content, \"tool_call_id\": tc.id})\n",
    "    return results, pdf_preview_html, audio_file\n",
    "\n",
    "\n",
    "def build_messages(history, user_text, base_doc_text):\n",
    "    \"\"\"Assemble the chat-completions message list.\n",
    "\n",
    "    Order: system prompt, optional context document (as a second system message),\n",
    "    prior history reduced to role/content, then `user_text` if it is non-empty.\n",
    "    The UI adds the user's message to `history` and clears the textbox before\n",
    "    calling this, so an empty `user_text` is skipped rather than sent as a blank turn.\n",
    "    \"\"\"\n",
    "    msgs = [{\"role\": \"system\", \"content\": system_message}]\n",
    "    if base_doc_text:\n",
    "        msgs.append({\"role\": \"system\", \"content\": f\"Context Document:\\n{base_doc_text}\\n\\nUse this document as reference for answering questions.\"})\n",
    "    msgs.extend({\"role\": h[\"role\"], \"content\": h[\"content\"]} for h in (history or []))\n",
    "    if user_text:\n",
    "        msgs.append({\"role\": \"user\", \"content\": user_text})\n",
    "    return msgs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Event handlers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def put_user(m, h):\n",
    "    \"\"\"Clear the textbox and append the user's message to the chat history.\"\"\"\n",
    "    return \"\", (h or []) + [{\"role\": \"user\", \"content\": m}]\n",
    "\n",
    "\n",
    "def process_voice_input(voice_file):\n",
    "    \"\"\"Transcribe a recorded audio file; returns the text, or an error string on failure.\"\"\"\n",
    "    if voice_file is None:\n",
    "        return \"\"\n",
    "    try:\n",
    "        with open(voice_file, \"rb\") as f:\n",
    "            transcript = openai.audio.transcriptions.create(model=TRANSCRIBE_MODEL, file=f)\n",
    "        return transcript.text\n",
    "    except Exception as e:\n",
    "        return f\"Error processing voice: {str(e)}\"\n",
    "\n",
    "\n",
    "def _read_plain(path):\n",
    "    \"\"\"Read a UTF-8 text file, dropping undecodable bytes.\"\"\"\n",
    "    with open(path, \"r\", encoding=\"utf-8\", errors=\"ignore\") as f:\n",
    "        return f.read()\n",
    "\n",
    "\n",
    "def _read_docx(path):\n",
    "    \"\"\"Join the paragraph texts of a .docx file (needs the optional python-docx package).\"\"\"\n",
    "    from docx import Document\n",
    "\n",
    "    return \"\\n\".join(p.text for p in Document(path).paragraphs)\n",
    "\n",
    "\n",
    "def _read_pdf(path):\n",
    "    \"\"\"Extract PDF text with PyPDF2, falling back to PyMuPDF (`fitz`) if that fails.\"\"\"\n",
    "    try:\n",
    "        import PyPDF2\n",
    "\n",
    "        with open(path, \"rb\") as f:\n",
    "            # Defensive: treat a missing page text as empty instead of failing the join.\n",
    "            return \"\\n\".join(page.extract_text() or \"\" for page in PyPDF2.PdfReader(f).pages)\n",
    "    except Exception:\n",
    "        import fitz\n",
    "\n",
    "        with closing(fitz.open(path)) as doc:\n",
    "            return \"\\n\".join(page.get_text() for page in doc)\n",
    "\n",
    "\n",
    "_TEXT_READERS = {\".txt\": _read_plain, \".md\": _read_plain, \".docx\": _read_docx, \".pdf\": _read_pdf}\n",
    "\n",
    "\n",
    "def extract_text_from_file(file_path):\n",
    "    \"\"\"Return the text of an uploaded .txt/.md/.docx/.pdf file, or \"\" if it cannot be read.\"\"\"\n",
    "    if not file_path:\n",
    "        return \"\"\n",
    "    reader = _TEXT_READERS.get(os.path.splitext(file_path)[1].lower())\n",
    "    if reader is None:\n",
    "        return \"\"\n",
    "    try:\n",
    "        return reader(file_path)\n",
    "    except Exception:\n",
    "        # Best effort by design: an unreadable upload (or a missing optional parser)\n",
    "        # means the chat simply proceeds without a context document.\n",
    "        return \"\"\n",
    "\n",
    "\n",
    "def run_chat(history, m, file_path, allow_voice):\n",
    "    \"\"\"Stream the assistant's answer, run any requested tools, then emit the final UI state.\n",
    "\n",
    "    Generator used as a Gradio event handler. Every yield is a 4-tuple\n",
    "    `(chat history, audio path, PDF path, PDF preview HTML)`. While text is streaming\n",
    "    the last three are `None, None, \"\"`; the final yield carries the real values.\n",
    "\n",
    "    Each round is a single streamed completion whose text and tool-call fragments are\n",
    "    collected together, so a turn without tools costs exactly one API request.\n",
    "    \"\"\"\n",
    "    history = history or []\n",
    "    msgs = build_messages(history, m, extract_text_from_file(file_path))\n",
    "    tools = tools_schema if allow_voice else [tools_schema[0]]\n",
    "\n",
    "    final_reply = \"\"\n",
    "    pdf_html = None\n",
    "    tool_audio = None\n",
    "    tool_notes = []\n",
    "    for _ in range(MAX_TOOL_ROUNDS):\n",
    "        stream = openai.chat.completions.create(model=CHAT_MODEL, messages=msgs, tools=tools, stream=True)\n",
    "        partial = \"\"\n",
    "        pending = {}  # tool-call index -> accumulated id / name / JSON argument text\n",
    "        for chunk in stream:\n",
    "            if not chunk.choices:\n",
    "                continue\n",
    "            delta = chunk.choices[0].delta\n",
    "            if delta is None:\n",
    "                continue\n",
    "            if delta.content:\n",
    "                partial += delta.content\n",
    "                yield history + [{\"role\": \"assistant\", \"content\": partial}], None, None, \"\"\n",
    "            # Tool calls arrive as fragments keyed by index; stitch them back together.\n",
    "            for piece in getattr(delta, \"tool_calls\", None) or []:\n",
    "                call = pending.setdefault(piece.index, {\"id\": \"\", \"name\": \"\", \"arguments\": \"\"})\n",
    "                if piece.id:\n",
    "                    call[\"id\"] = piece.id\n",
    "                if piece.function is not None:\n",
    "                    call[\"name\"] += piece.function.name or \"\"\n",
    "                    call[\"arguments\"] += piece.function.arguments or \"\"\n",
    "        final_reply = partial or final_reply\n",
    "        if not pending:\n",
    "            break\n",
    "\n",
    "        calls = [pending[i] for i in sorted(pending)]\n",
    "        msgs.append({\n",
    "            \"role\": \"assistant\",\n",
    "            \"content\": partial or None,\n",
    "            \"tool_calls\": [\n",
    "                {\"id\": c[\"id\"], \"type\": \"function\", \"function\": {\"name\": c[\"name\"], \"arguments\": c[\"arguments\"]}}\n",
    "                for c in calls\n",
    "            ],\n",
    "        })\n",
    "        tool_results, round_pdf_html, round_audio = handle_tool_calls([\n",
    "            SimpleNamespace(id=c[\"id\"], function=SimpleNamespace(name=c[\"name\"], arguments=c[\"arguments\"]))\n",
    "            for c in calls\n",
    "        ])\n",
    "        msgs.extend(tool_results)\n",
    "        tool_notes.extend(r[\"content\"] for r in tool_results)\n",
    "        pdf_html = round_pdf_html or pdf_html\n",
    "        tool_audio = round_audio or tool_audio\n",
    "\n",
    "    if not final_reply:\n",
    "        # The model produced no text (e.g. only tool calls); report what the tools did.\n",
    "        final_reply = \"; \".join(tool_notes)\n",
    "    history = history + [{\"role\": \"assistant\", \"content\": final_reply}]\n",
    "\n",
    "    # Audio explicitly requested through the tool wins; otherwise speak the reply.\n",
    "    audio_out = tool_audio\n",
    "    if audio_out is None and allow_voice and final_reply:\n",
    "        audio_out = tts_bytes(final_reply)\n",
    "    state_storage[\"last_audio\"] = audio_out\n",
    "    yield history, audio_out, state_storage[\"last_pdf\"], (pdf_html or \"\")\n",
    "\n",
    "\n",
    "def clear_all():\n",
    "    \"\"\"Reset stored artifacts and return the initial values for chat, audio, file and preview.\"\"\"\n",
    "    state_storage[\"last_pdf\"] = None\n",
    "    state_storage[\"last_audio\"] = None\n",
    "    return [{\"role\": \"assistant\", \"content\": GREETING}], None, None, \"\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## User interface"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ensure_tools_db()\n",
    "\n",
    "with gr.Blocks(theme=gr.themes.Soft(), css=CSS) as demo:\n",
    "    gr.Markdown(\"# Document Tools: PDF and Voice\")\n",
    "\n",
    "    with gr.Row():\n",
    "        with gr.Column(scale=2):\n",
    "            chatbot = gr.Chatbot(height=500, type=\"messages\", value=[{\"role\": \"assistant\", \"content\": GREETING}])\n",
    "            with gr.Row():\n",
    "                user_msg = gr.Textbox(placeholder=\"Type your message here...\", show_label=False, scale=4)\n",
    "                clear_btn = gr.Button(\"Clear\", scale=1)\n",
    "\n",
    "        with gr.Column(scale=1):\n",
    "            file_input = gr.File(label=\"Upload Document\", file_types=[\".txt\", \".md\", \".docx\", \".pdf\"], type=\"filepath\")\n",
    "            voice_toggle = gr.Checkbox(label=\"Enable voice\", value=True)\n",
    "            voice_input = gr.Audio(label=\"Voice Input\", sources=[\"microphone\"], type=\"filepath\")\n",
    "            audio = gr.Audio(label=\"Voice Output\", autoplay=True)\n",
    "            file_pdf = gr.File(label=\"Download PDF\")\n",
    "\n",
    "    pdf_iframe = gr.HTML(visible=True)\n",
    "\n",
    "    # put_user moves the typed text into the chat history and clears the textbox;\n",
    "    # run_chat then answers from that history.\n",
    "    user_msg.submit(put_user, inputs=[user_msg, chatbot], outputs=[user_msg, chatbot]).then(\n",
    "        run_chat, inputs=[chatbot, user_msg, file_input, voice_toggle], outputs=[chatbot, audio, file_pdf, pdf_iframe]\n",
    "    )\n",
    "    voice_input.change(process_voice_input, inputs=voice_input, outputs=user_msg)\n",
    "    clear_btn.click(clear_all, outputs=[chatbot, audio, file_pdf, pdf_iframe])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "demo.launch(inbrowser=True)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "base",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.13.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}