From ccfab49b8c9bcf7ed29f40c6b848e5821e0b766d Mon Sep 17 00:00:00 2001 From: Kunmeer-SyedMohamedHyder Date: Tue, 1 Jul 2025 19:47:05 +0530 Subject: [PATCH] Week 2 Exercise of FlightAI ChatBot --- week2/week2 EXERCISE.ipynb | 605 ++++++++++++++++++++++++++++++++++++- 1 file changed, 604 insertions(+), 1 deletion(-) diff --git a/week2/week2 EXERCISE.ipynb b/week2/week2 EXERCISE.ipynb index d97f5cb..f6c96ca 100644 --- a/week2/week2 EXERCISE.ipynb +++ b/week2/week2 EXERCISE.ipynb @@ -24,6 +24,609 @@ "id": "a07e7793-b8f5-44f4-aded-5562f633271a", "metadata": {}, "outputs": [], + "source": [ + "# Imports\n", + "\n", + "import os\n", + "import json\n", + "import base64\n", + "import logging\n", + "import gradio as gr\n", + "from PIL import Image\n", + "from io import BytesIO\n", + "from openai import OpenAI\n", + "from dotenv import load_dotenv\n", + "from IPython.display import Audio, display" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e879f6ae-b246-479d-8f81-94e47a9072ec", + "metadata": {}, + "outputs": [], + "source": [ + "# Initialization\n", + "logging.basicConfig(level=logging.INFO)\n", + "load_dotenv(override=True)\n", + "\n", + "openai_api_key = os.getenv('OPENAI_API_KEY')\n", + "if openai_api_key:\n", + " logging.info(f\"OpenAI API Key exists and begins {openai_api_key[:8]}\")\n", + "else:\n", + " logging.error(\"OpenAI API Key not set\")\n", + " \n", + "MODEL = \"gpt-4o-mini\"\n", + "openai = OpenAI()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4455169-9e5e-4171-92e8-6f850a06f6e3", + "metadata": {}, + "outputs": [], + "source": [ + "system_message = (\n", + " \"You are a helpful assistant for an airline called FlightAI. \"\n", + " \"Always respond in a short, courteous sentence. \"\n", + " \"Provide accurate information only. \"\n", + " \"If you don’t know something, say so clearly. \"\n", + " \"Before booking a ticket, strictly follow this order: \"\n", + " \"1) Check if the destination is available, \"\n", + " \"2) Then check the ticket price, \"\n", + " \"3) Collect all neccessary details like name, destination and date of journey, \"\n", + " \"4) Only then proceed with the booking. \"\n", + " \"Always use the appropriate tools or APIs for each step before confirming a booking.\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4bab8e2c-e2b1-4421-a95b-7f1251670817", + "metadata": {}, + "outputs": [], + "source": [ + "# Dummy funcs that mimic the ticket booking behaviour\n", + "# Replace these will real funcs (that call APIs or make DB transactions) to actually book a ticket\n", + "\n", + "ticket_prices = {\n", + " \"london\": \"$799\",\n", + " \"paris\": \"$899\",\n", + " \"tokyo\": \"$1400\",\n", + " \"berlin\": \"$499\"\n", + "}\n", + "\n", + "def check_destination_availability(destination: str) -> dict:\n", + " \"\"\"\n", + " Check if the given destination is available in our ticketing system.\n", + " \n", + " Args:\n", + " destination (str): The name of the city.\n", + " \n", + " Returns:\n", + " dict: {\"available\": bool}\n", + " \"\"\"\n", + " logging.info(f\"Checking availability for destination: {destination}\")\n", + " \n", + " available = destination.lower() in ticket_prices\n", + " return {\"available\": available}\n", + "\n", + "\n", + "def fetch_ticket_price(destination_city: str) -> dict:\n", + " \"\"\"\n", + " Retrieve the ticket price for a given city.\n", + " \n", + " Args:\n", + " destination_city (str): The name of the destination city.\n", + " \n", + " Returns:\n", + " dict: {\"price\": str} or {\"price\": \"Unknown\"} if not found\n", + " \"\"\"\n", + " logging.info(f\"Retrieving price for destination: {destination_city}\")\n", + " \n", + " city = destination_city.lower()\n", + " price = ticket_prices.get(city, \"Unknown\")\n", + " \n", + " return {\"price\": price}\n", + "\n", + "\n", + "def book_ticket(name: str, destination_city: str, journey_date: str) -> dict:\n", + " \"\"\"\n", + " Book a ticket to a destination city for a given user and date.\n", + " \n", + " Args:\n", + " name (str): Name of the passenger.\n", + " destination_city (str): Destination city.\n", + " journey_date (str): Date of journey in YYYY-MM-DD format.\n", + " \n", + " Returns:\n", + " dict: Booking confirmation with name, city, price, and date, or error.\n", + " \"\"\"\n", + " logging.info(f\"Booking ticket for {name} to {destination_city} on {journey_date}\")\n", + " \n", + " city = destination_city.lower()\n", + "\n", + " if city not in ticket_prices:\n", + " logging.error(f\"City '{destination_city}' not found in ticket list.\")\n", + " return {\"error\": \"Destination not found.\"}\n", + "\n", + " price_info = fetch_ticket_price(destination_city)\n", + " \n", + " return {\n", + " \"name\": name,\n", + " \"destination_city\": destination_city.title(),\n", + " \"journey_date\": journey_date,\n", + " \"price\": price_info[\"price\"]\n", + " }\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "400f4592-2326-43f6-a921-fcd051c4f022", + "metadata": {}, + "outputs": [], + "source": [ + "destination_availability_tool = {\n", + " \"name\": \"check_destination_availability\",\n", + " \"description\": \"Check if tickets are available for the given destination city before proceeding with any booking or pricing inquiry.\",\n", + " \"parameters\": {\n", + " \"type\": \"object\",\n", + " \"properties\": {\n", + " \"destination\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"The name of the destination city to check for availability.\"\n", + " }\n", + " },\n", + " \"required\": [\"destination\"],\n", + " \"additionalProperties\": False\n", + " }\n", + "}\n", + "\n", + "ticket_price_tool = {\n", + " \"name\": \"fetch_ticket_price\",\n", + " \"description\": (\n", + " \"Get the price of a return ticket to the specified destination city. \"\n", + " \"Use this after confirming that the destination is available, especially when the customer asks for the ticket price.\"\n", + " ),\n", + " \"parameters\": {\n", + " \"type\": \"object\",\n", + " \"properties\": {\n", + " \"destination_city\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"The city for which the customer wants the ticket price.\"\n", + " }\n", + " },\n", + " \"required\": [\"destination_city\"],\n", + " \"additionalProperties\": False\n", + " }\n", + "}\n", + "\n", + "ticket_booking_tool = {\n", + " \"name\": \"book_ticket\",\n", + " \"description\": (\n", + " \"Book a ticket for the customer to the specified destination city on the given journey date. \"\n", + " \"Use only after availability and price have been checked.\"\n", + " ),\n", + " \"parameters\": {\n", + " \"type\": \"object\",\n", + " \"properties\": {\n", + " \"name\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"Full name of the person booking the ticket.\"\n", + " },\n", + " \"destination_city\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"The city that the customer wants to travel to.\"\n", + " },\n", + " \"journey_date\": {\n", + " \"type\": \"string\",\n", + " \"format\": \"date\",\n", + " \"description\": \"The journey date in YYYY-MM-DD format.\"\n", + " }\n", + " },\n", + " \"required\": [\"name\", \"destination_city\", \"journey_date\"],\n", + " \"additionalProperties\": False\n", + " }\n", + "}\n", + "\n", + "tools = [\n", + " {\"type\": \"function\", \"function\": destination_availability_tool},\n", + " {\"type\": \"function\", \"function\": ticket_price_tool},\n", + " {\"type\": \"function\", \"function\": ticket_booking_tool},\n", + "]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f02c17ba-14f2-41c4-b6a2-d1397405d368", + "metadata": {}, + "outputs": [], + "source": [ + "def handle_tool_call(message):\n", + " \"\"\"\n", + " Handles a single OpenAI tool call message and returns both the result\n", + " and a formatted tool response dictionary.\n", + " \n", + " Args:\n", + " message (object): An OpenAI message containing a tool call.\n", + " \n", + " Returns:\n", + " tuple: (result_dict, response_dict)\n", + " \"\"\"\n", + " tool_call = message.tool_calls[0]\n", + " function_name = tool_call.function.name\n", + " arguments = json.loads(tool_call.function.arguments)\n", + "\n", + " result = None\n", + "\n", + " logging.info(f\"Tool call received: {function_name} with arguments: {arguments}\")\n", + "\n", + " if function_name == \"check_destination_availability\":\n", + " result = check_destination_availability(**arguments)\n", + "\n", + " elif function_name == \"fetch_ticket_price\":\n", + " city = arguments.get(\"destination_city\")\n", + " price_info = fetch_ticket_price(city)\n", + " result = {\"destination_city\": city, \"price\": price_info[\"price\"]}\n", + "\n", + " elif function_name == \"book_ticket\":\n", + " result = book_ticket(**arguments)\n", + "\n", + " else:\n", + " logging.warning(\"Unrecognized tool function: %s\", function_name)\n", + " result = {\"error\": f\"Unknown function '{function_name}'\"}\n", + "\n", + " response = {\n", + " \"role\": \"tool\",\n", + " \"tool_call_id\": tool_call.id,\n", + " \"content\": json.dumps(result)\n", + " }\n", + "\n", + " return result, response" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "72c1a9e7-186c-4218-9edc-01814baec431", + "metadata": {}, + "outputs": [], + "source": [ + "def artist(city: str, style: str = \"vibrant pop-art\", size: str = \"1024x1024\") -> Image.Image:\n", + " \"\"\"\n", + " Generates a city-themed vacation image using DALL·E.\n", + "\n", + " Args:\n", + " city (str): Name of the city to visualize.\n", + " style (str): Artistic style for the image prompt.\n", + " size (str): Image resolution (e.g., \"1024x1024\").\n", + "\n", + " Returns:\n", + " Image.Image: A PIL Image object representing the generated image.\n", + "\n", + " Raises:\n", + " ValueError: If city name is empty.\n", + " RuntimeError: If image generation fails.\n", + " \"\"\"\n", + " if not city.strip():\n", + " raise ValueError(\"City name cannot be empty.\")\n", + "\n", + " prompt = (\n", + " f\"An image representing a vacation in {city}, \"\n", + " f\"showing iconic tourist attractions, cultural elements, and everything unique about {city}, \"\n", + " f\"rendered in a {style} style.\"\n", + " )\n", + "\n", + " logging.info(\"Generating image for city: %s with style: %s\", city, style)\n", + "\n", + " try:\n", + " response = openai.images.generate(\n", + " model=\"dall-e-3\",\n", + " prompt=prompt,\n", + " size=size,\n", + " n=1,\n", + " response_format=\"b64_json\",\n", + " )\n", + "\n", + " image_base64 = response.data[0].b64_json\n", + " image_data = base64.b64decode(image_base64)\n", + " logging.info(\"Image generation successful for %s\", city)\n", + "\n", + " return Image.open(BytesIO(image_data))\n", + "\n", + " except Exception as e:\n", + " logging.error(\"Failed to generate image for city '%s': %s\", city, str(e))\n", + " raise RuntimeError(f\"Image generation failed for city '{city}'\") from e" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fdf7c091-6c68-4af6-8197-c1456b36cedf", + "metadata": {}, + "outputs": [], + "source": [ + "def talker(message: str, output_filename: str = \"output_audio.mp3\", autoplay: bool = True) -> None:\n", + " \"\"\"\n", + " Converts a text message into speech using OpenAI TTS and plays the audio.\n", + "\n", + " Args:\n", + " message (str): The text to convert to speech.\n", + " output_filename (str): The filename to save the generated audio.\n", + " autoplay (bool): Whether to autoplay the audio in the notebook.\n", + "\n", + " Raises:\n", + " ValueError: If the message is empty.\n", + " RuntimeError: If the audio generation fails.\n", + " \"\"\"\n", + " if not message.strip():\n", + " raise ValueError(\"Message cannot be empty.\")\n", + "\n", + " logging.info(\"Generating speech for message: %s\", message)\n", + "\n", + " try:\n", + " response = openai.audio.speech.create(\n", + " model=\"tts-1\",\n", + " voice=\"alloy\",\n", + " input=message\n", + " )\n", + "\n", + " with open(output_filename, \"wb\") as f:\n", + " f.write(response.content)\n", + "\n", + " logging.info(\"Audio written to: %s\", output_filename)\n", + "\n", + " if autoplay:\n", + " display(Audio(output_filename, autoplay=True))\n", + "\n", + " except Exception as e:\n", + " logging.error(\"Failed to generate or play audio: %s\", str(e))\n", + " raise RuntimeError(\"Text-to-speech generation failed.\") from e" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "54568b4a-be8d-47a1-b924-03acdafef70e", + "metadata": {}, + "outputs": [], + "source": [ + "def translate(message, language):\n", + " \"\"\"\n", + " Translates the given text into the specified language using OpenAI Chat API.\n", + "\n", + " Args:\n", + " message (str): The text to be translated.\n", + " language (str): Target language for translation (e.g., 'French', 'Japanese').\n", + "\n", + " Returns:\n", + " str: Translated text.\n", + "\n", + " Raises:\n", + " ValueError: If input message or language is empty.\n", + " RuntimeError: If translation fails due to API or other issues.\n", + " \"\"\"\n", + " if not message.strip():\n", + " raise ValueError(\"Input message cannot be empty.\")\n", + " if not language.strip():\n", + " raise ValueError(\"Target language cannot be empty.\")\n", + "\n", + " logging.info(\"Translating to %s: %s\", language, message)\n", + "\n", + " messages = [\n", + " {\"role\": \"system\", \"content\": f\"You are a translation assistant. Translate everything the user says to {language}.\"},\n", + " {\"role\": \"user\", \"content\": message}\n", + " ]\n", + "\n", + " try:\n", + " response = openai.chat.completions.create(\n", + " model=MODEL,\n", + " messages=messages\n", + " )\n", + " translated = response.choices[0].message.content.strip()\n", + " logging.info(\"Translation successful.\")\n", + " return translated\n", + "\n", + " except Exception as e:\n", + " logging.error(\"Translation failed: %s\", str(e))\n", + " raise RuntimeError(\"Failed to translate message.\") from e" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8e6cf470-8ea0-43b2-bbcc-53c2432feb0d", + "metadata": {}, + "outputs": [], + "source": [ + "def transcribe_audio(audio_path):\n", + " \"\"\"\n", + " Transcribes an audio file using OpenAI's Whisper model.\n", + "\n", + " Args:\n", + " audio_path (str): Path to the audio file (e.g., .mp3, .wav).\n", + " model (str): OpenAI model for transcription (default: 'whisper-1').\n", + "\n", + " Returns:\n", + " str: Transcribed text from the audio file.\n", + "\n", + " Raises:\n", + " ValueError: If the path is invalid or the file does not exist.\n", + " RuntimeError: If the transcription fails.\n", + " \"\"\"\n", + " if not audio_path or not os.path.exists(audio_path):\n", + " raise ValueError(\"Invalid or missing audio file path.\")\n", + "\n", + " logging.info(\"Transcribing audio file: %s using model: whisper-1\", audio_path)\n", + "\n", + " try:\n", + " with open(audio_path, \"rb\") as f:\n", + " response = openai.audio.transcriptions.create(\n", + " model=\"whisper-1\",\n", + " file=f\n", + " )\n", + " transcript = response.text.strip()\n", + " logging.info(\"Transcription successful.\")\n", + " return transcript\n", + "\n", + " except Exception as e:\n", + " logging.error(\"Transcription failed: %s\", str(e))\n", + " raise RuntimeError(\"Failed to transcribe audio.\") from e" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3489656e-0f08-4d41-94b1-d902c93ca164", + "metadata": {}, + "outputs": [], + "source": [ + "def chat(history: list, language: str, translated_history: list, speaking_language: str) -> tuple:\n", + " \"\"\"\n", + " Handles a chat interaction including tool calls, image generation, translation, and TTS playback.\n", + "\n", + " Args:\n", + " history (list): List of previous conversation messages.\n", + " language (str): Target language for translation and TTS.\n", + "\n", + " Returns:\n", + " tuple: (updated history list, generated image if any, translated response string)\n", + " \"\"\"\n", + " messages = [{\"role\": \"system\", \"content\": system_message}] + history\n", + " image = None\n", + "\n", + " try:\n", + " # Initial assistant response\n", + " response = openai.chat.completions.create(model=MODEL, messages=messages, tools=tools)\n", + " choice = response.choices[0]\n", + "\n", + " # Handle tool calls if triggered\n", + " if choice.finish_reason == \"tool_calls\":\n", + " message = choice.message\n", + " result, tool_response = handle_tool_call(message)\n", + "\n", + " # Append tool-related messages\n", + " messages.append(message)\n", + " messages.append(tool_response)\n", + " logging.info(\"Tool call result: %s\", result)\n", + "\n", + " # Generate image if a booking was completed\n", + " if message.tool_calls[0].function.name == \"book_ticket\" and \"destination_city\" in result:\n", + " image = artist(result[\"destination_city\"])\n", + "\n", + " # Get final assistant response after tool execution\n", + " response = openai.chat.completions.create(model=MODEL, messages=messages)\n", + " choice = response.choices[0]\n", + "\n", + " reply = choice.message.content.strip()\n", + " history.append({\"role\": \"assistant\", \"content\": reply})\n", + "\n", + " # Translate and speak the reply\n", + " translated_reply = translate(reply, language)\n", + " translated_history.append({\"role\": \"assistant\", \"content\": translated_reply})\n", + "\n", + " if speaking_language == \"English\":\n", + " talker(reply)\n", + " else:\n", + " talker(translated_reply)\n", + "\n", + " return history, image, translated_history\n", + "\n", + " except Exception as e:\n", + " logging.error(\"Chat processing failed: %s\", str(e))\n", + " raise RuntimeError(\"Failed to complete chat interaction.\") from e" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f76acc68-726e-457f-88ab-99da75debde5", + "metadata": {}, + "outputs": [], + "source": [ + "force_dark_mode = \"\"\"\n", + "function refresh() {\n", + " const url = new URL(window.location);\n", + " if (url.searchParams.get('__theme') !== 'dark') {\n", + " url.searchParams.set('__theme', 'dark');\n", + " window.location.href = url.href;\n", + " }\n", + "}\n", + "\"\"\"\n", + "\n", + "with gr.Blocks(js=force_dark_mode) as ui:\n", + " with gr.Row():\n", + " gr.Markdown(\"### FlightAI Chat with Translation\")\n", + "\n", + " with gr.Row():\n", + " lang_dropdown = gr.Dropdown(\n", + " choices=[\"Spanish\", \"French\", \"German\", \"Japanese\", \"Hindi\"],\n", + " value=\"Spanish\",\n", + " label=\"Translate To\"\n", + " )\n", + " \n", + " speak_dropdown = gr.Dropdown(\n", + " choices=[\"English\", \"Selected Language\"],\n", + " value=\"English\",\n", + " label=\"Speak out in\"\n", + " )\n", + " \n", + " with gr.Row():\n", + " chatbot = gr.Chatbot(height=500, type=\"messages\", label=\"Chat History\")\n", + " translated_chatbot = gr.Chatbot(height=500, type=\"messages\", label=\"Translated Chat\")\n", + " image_output = gr.Image(height=500)\n", + "\n", + " with gr.Row():\n", + " entry = gr.Textbox(label=\"Chat with our AI Assistant:\")\n", + " audio_input = gr.Audio(sources=\"microphone\", type=\"filepath\", label=\"Or speak to the assistant\")\n", + "\n", + " with gr.Row():\n", + " clear = gr.Button(\"Clear\")\n", + "\n", + " def do_entry(message, history, audio, translated_history, language):\n", + " if audio:\n", + " message = transcribe_audio(audio)\n", + "\n", + " if message:\n", + " history += [{\"role\": \"user\", \"content\": message}]\n", + " translated_history += [{\"role\": \"user\", \"content\": translate(message, language)}]\n", + " return \"\", history, None, translated_history\n", + "\n", + " entry.submit(\n", + " do_entry,\n", + " inputs=[entry, chatbot, audio_input, translated_chatbot, lang_dropdown],\n", + " outputs=[entry, chatbot, audio_input, translated_chatbot]\n", + " ).then(\n", + " chat,\n", + " inputs=[chatbot, lang_dropdown, translated_chatbot, speak_dropdown],\n", + " outputs=[chatbot, image_output, translated_chatbot]\n", + " )\n", + "\n", + " audio_input.change(\n", + " do_entry,\n", + " inputs=[entry, chatbot, audio_input, translated_chatbot, lang_dropdown],\n", + " outputs=[entry, chatbot, audio_input, translated_chatbot]\n", + " ).then(\n", + " chat,\n", + " inputs=[chatbot, lang_dropdown, translated_chatbot, speak_dropdown],\n", + " outputs=[chatbot, image_output, translated_chatbot]\n", + " )\n", + "\n", + " clear.click(lambda: [\"\", [], None, [], None], inputs=None, outputs=[entry, chatbot, audio_input, translated_chatbot, image_output], queue=False)\n", + "\n", + "ui.launch(inbrowser=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "58f97435-fa0d-45f7-b02f-4ac5f4901c53", + "metadata": {}, + "outputs": [], "source": [] } ], @@ -43,7 +646,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.11" + "version": "3.10.6" } }, "nbformat": 4,