{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/github/dkisselev-zz/llm_engineering/blob/wk5-excersise/week5/community-contributions/dkisselev-zz/Week5_Excerise_EmailTerminator.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "markdown",
"source": [
"# Gmail Terminator\n",
"\n",
"## An Intelligent Email Management System\n",
"\n",
"This application uses RAG (Retrieval Augmented Generation) and LLMs to analyze your Gmail inbox, identify important topics and interests, and help you safely delete unimportant emails with archiving.\n",
"\n",
"### Features:\n",
"- **IMAP Authentication**: Secure app-specific password authentication\n",
"- **Vector Embeddings**: OpenAI or BERT/HuggingFace models\n",
"- **Topic Analysis**: LLM-powered identification of your interests\n",
"- **Category Counts**: See breakdown of email categories\n",
"- **Chat-Based Topics Updates**: Use chat to find specific topics of interest\n",
"- **Selective Deletion**: Choose specific emails to delete with checkboxes\n",
"- **Safe Deletion**: Automatic archiving before deletion\n",
"- **Testing Mode**: Process limited emails with debug output\n",
"\n",
"### Architecture:\n",
"1. Connect to Gmail via IMAP\n",
"2. Fetch and parse emails\n",
"3. Chunk text and create embeddings\n",
"4. Store vectors in ChromaDB\n",
"5. Use LLM to identify important topics\n",
"6. Classify emails as keep/delete\n",
"7. Select specific emails to delete\n",
"8. Archive and safely delete selected emails\n",
"\n",
"## Setup Instructions\n",
"\n",
"### IMAP with App-Specific Password\n",
"\n",
"1. **Enable 2-Factor Authentication** on your Google account (required for app passwords)\n",
"2. **Create App-Specific Password**\n",
" - Go to [Google Account Security](https://myaccount.google.com/security)\n",
" - Under \"2-Step Verification\", find \"App passwords\"\n",
" - Generate a new app password for \"Mail\"\n",
"3. **Store Credentials**\n",
" - **Google Colab**: Store as secrets named `EMAIL` and `IMAP_PASSWORD`\n",
" - **Local**: Add to `.env` file:\n",
" ```\n",
" EMAIL=your.email@gmail.com\n",
" IMAP_PASSWORD=your_16_char_app_password\n",
" ```\n",
"4. **Connect**: If credentials are stored, they will auto-populate in the UI"
],
"metadata": {
"id": "ANmiUlCxG4Bh"
},
"id": "ANmiUlCxG4Bh"
},
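{
"cell_type": "markdown",
"source": [
"### Pipeline Sketch\n",
"\n",
"A minimal sketch (not meant to be executed as-is) of how the eight architecture steps map onto the classes defined below; the credentials and email count are placeholders, and step 7 (selecting individual emails) happens in the Gradio UI:\n",
"\n",
"```python\n",
"conn = IMAPConnection(\"you@gmail.com\", \"app-password\")  # 1. connect via IMAP\n",
"conn.connect()\n",
"docs, report = conn.fetch_emails(max_emails=50)         # 2. fetch and parse\n",
"\n",
"processor = EmailProcessor()\n",
"chunks = processor.chunk_documents(docs)                # 3. chunk text\n",
"\n",
"vdb = VectorDatabaseManager()\n",
"vdb.create_embeddings(\"openai\")                         # 3. create embeddings\n",
"vdb.create_vector_store(chunks)                         # 4. store in ChromaDB\n",
"\n",
"processor.create_llm(\"openai\")\n",
"processor.analyze_personal_interests(docs)              # 5. identify topics\n",
"classified = processor.classify_emails(docs, vdb.vectorstore)  # 6. keep/delete\n",
"\n",
"processor.create_archive(classified['delete'])          # 8. archive first...\n",
"conn.delete_emails(classified['delete'])                # ...then delete\n",
"```"
],
"metadata": {
"id": "pipelineSketchMd"
},
"id": "pipelineSketchMd"
},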
{
"cell_type": "markdown",
"source": [
"## Install and Setup"
],
"metadata": {
"id": "NzQyA5qmu5fv"
},
"id": "NzQyA5qmu5fv"
},
{
"cell_type": "code",
"execution_count": null,
"id": "6f9842a8",
"metadata": {
"id": "6f9842a8"
},
"outputs": [],
"source": [
"%pip install -U -q imapclient langchain langchain-openai langchain-chroma langchain-community langchain-core langchain-text-splitters langchain-huggingface chromadb sentence-transformers\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "737e1c9e",
"metadata": {
"id": "737e1c9e"
},
"outputs": [],
"source": [
"# Standard library imports\n",
"import os\n",
"import json\n",
"import base64\n",
"import zipfile\n",
"import shutil\n",
"from datetime import datetime\n",
"from collections import Counter\n",
"from typing import List, Dict, Optional, Tuple\n",
"from abc import ABC, abstractmethod\n",
"\n",
"# Third-party imports\n",
"import pandas as pd\n",
"import numpy as np\n",
"from tqdm import tqdm\n",
"from bs4 import BeautifulSoup\n",
"\n",
"# IMAP imports\n",
"import imaplib\n",
"import email\n",
"from email.header import decode_header\n",
"\n",
"# LangChain v1.0+ imports\n",
"from langchain_core.documents import Document\n",
"from langchain_core.messages import HumanMessage\n",
"from langchain_text_splitters import CharacterTextSplitter\n",
"from langchain_openai import OpenAIEmbeddings, ChatOpenAI\n",
"from langchain_chroma import Chroma\n",
"from langchain_huggingface import HuggingFaceEmbeddings\n",
"from langchain_core.callbacks import StdOutCallbackHandler\n",
"\n",
"# LLM APIs\n",
"from openai import OpenAI\n",
"\n",
"# HuggingFace\n",
"from huggingface_hub import login\n",
"\n",
"# Gradio\n",
"import gradio as gr\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "191dc787",
"metadata": {
"id": "191dc787"
},
"outputs": [],
"source": [
"def setup_api_keys():\n",
" try:\n",
" # Try Colab environment first\n",
" from google.colab import userdata\n",
" api_keys = {\n",
" 'openai': userdata.get('OPENAI_API_KEY'),\n",
" 'anthropic': userdata.get('ANTHROPIC_API_KEY'),\n",
" 'google': userdata.get('GOOGLE_API_KEY'),\n",
" 'hf_token': userdata.get('HF_TOKEN')\n",
" }\n",
" email = userdata.get('EMAIL')\n",
" password = userdata.get('IMAP_PASSWORD')\n",
" print(\"✅ Using Colab secrets\")\n",
" except:\n",
" # Fallback to local environment\n",
" from dotenv import load_dotenv\n",
" load_dotenv()\n",
" api_keys = {\n",
" 'openai': os.getenv('OPENAI_API_KEY'),\n",
" 'anthropic': os.getenv('ANTHROPIC_API_KEY'),\n",
" 'google': os.getenv('GOOGLE_API_KEY'),\n",
" 'hf_token': os.getenv('HF_TOKEN')\n",
" }\n",
"\n",
" email = os.getenv('EMAIL', '')\n",
" password = os.getenv('IMAP_PASSWORD', '')\n",
" print(\"✅ Using local .env file\")\n",
"\n",
" # Initialize API clients\n",
" anthropic_url = \"https://api.anthropic.com/v1/\"\n",
" gemini_url = \"https://generativelanguage.googleapis.com/v1beta/openai/\"\n",
"\n",
" clients = {}\n",
" if api_keys['openai']:\n",
" clients['openai'] = OpenAI(api_key=api_keys['openai'])\n",
" if api_keys['anthropic']:\n",
" clients['anthropic'] = OpenAI(api_key=api_keys['anthropic'], base_url=anthropic_url)\n",
" if api_keys['google']:\n",
" clients['google'] = OpenAI(api_key=api_keys['google'], base_url=gemini_url)\n",
" if api_keys['hf_token']:\n",
" login(api_keys['hf_token'])\n",
"\n",
" os.environ['OPENAI_API_KEY'] = api_keys['openai']\n",
" os.environ['ANTHROPIC_API_KEY'] = api_keys['anthropic']\n",
" os.environ['GOOGLE_API_KEY'] = api_keys['google']\n",
"\n",
" return api_keys, clients, email, password\n",
"\n",
"# Initialize API keys and clients\n",
"api_keys, clients, default_email, default_password = setup_api_keys()\n",
"\n",
"# Constants\n",
"MODEL_OPENAI = \"gpt-4o-mini\"\n",
"MODEL_GEMINI = \"gemini-2.5-pro\"\n",
"DB_NAME = \"email_vector_db\"\n"
]
},
{
"cell_type": "markdown",
"source": [
"##Helper Functions"
],
"metadata": {
"id": "hUiNY8_I8ac0"
},
"id": "hUiNY8_I8ac0"
},
{
"cell_type": "code",
"source": [
"def get_header_value(headers, name):\n",
" \"\"\"Get header value from email headers.\"\"\"\n",
" for header in headers:\n",
" if header['name'].lower() == name.lower():\n",
" return header['value']\n",
" return \"\""
],
"metadata": {
"id": "Y4MjoYtb8b4i"
},
"id": "Y4MjoYtb8b4i",
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"##Gmail Connection Classes"
],
"metadata": {
"id": "g7F4Xgw98jec"
},
"id": "g7F4Xgw98jec"
},
{
"cell_type": "code",
"source": [
"class GmailConnection(ABC):\n",
" \"\"\"Abstract base class for Gmail connections.\"\"\"\n",
"\n",
" def __init__(self):\n",
" self.connection = None\n",
" self.auth_info = None\n",
"\n",
" @abstractmethod\n",
" def connect(self) -> bool:\n",
" pass\n",
"\n",
" def fetch_emails(self, max_emails: Optional[int] = None) -> Tuple[List[Document], str]:\n",
" \"\"\"Fetch emails. Returns (documents, diagnostic_message).\"\"\"\n",
" pass\n",
"\n",
" @abstractmethod\n",
" def delete_emails(self, documents: List[Document]) -> Tuple[int, int]:\n",
" pass\n",
"\n",
" def get_auth_info(self) -> Dict:\n",
" return self.auth_info\n",
"\n",
" def is_connected(self) -> bool:\n",
" return self.connection is not None\n",
"\n",
"\n",
"class IMAPConnection(GmailConnection):\n",
" \"\"\"IMAP Gmail connection.\n",
"\n",
" IMPORTANT: For proper email deletion with Gmail IMAP, configure these settings:\n",
" 1. Go to Gmail Settings → Forwarding and POP/IMAP tab\n",
" 2. Under \"When I mark a message in IMAP as deleted\":\n",
" - Set to \"Auto-Expunge off - Wait for the client to update the server\"\n",
" 3. Under \"When a message is marked as deleted and expunged from the last visible IMAP folder\":\n",
" - Select \"Move the message to the Trash\"\n",
" 4. Make sure \"Trash\" label is set to \"Show in IMAP\" under Labels settings\n",
"\n",
" This ensures deleted emails are properly moved to Trash when expunged.\n",
" \"\"\"\n",
"\n",
" def __init__(self, email_address: str, app_password: str):\n",
" super().__init__()\n",
" self.email_address = email_address\n",
" self.app_password = app_password\n",
"\n",
" def connect(self) -> bool:\n",
" \"\"\"Authenticate with Gmail using IMAP.\"\"\"\n",
" try:\n",
" imaplib._MAXLINE = 10000000 # 10MB\n",
"\n",
" self.connection = imaplib.IMAP4_SSL(\"imap.gmail.com\", 993)\n",
" self.connection.login(self.email_address, self.app_password)\n",
"\n",
" status, messages = self.connection.select(\"INBOX\")\n",
" if status == \"OK\":\n",
" self.auth_info = {\n",
" 'email': self.email_address,\n",
" 'total_messages': int(messages[0]),\n",
" 'auth_method': 'IMAP'\n",
" }\n",
"\n",
" print(f\"✓ IMAP connected as: {self.email_address}\")\n",
" print(f\"✓ Total messages in INBOX: {self.auth_info['total_messages']:,}\")\n",
" return True\n",
" else:\n",
" print(f\"❌ Failed to select INBOX: {status}\")\n",
" return False\n",
"\n",
" except Exception as e:\n",
" print(f\"❌ IMAP authentication failed: {e}\")\n",
" print(\"Make sure you're using an app-specific password.\")\n",
" return False\n",
"\n",
" def fetch_emails(self, max_emails: Optional[int] = None) -> Tuple[List[Document], str]:\n",
" \"\"\"Fetch emails using IMAP with UIDs. Returns (documents, diagnostic_message).\"\"\"\n",
" if not self.connection:\n",
" raise RuntimeError(\"Not connected. Call connect() first.\")\n",
"\n",
" diagnostics = [] # Capture diagnostic messages\n",
"\n",
" try:\n",
" self.connection.select(\"INBOX\")\n",
"\n",
" status, messages = self.connection.uid('search', None, \"ALL\")\n",
"\n",
" if status != \"OK\":\n",
" msg = f\"❌ Search failed with status: {status}\"\n",
" diagnostics.append(msg)\n",
" return [], \"\\n\".join(diagnostics)\n",
"\n",
" msg_uids = messages[0].split()\n",
" diagnostics.append(f\"✓ Found {len(msg_uids)} message UIDs\")\n",
"\n",
" if not msg_uids:\n",
" diagnostics.append(\"❌ No message UIDs returned from search\")\n",
" return [], \"\\n\".join(diagnostics)\n",
"\n",
" if max_emails:\n",
" msg_uids = msg_uids[-max_emails:] # Get most recent\n",
" diagnostics.append(f\" → Limited to {len(msg_uids)} most recent emails\")\n",
"\n",
" diagnostics.append(f\"Fetching {len(msg_uids)} emails...\")\n",
" documents = []\n",
" errors = []\n",
"\n",
" for uid in tqdm(msg_uids, desc=\"Processing emails\"):\n",
" try:\n",
" # Fetch using UID to get both UID and the email content\n",
" status, msg_data = self.connection.uid('fetch', uid, \"(RFC822)\")\n",
" if status != \"OK\":\n",
" errors.append(f\"Fetch failed for UID {uid}: {status}\")\n",
" continue\n",
"\n",
" # Check if msg_data is valid\n",
" if not msg_data or not msg_data[0] or len(msg_data[0]) < 2:\n",
" errors.append(f\"Invalid msg_data for UID {uid}\")\n",
" continue\n",
"\n",
" email_message = email.message_from_bytes(msg_data[0][1])\n",
"\n",
" # Extract headers\n",
" subject = email_message.get(\"Subject\", \"\")\n",
" if subject:\n",
" decoded = decode_header(subject)[0]\n",
" if isinstance(decoded[0], bytes):\n",
" subject = decoded[0].decode(decoded[1] or 'utf-8', errors='ignore')\n",
" else:\n",
" subject = decoded[0]\n",
"\n",
" sender = email_message.get(\"From\", \"\")\n",
" recipient = email_message.get(\"To\", \"\")\n",
" date_str = email_message.get(\"Date\", \"\")\n",
"\n",
" # Extract body\n",
" body = \"\"\n",
" if email_message.is_multipart():\n",
" for part in email_message.walk():\n",
" if part.get_content_type() == \"text/plain\":\n",
" try:\n",
" payload = part.get_payload(decode=True)\n",
" if payload:\n",
" body = payload.decode('utf-8', errors='ignore')\n",
" break\n",
" except Exception as e:\n",
" continue\n",
" elif part.get_content_type() == \"text/html\" and not body:\n",
" try:\n",
" payload = part.get_payload(decode=True)\n",
" if payload:\n",
" html = payload.decode('utf-8', errors='ignore')\n",
" body = BeautifulSoup(html, 'html.parser').get_text()\n",
" except Exception as e:\n",
" continue\n",
" else:\n",
" try:\n",
" payload = email_message.get_payload(decode=True)\n",
" if payload:\n",
" body = payload.decode('utf-8', errors='ignore')\n",
" if email_message.get_content_type() == \"text/html\":\n",
" body = BeautifulSoup(body, 'html.parser').get_text()\n",
" else:\n",
" # Try without decoding for plain text\n",
" body = str(email_message.get_payload())\n",
" except Exception as e:\n",
" # Last resort: use subject as body\n",
" body = \"\"\n",
"\n",
" # Clean whitespace\n",
" if body:\n",
" body = ' '.join(body.split())\n",
"\n",
" # Use subject if body is empty or too short\n",
" if not body or len(body) < 10:\n",
" body = subject or \"No content\"\n",
"\n",
" content = f\"Subject: {subject}\\nFrom: {sender}\\nTo: {recipient}\\nDate: {date_str}\\n\\n{body}\"\n",
"\n",
" doc = Document(\n",
" page_content=content,\n",
" metadata={\n",
" 'uid': uid.decode(),\n",
" 'message_id': uid.decode(),\n",
" 'subject': subject,\n",
" 'sender': sender,\n",
" 'recipient': recipient,\n",
" 'date': date_str,\n",
" 'source': 'gmail_imap'\n",
" }\n",
" )\n",
" documents.append(doc)\n",
"\n",
" except Exception as e:\n",
" errors.append(f\"Error processing UID {uid}: {str(e)}\")\n",
" continue\n",
"\n",
" diagnostics.append(f\"✓ Successfully fetched {len(documents)} emails out of {len(msg_uids)} attempted\")\n",
"\n",
" if errors:\n",
" diagnostics.append(f\"\\n⚠ Encountered {len(errors)} errors:\")\n",
" # Show first 5 errors\n",
" for err in errors[:5]:\n",
" diagnostics.append(f\" • {err}\")\n",
" if len(errors) > 5:\n",
" diagnostics.append(f\" ... and {len(errors) - 5} more errors\")\n",
"\n",
" if len(documents) == 0 and len(msg_uids) > 0:\n",
" diagnostics.append(\"\\n⚠ WARNING: No documents created despite having UIDs\")\n",
"\n",
" return documents, \"\\n\".join(diagnostics)\n",
"\n",
" except Exception as error:\n",
" diagnostics.append(f\"❌ Fetch error: {error}\")\n",
" import traceback\n",
" diagnostics.append(f\"\\nTraceback:\\n{traceback.format_exc()}\")\n",
" return [], \"\\n\".join(diagnostics)\n",
"\n",
" def delete_emails(self, documents: List[Document]) -> Tuple[int, int]:\n",
" \"\"\"Delete emails using IMAP with proper UID handling for Gmail.\n",
"\n",
" This method works with Gmail's \"Auto-Expunge off\" setting by:\n",
" 1. Using UIDs instead of sequence numbers for reliable identification\n",
" 2. Marking emails with \\\\Deleted flag\n",
" 3. Explicitly calling EXPUNGE to permanently remove them\n",
" 4. Moving emails to [Gmail]/Trash (Gmail's default behavior)\n",
" \"\"\"\n",
" if not self.connection:\n",
" raise RuntimeError(\"Not connected. Call connect() first.\")\n",
"\n",
" if not documents:\n",
" return 0, 0\n",
"\n",
" successful, failed = 0, 0\n",
" print(f\"Deleting {len(documents)} emails via IMAP...\")\n",
"\n",
" try:\n",
" # Select INBOX in read-write mode (default)\n",
" status, response = self.connection.select(\"INBOX\")\n",
" if status != \"OK\":\n",
" print(f\"❌ Failed to select INBOX: {response}\")\n",
" return 0, len(documents)\n",
"\n",
" for doc in tqdm(documents, desc=\"Marking emails for deletion\"):\n",
" # Try to get UID first, fall back to message_id\n",
" uid = doc.metadata.get('uid') or doc.metadata.get('message_id')\n",
" if not uid:\n",
" print(f\"⚠️ No UID found for email: {doc.metadata.get('subject', 'Unknown')}\")\n",
" failed += 1\n",
" continue\n",
"\n",
" try:\n",
" # Convert to bytes if it's a string\n",
" if isinstance(uid, str):\n",
" uid = uid.encode()\n",
"\n",
" # Use UID STORE to mark the email as deleted\n",
" # This is more reliable than using sequence numbers\n",
" status, response = self.connection.uid('STORE', uid, '+FLAGS', '(\\\\Deleted)')\n",
"\n",
" if status == \"OK\":\n",
" successful += 1\n",
" else:\n",
" print(f\"⚠️ Failed to mark UID {uid.decode()}: {response}\")\n",
" failed += 1\n",
"\n",
" except Exception as e:\n",
" print(f\"❌ Error deleting UID {uid}: {e}\")\n",
" failed += 1\n",
"\n",
" # Expunge to permanently delete all messages marked as \\\\Deleted\n",
" # With Gmail's \"Auto-Expunge off\", this command is required\n",
" print(f\"\\n📤 Expunging {successful} deleted emails...\")\n",
" try:\n",
" status, response = self.connection.expunge()\n",
" if status == \"OK\":\n",
" print(f\"✓ Expunge successful: {response}\")\n",
" else:\n",
" print(f\"⚠️ Expunge response: {status} - {response}\")\n",
" except Exception as e:\n",
" print(f\"❌ Expunge error: {e}\")\n",
"\n",
" # Close and reselect to ensure changes are committed\n",
" try:\n",
" self.connection.close()\n",
" self.connection.select(\"INBOX\")\n",
" except:\n",
" pass # Not critical if this fails\n",
"\n",
" print(f\"\\n✓ Deletion complete: {successful} successful, {failed} failed\")\n",
" if successful > 0:\n",
" print(f\" With Gmail's settings, deleted emails should appear in [Gmail]/Trash\")\n",
"\n",
" return successful, failed\n",
"\n",
" except Exception as error:\n",
" print(f\"❌ Delete operation error: {error}\")\n",
" return successful, failed\n",
"\n",
"\n",
"def create_gmail_connection(email: str, password: str) -> GmailConnection:\n",
" \"\"\"Factory function to create Gmail connection.\"\"\"\n",
" if not email or not password:\n",
" raise ValueError(\"Email and password required for IMAP\")\n",
" return IMAPConnection(email, password)"
],
"metadata": {
"id": "Mv4m2UqV8i-b"
},
"id": "Mv4m2UqV8i-b",
"execution_count": null,
"outputs": []
},
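{
"cell_type": "code",
"source": [
"# Illustration only (commented out; do not run against a real mailbox):\n",
"# the raw imaplib sequence that IMAPConnection.delete_emails() performs.\n",
"# Here `conn` would be a logged-in imaplib.IMAP4_SSL client and `uid` a\n",
"# message UID; both are placeholders.\n",
"#\n",
"# conn.select(\"INBOX\")                             # read-write mode\n",
"# conn.uid('STORE', uid, '+FLAGS', '(\\\\Deleted)')  # mark by UID\n",
"# conn.expunge()                                   # required with Gmail's Auto-Expunge off\n"
],
"metadata": {
"id": "imapDeleteSketch"
},
"id": "imapDeleteSketch",
"execution_count": null,
"outputs": []
},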
{
"cell_type": "markdown",
"source": [
"##Vector Database Manager"
],
"metadata": {
"id": "WI1_7UiU8iy3"
},
"id": "WI1_7UiU8iy3"
},
{
"cell_type": "code",
"source": [
"class VectorDatabaseManager:\n",
" \"\"\"Manages vector database operations for email embeddings.\"\"\"\n",
"\n",
" def __init__(self, db_name: str = DB_NAME):\n",
" self.db_name = db_name\n",
" self.vectorstore = None\n",
" self.embeddings = None\n",
"\n",
" def create_embeddings(self, model_type: str = \"openai\"):\n",
" \"\"\"Create embedding function based on model type.\"\"\"\n",
" if model_type.lower() == \"openai\":\n",
" print(\"Using OpenAI embeddings...\")\n",
" self.embeddings = OpenAIEmbeddings()\n",
" elif model_type.lower() == \"bert\":\n",
" print(\"Using BERT (HuggingFace) embeddings...\")\n",
" self.embeddings = HuggingFaceEmbeddings(\n",
" model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n",
" )\n",
" else:\n",
" raise ValueError(f\"Unknown model type: {model_type}. Use 'openai' or 'bert'.\")\n",
"\n",
" return self.embeddings\n",
"\n",
" def create_vector_store(self, chunks: List[Document], recreate: bool = True):\n",
" \"\"\"Chroma vector store from document chunks.\"\"\"\n",
" if not self.embeddings:\n",
" raise RuntimeError(\"Call create_embeddings() first\")\n",
"\n",
" if recreate and os.path.exists(self.db_name):\n",
" print(f\"Deleting existing database: {self.db_name}\")\n",
" try:\n",
" Chroma(persist_directory=self.db_name, embedding_function=self.embeddings).delete_collection()\n",
" except:\n",
" pass\n",
"\n",
" print(f\"Creating vector store with {len(chunks)} chunks\")\n",
" self.vectorstore = Chroma.from_documents(\n",
" documents=chunks,\n",
" embedding=self.embeddings,\n",
" persist_directory=self.db_name\n",
" )\n",
"\n",
" count = self.vectorstore._collection.count()\n",
" print(f\"Vector store created with {count:,} documents\")\n",
"\n",
" return self.vectorstore\n",
"\n",
" def load_vector_store(self):\n",
" \"\"\"Load existing Chroma vector store.\"\"\"\n",
" if not self.embeddings:\n",
" raise RuntimeError(\"Call create_embeddings() first\")\n",
"\n",
" if not os.path.exists(self.db_name):\n",
" raise FileNotFoundError(f\"Vector store not found: {self.db_name}\")\n",
"\n",
" self.vectorstore = Chroma(\n",
" persist_directory=self.db_name,\n",
" embedding_function=self.embeddings\n",
" )\n",
"\n",
" count = self.vectorstore._collection.count()\n",
" print(f\"Loaded vector store with {count:,} documents\")\n",
"\n",
" return self.vectorstore\n",
"\n",
" def get_vectorstore(self):\n",
" \"\"\"Get the vectorstore instance.\"\"\"\n",
" return self.vectorstore"
],
"metadata": {
"id": "R1S1CEwf9VF7"
},
"id": "R1S1CEwf9VF7",
"execution_count": null,
"outputs": []
},
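{
"cell_type": "code",
"source": [
"# Hedged usage sketch for VectorDatabaseManager (commented out so the\n",
"# notebook still runs top-to-bottom without side effects). `chunks` would\n",
"# come from EmailProcessor.chunk_documents(), defined in the next section.\n",
"#\n",
"# vdb = VectorDatabaseManager(db_name=\"email_vector_db\")\n",
"# vdb.create_embeddings(\"bert\")    # local model, no API key required\n",
"# vdb.create_vector_store(chunks)  # builds and persists the Chroma store\n",
"# vdb.load_vector_store()          # reload the persisted store on a later run\n"
],
"metadata": {
"id": "vdbUsageSketch"
},
"id": "vdbUsageSketch",
"execution_count": null,
"outputs": []
},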
{
"cell_type": "markdown",
"source": [
"## Email Processor"
],
"metadata": {
"id": "LWIukSSu9vl_"
},
"id": "LWIukSSu9vl_"
},
{
"cell_type": "code",
"source": [
"class EmailProcessor:\n",
" \"\"\"Email processor\"\"\"\n",
"\n",
" def __init__(self):\n",
" self.documents = []\n",
" self.chunks = []\n",
" self.llm = None\n",
" self.topics = \"\"\n",
" self.classified_emails = {'keep': [], 'delete': []}\n",
" self.topic_to_emails = {}\n",
" self.email_to_topic = {}\n",
"\n",
" def chunk_documents(self, documents: List[Document], chunk_size: int = 1000, chunk_overlap: int = 200):\n",
" \"\"\"Chunk email documents.\"\"\"\n",
" text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)\n",
"\n",
" self.documents = documents\n",
" self.chunks = text_splitter.split_documents(documents)\n",
" print(f\"Created {len(self.chunks)} chunks from {len(documents)} documents\")\n",
" return self.chunks\n",
"\n",
" def get_statistics(self, documents: List[Document]) -> Dict:\n",
" \"\"\"Calculate statistics.\"\"\"\n",
" if not documents:\n",
" return {}\n",
"\n",
" senders = [doc.metadata.get('sender', '') for doc in documents]\n",
" total_chars = sum(len(doc.page_content) for doc in documents)\n",
"\n",
" return {\n",
" 'total_emails': len(documents),\n",
" 'total_chars': total_chars,\n",
" 'avg_email_length': total_chars // len(documents),\n",
" 'unique_senders': len(set(senders)),\n",
" 'top_senders': Counter(senders).most_common(10)\n",
" }\n",
"\n",
" def create_llm(self, model_type: str = \"openai\", temperature: float = 0.7, debug: bool = False):\n",
" \"\"\"Create LLM instance.\"\"\"\n",
" callbacks = [StdOutCallbackHandler()] if debug else []\n",
"\n",
" if model_type.lower() == \"openai\":\n",
" self.llm = ChatOpenAI(\n",
" temperature=temperature,\n",
" model_name=MODEL_OPENAI,\n",
" callbacks=callbacks\n",
" )\n",
" else:\n",
" self.llm = ChatOpenAI(temperature=temperature, model_name=MODEL_OPENAI)\n",
"\n",
" return self.llm\n",
"\n",
" def analyze_personal_interests(self, documents: List[Document]) -> str:\n",
" \"\"\"Analyze personal interests using LLM.\"\"\"\n",
" if not self.llm:\n",
" raise RuntimeError(\"Call create_llm() first\")\n",
"\n",
" prompt = self._generate_topics_prompt(documents)\n",
" response = self.llm.invoke([HumanMessage(content=prompt)])\n",
" self.topics = response.content\n",
" return self.topics\n",
"\n",
" def _generate_topics_prompt(self, documents: List[Document], user_context: Optional[str] = None) -> str:\n",
" \"\"\"Generate LLM prompt for topic identification.\"\"\"\n",
" senders = [doc.metadata.get('sender', '') for doc in documents]\n",
" subjects = [doc.metadata.get('subject', '') for doc in documents]\n",
" sender_counts = Counter(senders).most_common(20)\n",
"\n",
" context_line = f'Based on the user\\'s query: \"{user_context}\"\\n\\n' if user_context else \"\"\n",
"\n",
" prompt = f\"\"\"\n",
"{context_line}I have {len(documents)} emails. Analyze and identify 5-10 important topics/categories.\n",
"\n",
"Top senders:\n",
"{chr(10).join([f\"- {sender}: {count}\" for sender, count in sender_counts])}\n",
"\n",
"Sample subjects (first 30):\n",
"{chr(10).join([f\"- {subj}\" for subj in subjects[:30]])}\n",
"\n",
"IMPORTANT: Format your response as a simple numbered list with ONLY the topic names, one per line.\n",
"Do NOT use markdown formatting (**, *, etc.).\n",
"Do NOT add descriptions or explanations after the topic name.\n",
"Do NOT add blank lines between topics.\n",
"\n",
"Example format:\n",
"1. Work Projects\n",
"2. Family Communications\n",
"3. Professional Development\n",
"\"\"\"\n",
"\n",
" if user_context:\n",
" prompt += f\"\\n\\nYour response should list topics that align with the user's query about: {user_context}\"\n",
"\n",
" return prompt\n",
"\n",
" def extract_topics_from_text(self, topics_text: str) -> List[str]:\n",
" \"\"\"Extract topic list from LLM-generated topics text.\"\"\"\n",
" topics = []\n",
" lines = topics_text.strip().split('\\n')\n",
"\n",
" for line in lines:\n",
" line = line.strip()\n",
"\n",
" # Skip empty lines\n",
" if not line or len(line) < 3:\n",
" continue\n",
"\n",
" # Skip lines that are clearly descriptions (start with lowercase, or too long)\n",
" if line[0].islower() or line.startswith(('Emails', 'Topics', 'Information', 'Communications', 'Offers')):\n",
" continue\n",
"\n",
" # Remove markdown formatting (**, *, _)\n",
" line = line.replace('**', '').replace('*', '').replace('_', '')\n",
"\n",
" # Remove numbering and bullet points\n",
" if line and line[0].isdigit():\n",
" # Remove \"1.\" or \"1)\"\n",
" parts = line.split('.', 1)\n",
" if len(parts) > 1:\n",
" line = parts[1].strip()\n",
" else:\n",
" parts = line.split(')', 1)\n",
" if len(parts) > 1:\n",
" line = parts[1].strip()\n",
" elif line.startswith(('-', '•')):\n",
" line = line[1:].strip()\n",
"\n",
" # Take only the topic name (before any dash or colon describing it)\n",
" if ' - ' in line:\n",
" topic = line.split(' - ')[0].strip()\n",
" elif ':' in line:\n",
" topic = line.split(':')[0].strip()\n",
" else:\n",
" topic = line.strip()\n",
"\n",
" # Validate: reasonable length for a topic name (not a full sentence/description)\n",
" # Topic names should be between 5-60 characters\n",
" if topic and 5 < len(topic) < 60 and not topic.lower().startswith('based on'):\n",
" topics.append(topic)\n",
"\n",
" return topics[:10] # Limit to top 10 topics\n",
"\n",
" def categorize_emails_by_topics(self, documents: List[Document], vectorstore) -> Dict[str, List[Document]]:\n",
" \"\"\"Categorize emails by matching them to identified topics using RAG.\"\"\"\n",
" if not self.topics or not vectorstore:\n",
" return {}\n",
"\n",
" # Extract topic list from the topics text\n",
" topic_list = self.extract_topics_from_text(self.topics)\n",
"\n",
" if not topic_list:\n",
" return {}\n",
"\n",
" # For each topic, find matching emails using vector similarity\n",
" topic_to_emails = {topic: [] for topic in topic_list}\n",
" topic_to_emails['Uncategorized'] = []\n",
"\n",
" # Track which emails have been matched to which topic\n",
" matched_email_ids = set()\n",
" email_to_topic = {} # Map message_id to topic name\n",
"\n",
" retriever = vectorstore.as_retriever(search_kwargs={\"k\": len(documents)})\n",
"\n",
" for topic in topic_list:\n",
" # Query vectorstore for emails matching this topic\n",
" query = f\"Emails about: {topic}\"\n",
" relevant_docs = retriever.invoke(query)\n",
"\n",
" # Take top matches (based on proportion of total emails - ~15% per topic)\n",
" num_matches = max(1, int(len(documents) * 0.15))\n",
"\n",
" for doc in relevant_docs[:num_matches]:\n",
" msg_id = doc.metadata.get('message_id')\n",
" if msg_id and msg_id not in matched_email_ids:\n",
" # Find the original document\n",
" original_doc = next((d for d in documents if d.metadata.get('message_id') == msg_id), None)\n",
" if original_doc:\n",
" topic_to_emails[topic].append(original_doc)\n",
" matched_email_ids.add(msg_id)\n",
" email_to_topic[msg_id] = topic\n",
"\n",
" # Add uncategorized emails\n",
" for doc in documents:\n",
" msg_id = doc.metadata.get('message_id')\n",
" if msg_id not in matched_email_ids:\n",
" topic_to_emails['Uncategorized'].append(doc)\n",
" email_to_topic[msg_id] = 'Uncategorized'\n",
"\n",
" # Store the mapping for use in dataframe creation\n",
" self.email_to_topic = email_to_topic\n",
"\n",
" return topic_to_emails\n",
"\n",
" def get_topic_counts_display(self, documents: List[Document], vectorstore) -> str:\n",
" \"\"\"Get formatted topic counts for display.\"\"\"\n",
" if not self.topics or not vectorstore:\n",
" return \"No topics identified yet.\"\n",
"\n",
" topic_to_emails = self.categorize_emails_by_topics(documents, vectorstore)\n",
"\n",
" counts_text = \"Email Counts by Identified Topic:\\n\\n\"\n",
"\n",
" # Sort by count, descending\n",
" sorted_topics = sorted(topic_to_emails.items(), key=lambda x: len(x[1]), reverse=True)\n",
"\n",
" for topic, emails in sorted_topics:\n",
" count = len(emails)\n",
" if count > 0:\n",
" counts_text += f\" {topic}: {count} emails\\n\"\n",
"\n",
" total = sum(len(emails) for emails in topic_to_emails.values())\n",
" counts_text += f\"\\n Total: {total} emails\"\n",
"\n",
" return counts_text\n",
"\n",
" def classify_emails(self, documents: List[Document], vectorstore, threshold: float = 0.5):\n",
" \"\"\"Classify emails based on identified topics.\n",
"\n",
" Emails matching identified topics → KEEP\n",
" Emails not matching any topic → DELETE candidates\n",
" \"\"\"\n",
" if not self.topics:\n",
" raise RuntimeError(\"Call analyze_personal_interests() first\")\n",
"\n",
" # Categorize emails by topics\n",
" topic_to_emails = self.categorize_emails_by_topics(documents, vectorstore)\n",
"\n",
" # Emails matching topics are KEPT\n",
" keep_emails = []\n",
" for topic, emails in topic_to_emails.items():\n",
" if topic != 'Uncategorized':\n",
" keep_emails.extend(emails)\n",
"\n",
" # Uncategorized emails are DELETE candidates\n",
" delete_candidates = topic_to_emails.get('Uncategorized', [])\n",
"\n",
" # Store topic categorization for counts display\n",
" self.topic_to_emails = topic_to_emails\n",
"\n",
" self.classified_emails = {'keep': keep_emails, 'delete': delete_candidates}\n",
"\n",
" print(f\"Classification: {len(keep_emails)} keep, {len(delete_candidates)} delete\")\n",
" print(f\"Matched to {len([t for t in topic_to_emails.keys() if t != 'Uncategorized'])} topics\")\n",
" return self.classified_emails\n",
"\n",
" def create_archive(self, documents: List[Document], archive_name: Optional[str] = None) -> str:\n",
" \"\"\"Create ZIP archive of emails.\"\"\"\n",
" if not documents:\n",
" raise ValueError(\"No documents to archive\")\n",
"\n",
" if not archive_name:\n",
" timestamp = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
" archive_name = f\"email_archive_{timestamp}.zip\"\n",
"\n",
" archive_dir = \"email_archive_temp\"\n",
" os.makedirs(archive_dir, exist_ok=True)\n",
"\n",
" for i, doc in enumerate(documents):\n",
" email_data = {'metadata': doc.metadata, 'content': doc.page_content}\n",
" subject = doc.metadata.get('subject', 'no_subject')[:50]\n",
" safe_subject = \"\".join(c for c in subject if c.isalnum() or c in (' ', '-', '_')).strip()\n",
" filename = f\"{i+1:04d}_{safe_subject}.json\"\n",
"\n",
" with open(os.path.join(archive_dir, filename), 'w', encoding='utf-8') as f:\n",
" json.dump(email_data, f, indent=2, ensure_ascii=False)\n",
"\n",
" # Create ZIP\n",
" with zipfile.ZipFile(archive_name, 'w', zipfile.ZIP_DEFLATED) as zipf:\n",
" for root, dirs, files in os.walk(archive_dir):\n",
" for file in files:\n",
" zipf.write(os.path.join(root, file), file)\n",
"\n",
" shutil.rmtree(archive_dir)\n",
" print(f\"Archive created: {archive_name}\")\n",
" return archive_name\n",
"\n",
" def emails_to_dataframe(self, documents: List[Document], add_select_column: bool = False) -> pd.DataFrame:\n",
" \"\"\"Convert to DataFrame with Topics column.\"\"\"\n",
" data = [\n",
" {\n",
" 'Topics': self.email_to_topic.get(doc.metadata.get('message_id', ''), 'Unknown'),\n",
" 'Message ID': doc.metadata.get('message_id', ''),\n",
" 'Subject': doc.metadata.get('subject', '')[:100],\n",
" 'Sender': doc.metadata.get('sender', ''),\n",
" 'Length': len(doc.page_content)\n",
" }\n",
" for doc in documents\n",
" ]\n",
" df = pd.DataFrame(data)\n",
"\n",
" if add_select_column:\n",
" # Add Select column as first column\n",
" df.insert(0, 'Select', False)\n",
"\n",
" return df"
],
"metadata": {
"id": "7fUcjkI79vLa"
},
"id": "7fUcjkI79vLa",
"execution_count": null,
"outputs": []
},
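{
"cell_type": "code",
"source": [
"# Sanity check for extract_topics_from_text(): safe to run, no API calls.\n",
"# The sample below is illustrative text, not real LLM output; the parser\n",
"# should strip numbering, markdown, and trailing descriptions.\n",
"_sample_topics = \"\"\"1. **Work Projects** - planning and budget emails\n",
"2. Family Communications\n",
"3) Professional Development: courses and training\"\"\"\n",
"\n",
"print(EmailProcessor().extract_topics_from_text(_sample_topics))\n",
"# Expected: ['Work Projects', 'Family Communications', 'Professional Development']\n"
],
"metadata": {
"id": "topicParserCheck"
},
"id": "topicParserCheck",
"execution_count": null,
"outputs": []
},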
{
"cell_type": "markdown",
"source": [
"##Application State"
],
"metadata": {
"id": "VWqZZRLY94ST"
},
"id": "VWqZZRLY94ST"
},
{
"cell_type": "code",
"source": [
"class AppState:\n",
" \"\"\"Global application state.\"\"\"\n",
" def __init__(self):\n",
" self.gmail_conn: Optional[GmailConnection] = None\n",
" self.vector_db_manager = VectorDatabaseManager()\n",
" self.email_processor = EmailProcessor()\n",
" self.testing_mode = False\n",
" self.debug_mode = False\n",
"\n",
"state = AppState()"
],
"metadata": {
"id": "eHKPF6WB93WZ"
},
"id": "eHKPF6WB93WZ",
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"##Gradio Callback Functions"
],
"metadata": {
"id": "yOCw1doE93LH"
},
"id": "yOCw1doE93LH"
},
{
"cell_type": "code",
"source": [
"def connect_imap(email, password):\n",
" try:\n",
" state.gmail_conn = create_gmail_connection(email, password)\n",
" if state.gmail_conn.connect():\n",
" info = state.gmail_conn.get_auth_info()\n",
" return f\"Connected as {info['email']}\\nTotal messages: {info['total_messages']:,}\"\n",
" return \"❌ Authentication failed\"\n",
" except Exception as e:\n",
" return f\"❌ Error: {str(e)}\"\n",
"\n",
"\n",
"def connect_imap(email, password):\n",
" try:\n",
" state.gmail_conn = create_gmail_connection(email, password)\n",
" if state.gmail_conn.connect():\n",
" info = state.gmail_conn.get_auth_info()\n",
" return f\"Connected as {info['email']}\\nTotal messages: {info['total_messages']:,}\"\n",
" return \"❌ Authentication failed\"\n",
" except Exception as e:\n",
" return f\"❌ Error: {str(e)}\"\n",
"\n",
"\n",
"def fetch_and_process(testing_mode, embedding_model):\n",
" try:\n",
" if not state.gmail_conn or not state.gmail_conn.is_connected():\n",
" return \"❌ Not authenticated\"\n",
"\n",
" state.testing_mode = testing_mode\n",
" max_emails = 50 if testing_mode else None\n",
"\n",
" documents, fetch_diagnostics = state.gmail_conn.fetch_emails(max_emails)\n",
"\n",
" if not documents:\n",
" return f\"❌ No emails fetched\\n\\n{fetch_diagnostics}\"\n",
"\n",
" stats = state.email_processor.get_statistics(documents)\n",
" chunks = state.email_processor.chunk_documents(documents)\n",
"\n",
" state.vector_db_manager.create_embeddings(embedding_model)\n",
" state.vector_db_manager.create_vector_store(chunks)\n",
"\n",
" return f\"\"\"✓ Processing completed!\n",
"\n",
"{fetch_diagnostics}\n",
"\n",
"Total emails: {stats['total_emails']}\n",
"Chunks created: {len(chunks)}\n",
"Top 5 senders:\n",
"{chr(10).join([f\" - {sender}: {count}\" for sender, count in stats['top_senders'][:5]])}\n",
"\"\"\"\n",
" except Exception as e:\n",
" import traceback\n",
" return f\"❌ Error: {str(e)}\\n\\nTraceback:\\n{traceback.format_exc()}\"\n",
"\n",
"\n",
"def analyze_topics(llm_model, threshold):\n",
" try:\n",
" if not state.email_processor.documents:\n",
" return \"❌ No documents loaded\", \"\", None, None\n",
"\n",
" state.email_processor.create_llm(llm_model)\n",
" topics = state.email_processor.analyze_personal_interests(state.email_processor.documents)\n",
"\n",
" # Automatically classify after analysis\n",
" classified = state.email_processor.classify_emails(\n",
" state.email_processor.documents,\n",
" state.vector_db_manager.vectorstore,\n",
" threshold\n",
" )\n",
"\n",
" # Get topic counts after classification (shows which topics emails matched to)\n",
" counts_text = state.email_processor.get_topic_counts_display(\n",
" state.email_processor.documents,\n",
" state.vector_db_manager.vectorstore\n",
" )\n",
"\n",
" # Get the actual topics list that was used for categorization\n",
" topic_list = state.email_processor.extract_topics_from_text(topics)\n",
" formatted_topics = \"Identified Topics:\\n\\n\" + \"\\n\".join([f\"{i+1}. {topic}\" for i, topic in enumerate(topic_list)])\n",
"\n",
" keep_df = state.email_processor.emails_to_dataframe(classified['keep'], add_select_column=False)\n",
" delete_df = state.email_processor.emails_to_dataframe(classified['delete'], add_select_column=True)\n",
"\n",
" return formatted_topics, counts_text, keep_df, delete_df\n",
" except Exception as e:\n",
" return f\"❌ Error: {str(e)}\", \"\", None, None\n",
"\n",
"\n",
"def refine_topics_with_chat(chat_query, llm_model, threshold):\n",
" \"\"\"Use LLM to identify topics based on user query about their interests.\"\"\"\n",
" try:\n",
" if not state.email_processor.documents or not state.vector_db_manager.vectorstore:\n",
" return \"❌ Please process emails first\", \"\", None, None\n",
"\n",
" if not chat_query or chat_query.strip() == \"\":\n",
" return \"❌ Please enter a query\", \"\", None, None\n",
"\n",
" # Create LLM if needed\n",
" if not state.email_processor.llm:\n",
" state.email_processor.create_llm(llm_model)\n",
"\n",
" prompt = state.email_processor._generate_topics_prompt(\n",
" state.email_processor.documents,\n",
" user_context=chat_query\n",
" )\n",
"\n",
" response = state.email_processor.llm.invoke([HumanMessage(content=prompt)])\n",
" state.email_processor.topics = response.content\n",
"\n",
" # Automatically classify emails based on the new topics\n",
" classified = state.email_processor.classify_emails(\n",
" state.email_processor.documents,\n",
" state.vector_db_manager.vectorstore,\n",
" threshold\n",
" )\n",
"\n",
" # Get topic counts after classification\n",
" counts_text = state.email_processor.get_topic_counts_display(\n",
" state.email_processor.documents,\n",
" state.vector_db_manager.vectorstore\n",
" )\n",
"\n",
" # Get the actual topics list that was used for categorization\n",
" topic_list = state.email_processor.extract_topics_from_text(state.email_processor.topics)\n",
" formatted_topics = \"Identified Topics:\\n\\n\" + \"\\n\".join([f\"{i+1}. {topic}\" for i, topic in enumerate(topic_list)])\n",
"\n",
" keep_df = state.email_processor.emails_to_dataframe(classified['keep'], add_select_column=False)\n",
" delete_df = state.email_processor.emails_to_dataframe(classified['delete'], add_select_column=True)\n",
"\n",
" return formatted_topics, counts_text, keep_df, delete_df\n",
" except Exception as e:\n",
" return f\"❌ Error: {str(e)}\", \"\", None, None\n",
"\n",
"\n",
"def select_all_emails(delete_df):\n",
" \"\"\"Select all delete candidate emails.\"\"\"\n",
" if delete_df is None or len(delete_df) == 0:\n",
" return delete_df\n",
"\n",
" delete_df_copy = delete_df.copy()\n",
" delete_df_copy['Select'] = True\n",
" return delete_df_copy\n",
"\n",
"\n",
"def deselect_all_emails(delete_df):\n",
" \"\"\"Deselect all delete candidate emails.\"\"\"\n",
" if delete_df is None or len(delete_df) == 0:\n",
" return delete_df\n",
"\n",
" delete_df_copy = delete_df.copy()\n",
" delete_df_copy['Select'] = False\n",
" return delete_df_copy\n",
"\n",
"\n",
"def create_archive_file():\n",
" try:\n",
" if not state.email_processor.classified_emails['delete']:\n",
" return \"❌ No emails to archive\", None\n",
"\n",
" archive_path = state.email_processor.create_archive(\n",
" state.email_processor.classified_emails['delete']\n",
" )\n",
" return f\"✓ Archive created: {archive_path}\", archive_path\n",
" except Exception as e:\n",
" return f\"❌ Error: {str(e)}\", None\n",
"\n",
"\n",
"def perform_deletion(confirmation_text, delete_df):\n",
" try:\n",
" if confirmation_text.strip().upper() != \"DELETE\":\n",
" return \"❌ Confirmation failed. Type 'DELETE' to confirm.\"\n",
"\n",
" if delete_df is None or len(delete_df) == 0:\n",
" return \"❌ No emails available for deletion\"\n",
"\n",
" # Get selected emails\n",
" if 'Select' not in delete_df.columns:\n",
" return \"❌ Invalid dataframe format\"\n",
"\n",
" selected_rows = delete_df[delete_df['Select'] == True]\n",
" if len(selected_rows) == 0:\n",
" return \"❌ No emails selected for deletion\"\n",
"\n",
" # Get message IDs of selected emails\n",
" selected_ids = set(selected_rows['Message ID'].tolist())\n",
"\n",
" # Filter documents to only selected ones\n",
" selected_docs = [\n",
" doc for doc in state.email_processor.classified_emails['delete']\n",
" if doc.metadata.get('message_id') in selected_ids\n",
" ]\n",
"\n",
" if not state.gmail_conn:\n",
" return \"❌ Not authenticated\"\n",
"\n",
" success, failed = state.gmail_conn.delete_emails(selected_docs)\n",
"\n",
" return f\"Deletion complete:\\n - Deleted: {success}\\n - Failed: {failed}\\n - Skipped: {len(state.email_processor.classified_emails['delete']) - len(selected_docs)}\"\n",
" except Exception as e:\n",
" return f\"❌ Error: {str(e)}\""
],
"metadata": {
"id": "2toGS3_z-dSE"
},
"id": "2toGS3_z-dSE",
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"##Gradio Interface"
],
"metadata": {
"id": "ja-oFdo8-h6b"
},
"id": "ja-oFdo8-h6b"
},
{
"cell_type": "code",
"source": [
"with gr.Blocks(title=\"Gmail Inbox Terminator\", theme=gr.themes.Soft()) as app:\n",
" gr.Markdown(\"# 🔥 Gmail Inbox Terminator\")\n",
" gr.Markdown(\"### Intelligent Email Management with AI\")\n",
" gr.Markdown(\"Identify important topics, then delete emails OUTSIDE those topics.\")\n",
"\n",
" with gr.Tabs():\n",
" # Tab 1: Connection\n",
" with gr.Tab(\"🔌 Connection\"):\n",
" gr.Markdown(\"## Connect to Gmail via IMAP\")\n",
"\n",
" if default_email and default_password:\n",
" gr.Markdown(\"\"\"\n",
"**✅ Credentials loaded**\n",
"\n",
"Use pre-filled credentials or enter different ones.\n",
"\"\"\")\n",
" else:\n",
" gr.Markdown(\"\"\"\n",
"**Requirements:**\n",
"1. Enable 2-Factor Authentication on your Google account\n",
"2. Create an app-specific password at [Google Account Security](https://myaccount.google.com/security)\n",
"3. Use the app password below (not your regular password)\n",
"\"\"\")\n",
"\n",
" with gr.Row():\n",
" imap_email = gr.Textbox(\n",
" label=\"Email Address\",\n",
" placeholder=\"your.email@gmail.com\",\n",
" value=default_email\n",
" )\n",
" imap_password = gr.Textbox(\n",
" label=\"App Password\",\n",
" type=\"password\",\n",
" placeholder=\"16-character app password\",\n",
" value=default_password\n",
" )\n",
"\n",
" imap_btn = gr.Button(\"Connect\", variant=\"primary\")\n",
" imap_status = gr.Textbox(label=\"Connection Status\", lines=3)\n",
"\n",
" gr.Markdown(\"---\")\n",
" gr.Markdown(\"## Process Emails\")\n",
"\n",
" with gr.Row():\n",
" testing_mode_check = gr.Checkbox(label=\"Testing Mode (50 emails only)\", value=True)\n",
" embedding_dropdown = gr.Dropdown(\n",
" choices=[\"openai\", \"bert\"],\n",
" value=\"openai\",\n",
" label=\"Embedding Model\"\n",
" )\n",
"\n",
" process_btn = gr.Button(\"📥 Fetch and Process Emails\", variant=\"primary\", size=\"lg\")\n",
" process_status = gr.Textbox(label=\"Processing Status\", lines=10)\n",
"\n",
" imap_btn.click(connect_imap, inputs=[imap_email, imap_password], outputs=imap_status)\n",
" process_btn.click(\n",
" fetch_and_process,\n",
" inputs=[testing_mode_check, embedding_dropdown],\n",
" outputs=process_status\n",
" )\n",
"\n",
" # Tab 2: Topic Analysis & Configuration\n",
" with gr.Tab(\"🔍 Topic Analysis & Configuration\"):\n",
" gr.Markdown(\"## a) Configuration\")\n",
"\n",
" with gr.Row():\n",
" llm_dropdown = gr.Dropdown(\n",
" choices=[\"openai\", \"gemini\"],\n",
" value=\"openai\",\n",
" label=\"LLM Model\"\n",
" )\n",
"\n",
" classification_threshold = gr.Slider(\n",
" minimum=0.1,\n",
" maximum=0.9,\n",
" value=0.5,\n",
" step=0.1,\n",
" label=\"Relevance Threshold (higher = more strict, fewer kept)\"\n",
" )\n",
"\n",
" gr.Markdown(\"---\")\n",
" gr.Markdown(\"## b) Interest Analysis\")\n",
" gr.Markdown(\"Identify topics that are IMPORTANT to you. Emails matching these topics will be KEPT, others offered for deletion.\")\n",
"\n",
" analyze_btn = gr.Button(\"🤖 Identify My Interests\", variant=\"primary\", size=\"lg\")\n",
" topics_output = gr.Textbox(label=\"Important Topics\", lines=10)\n",
" counts_output = gr.Textbox(label=\"Category Counts\", lines=8)\n",
"\n",
" gr.Markdown(\"---\")\n",
" gr.Markdown(\"### Refine Topics with LLM Query\")\n",
" gr.Markdown(\"Ask the LLM to identify specific topics based on your interests. Results replace topics above.\")\n",
"\n",
" with gr.Row():\n",
" chat_query_input = gr.Textbox(\n",
" label=\"Query about your interests\",\n",
" placeholder=\"e.g., 'What are my most important professional topics?'\",\n",
" scale=3\n",
" )\n",
" chat_submit_btn = gr.Button(\"Submit Query\", variant=\"secondary\", scale=1)\n",
"\n",
" gr.Markdown(\"\"\"\n",
"**Example queries:**\n",
"- \"What are my most important professional topics?\"\n",
"- \"Identify topics related to family and personal life\"\n",
"- \"What work-related topics should I keep?\"\n",
"\"\"\")\n",
"\n",
" # Tab 3: Email Management & Deletion\n",
" with gr.Tab(\"📧 Email Management & Deletion\"):\n",
" gr.Markdown(\"## Classified Emails based on topic analysi)\")\n",
" gr.Markdown(\"Emails matching your important topics are in 'Keep'. Others are deletion candidates.\")\n",
"\n",
" with gr.Row():\n",
" with gr.Column():\n",
" gr.Markdown(\"### 📌 Keep (Important)\")\n",
" keep_df = gr.Dataframe(label=\"Emails to Keep\", interactive=False)\n",
"\n",
" with gr.Column():\n",
" gr.Markdown(\"### 🗑️ Delete Candidates\")\n",
"\n",
" with gr.Row():\n",
" select_all_btn = gr.Button(\"✅ Select All\", size=\"sm\")\n",
" deselect_all_btn = gr.Button(\"❌ Deselect All\", size=\"sm\")\n",
"\n",
" delete_df = gr.Dataframe(\n",
" label=\"Select emails to delete\",\n",
" interactive=True,\n",
" datatype=[\"bool\", \"str\", \"str\", \"str\", \"str\", \"number\"],\n",
" col_count=(6, \"fixed\")\n",
" )\n",
"\n",
" select_all_btn.click(select_all_emails, inputs=delete_df, outputs=delete_df)\n",
" deselect_all_btn.click(deselect_all_emails, inputs=delete_df, outputs=delete_df)\n",
"\n",
" gr.Markdown(\"---\")\n",
" gr.Markdown(\"## Archive & Delete\")\n",
"\n",
" with gr.Row():\n",
" archive_btn = gr.Button(\"📦 Create Archive\", variant=\"secondary\")\n",
" delete_btn = gr.Button(\"🔥 DELETE SELECTED\", variant=\"stop\")\n",
"\n",
" with gr.Row():\n",
" with gr.Column():\n",
" archive_status = gr.Textbox(label=\"Archive Status\", lines=2)\n",
" with gr.Column():\n",
" confirmation_input = gr.Textbox(label=\"Type DELETE to confirm\", placeholder=\"DELETE\")\n",
"\n",
" archive_file = gr.File(label=\"Download Archive\")\n",
" deletion_status = gr.Textbox(label=\"Deletion Result\", lines=3)\n",
"\n",
" analyze_btn.click(\n",
" analyze_topics,\n",
" inputs=[llm_dropdown, classification_threshold],\n",
" outputs=[topics_output, counts_output, keep_df, delete_df]\n",
" )\n",
"\n",
" chat_submit_btn.click(\n",
" refine_topics_with_chat,\n",
" inputs=[chat_query_input, llm_dropdown, classification_threshold],\n",
" outputs=[topics_output, counts_output, keep_df, delete_df]\n",
" )\n",
"\n",
" archive_btn.click(create_archive_file, outputs=[archive_status, archive_file])\n",
" delete_btn.click(perform_deletion, inputs=[confirmation_input, delete_df], outputs=deletion_status)"
],
"metadata": {
"id": "iKC3MtzX-jVT"
},
"id": "iKC3MtzX-jVT",
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"## Launch App"
],
"metadata": {
"id": "rY9Pbte__Kqa"
},
"id": "rY9Pbte__Kqa"
},
{
"cell_type": "code",
"source": [
"app.launch(share=True, inbrowser=True)"
],
"metadata": {
"id": "YUHF1ZIl_Nv-"
},
"id": "YUHF1ZIl_Nv-",
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"##Unit Tests for Components"
],
"metadata": {
"id": "jHgVYNTc-tCf"
},
"id": "jHgVYNTc-tCf"
},
{
"cell_type": "code",
"source": [
"\n",
"print(\"=\" * 60)\n",
"print(\"UNIT TESTS - Testing Individual Components\")\n",
"print(\"=\" * 60)\n",
"\n",
"# Test 1: Helper Functions\n",
"print(\"\\n📝 Test 1: Helper Functions\")\n",
"print(\"-\" * 40)\n",
"\n",
"def test_helper_functions():\n",
" \"\"\"Test email parsing helper functions.\"\"\"\n",
" # Test get_header_value\n",
" test_headers = [\n",
" {'name': 'Subject', 'value': 'Test Email'},\n",
" {'name': 'From', 'value': 'sender@example.com'},\n",
" {'name': 'Date', 'value': '2025-10-21'}\n",
" ]\n",
"\n",
" assert get_header_value(test_headers, 'Subject') == 'Test Email'\n",
" assert get_header_value(test_headers, 'From') == 'sender@example.com'\n",
" assert get_header_value(test_headers, 'Missing') == ''\n",
"\n",
" print(\"✓ get_header_value() works correctly\")\n",
" return True\n",
"\n",
"try:\n",
" test_helper_functions()\n",
" print(\"\\n✅ Helper functions test PASSED\")\n",
"except AssertionError as e:\n",
" print(f\"\\n❌ Helper functions test FAILED: {e}\")\n",
"\n",
"# Test 2: VectorDatabaseManager\n",
"print(\"\\n\\n💾 Test 2: VectorDatabaseManager\")\n",
"print(\"-\" * 40)\n",
"\n",
"def test_vector_database_manager():\n",
" \"\"\"Test VectorDatabaseManager class.\"\"\"\n",
" test_docs = [\n",
" Document(\n",
" page_content=\"This is a test email about Python programming and data science.\",\n",
" metadata={'subject': 'Test 1', 'sender': 'test@example.com'}\n",
" ),\n",
" Document(\n",
" page_content=\"Another email discussing machine learning and AI topics.\",\n",
" metadata={'subject': 'Test 2', 'sender': 'ai@example.com'}\n",
" ),\n",
" Document(\n",
" page_content=\"Meeting invitation for tomorrow's project review.\",\n",
" metadata={'subject': 'Test 3', 'sender': 'manager@example.com'}\n",
" )\n",
" ]\n",
"\n",
" test_mgr = VectorDatabaseManager(db_name=\"test_vector_db\")\n",
" embeddings = test_mgr.create_embeddings(\"bert\")\n",
" assert test_mgr.embeddings is not None\n",
" print(\"✓ Embeddings created successfully\")\n",
"\n",
" vectorstore = test_mgr.create_vector_store(test_docs, recreate=True)\n",
" assert vectorstore is not None\n",
" assert test_mgr.vectorstore._collection.count() == len(test_docs)\n",
" print(f\"✓ Vector store created with {len(test_docs)} documents\")\n",
"\n",
" retriever = vectorstore.as_retriever(search_kwargs={\"k\": 2})\n",
" results = retriever.invoke(\"Python programming\")\n",
" assert len(results) > 0\n",
" print(f\"✓ Retrieval works: found {len(results)} relevant documents\")\n",
"\n",
" if os.path.exists(\"test_vector_db\"):\n",
" shutil.rmtree(\"test_vector_db\")\n",
"\n",
" return True\n",
"\n",
"try:\n",
" test_vector_database_manager()\n",
" print(\"\\n✅ VectorDatabaseManager test PASSED\")\n",
"except Exception as e:\n",
" print(f\"\\n❌ VectorDatabaseManager test FAILED: {e}\")\n",
"\n",
"# Test 3: EmailProcessor\n",
"print(\"\\n\\n📧 Test 3: EmailProcessor\")\n",
"print(\"-\" * 40)\n",
"\n",
"def test_email_processor():\n",
" \"\"\"Test EmailProcessor class.\"\"\"\n",
" test_docs = [\n",
" Document(\n",
" page_content=\"Subject: Project Update\\nFrom: boss@company.com\\nTo: me@company.com\\nDate: 2025-10-20\\n\\nPlease review the quarterly report.\",\n",
" metadata={'subject': 'Project Update', 'sender': 'boss@company.com', 'message_id': '001', 'date': '2025-10-20'}\n",
" ),\n",
" Document(\n",
" page_content=\"Subject: Newsletter\\nFrom: marketing@spam.com\\nTo: me@company.com\\nDate: 2025-10-19\\n\\nCheck out our latest deals!\",\n",
" metadata={'subject': 'Newsletter', 'sender': 'marketing@spam.com', 'message_id': '002', 'date': '2025-10-19'}\n",
" ),\n",
" Document(\n",
" page_content=\"Subject: Team Meeting\\nFrom: colleague@company.com\\nTo: me@company.com\\nDate: 2025-10-21\\n\\nMeeting tomorrow at 10am.\",\n",
" metadata={'subject': 'Team Meeting', 'sender': 'colleague@company.com', 'message_id': '003', 'date': '2025-10-21'}\n",
" )\n",
" ]\n",
"\n",
" processor = EmailProcessor()\n",
"\n",
" chunks = processor.chunk_documents(test_docs, chunk_size=100, chunk_overlap=20)\n",
" assert len(chunks) >= len(test_docs)\n",
" print(f\"✓ Chunking works: created {len(chunks)} chunks from {len(test_docs)} documents\")\n",
"\n",
" stats = processor.get_statistics(test_docs)\n",
" assert stats['total_emails'] == 3\n",
" assert stats['unique_senders'] == 3\n",
" print(f\"✓ Statistics calculation works: {stats['total_emails']} emails, {stats['unique_senders']} unique senders\")\n",
"\n",
" df = processor.emails_to_dataframe(test_docs, add_select_column=True)\n",
" assert len(df) == 3\n",
" assert 'Topics' in df.columns\n",
" assert 'Subject' in df.columns\n",
" assert 'Sender' in df.columns\n",
" assert 'Select' in df.columns\n",
" print(f\"✓ DataFrame conversion works: {len(df)} rows, {len(df.columns)} columns\")\n",
"\n",
" return True\n",
"\n",
"try:\n",
" test_email_processor()\n",
" print(\"\\n✅ EmailProcessor test PASSED\")\n",
"except Exception as e:\n",
" print(f\"\\n❌ EmailProcessor test FAILED: {e}\")\n",
"\n",
"# Test 4: Mock IMAP Connection\n",
"print(\"\\n\\n🔌 Test 4: Mock IMAP Connection\")\n",
"print(\"-\" * 40)\n",
"\n",
"def test_mock_connection():\n",
" \"\"\"Test the connection interface with a mock implementation.\"\"\"\n",
"\n",
" class MockIMAPConnection(GmailConnection):\n",
" \"\"\"Mock implementation for testing.\"\"\"\n",
"\n",
" def connect(self) -> bool:\n",
" self.auth_info = {\n",
" 'email': 'test@example.com',\n",
" 'total_messages': 100,\n",
" 'auth_method': 'Mock'\n",
" }\n",
" self.connection = \"mock_connection\"\n",
" return True\n",
"\n",
" def fetch_emails(self, max_emails: Optional[int] = None) -> Tuple[List[Document], str]:\n",
" limit = max_emails if max_emails else 10\n",
" docs = [\n",
" Document(\n",
" page_content=f\"Mock email {i}\",\n",
" metadata={\n",
" 'message_id': f'mock_{i}',\n",
" 'subject': f'Test Subject {i}',\n",
" 'sender': f'sender{i}@example.com',\n",
" 'date': '2025-10-21'\n",
" }\n",
" )\n",
" for i in range(min(limit, 5))\n",
" ]\n",
" return docs, f\"✓ Fetched {len(docs)} mock emails\"\n",
"\n",
" def delete_emails(self, documents: List[Document]) -> Tuple[int, int]:\n",
" return len(documents), 0\n",
"\n",
" mock_conn = MockIMAPConnection()\n",
"\n",
" assert mock_conn.connect()\n",
" print(\"✓ Mock connection established\")\n",
"\n",
" assert mock_conn.is_connected()\n",
" print(\"✓ Connection status check works\")\n",
"\n",
" info = mock_conn.get_auth_info()\n",
" assert info['email'] == 'test@example.com'\n",
" print(f\"✓ Auth info retrieved: {info['email']}\")\n",
"\n",
" emails, diagnostics = mock_conn.fetch_emails(max_emails=3)\n",
" assert len(emails) == 3\n",
" print(f\"✓ Fetched {len(emails)} mock emails\")\n",
" print(f\" Diagnostics: {diagnostics}\")\n",
"\n",
" success, failed = mock_conn.delete_emails(emails)\n",
" assert success == 3 and failed == 0\n",
" print(f\"✓ Mock deletion: {success} successful, {failed} failed\")\n",
"\n",
" return True\n",
"\n",
"try:\n",
" test_mock_connection()\n",
" print(\"\\n✅ Mock connection test PASSED\")\n",
"except Exception as e:\n",
" print(f\"\\n❌ Mock connection test FAILED: {e}\")\n",
"\n",
"print(\"\\n\" + \"=\" * 60)\n",
"print(\"✅ ALL UNIT TESTS COMPLETED\")\n",
"print(\"=\" * 60)\n"
],
"metadata": {
"id": "NQjxVtZl-sNm"
},
"id": "NQjxVtZl-sNm",
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"##Integration Test (with Mock Data)"
],
"metadata": {
"id": "sA6A8f2Q-r_2"
},
"id": "sA6A8f2Q-r_2"
},
{
"cell_type": "code",
"source": [
"print(\"\\n\\n\" + \"=\" * 60)\n",
"print(\"INTEGRATION TEST - Full Workflow with Mock Data\")\n",
"print(\"=\" * 60)\n",
"\n",
"def run_integration_test():\n",
" \"\"\"Run a complete workflow test with mock data.\"\"\"\n",
"\n",
" print(\"\\n🚀 Starting integration test...\")\n",
"\n",
" # Step 1: Create mock connection\n",
" print(\"\\n1⃣ Creating mock Gmail connection...\")\n",
"\n",
" class TestGmailConnection(GmailConnection):\n",
" def connect(self):\n",
" self.connection = True\n",
" self.auth_info = {'email': 'test@example.com', 'total_messages': 20, 'auth_method': 'Test'}\n",
" return True\n",
"\n",
" def fetch_emails(self, max_emails=None):\n",
" # Generate realistic mock emails\n",
" topics = [\n",
" (\"Work Project\", \"manager@company.com\", \"Need your input on Q4 planning and budget allocation.\"),\n",
" (\"Team Meeting\", \"colleague@company.com\", \"Weekly sync tomorrow at 10am to discuss progress.\"),\n",
" (\"Newsletter\", \"marketing@newsletter.com\", \"Top 10 deals this week! Don't miss out!\"),\n",
" (\"Spam Offer\", \"deals@promo.com\", \"You've won a million dollars! Click here now!\"),\n",
" (\"Client Update\", \"client@business.com\", \"Regarding the proposal you sent last week.\"),\n",
" (\"Training Course\", \"learning@company.com\", \"New Python course available for employees.\"),\n",
" (\"Marketing Email\", \"ads@shopping.com\", \"Summer sale - 50% off everything!\"),\n",
" (\"Boss Email\", \"ceo@company.com\", \"Great job on the presentation yesterday!\"),\n",
" (\"Junk\", \"random@spam.com\", \"Make money fast with this one weird trick!\"),\n",
" (\"Important Notice\", \"hr@company.com\", \"Annual review meeting scheduled for next month.\")\n",
" ]\n",
"\n",
" limit = min(max_emails if max_emails else 10, len(topics))\n",
"\n",
" docs = [\n",
" Document(\n",
" page_content=f\"Subject: {subj}\\nFrom: {sender}\\nTo: test@example.com\\nDate: 2025-10-{20-i}\\n\\n{body}\",\n",
" metadata={\n",
" 'message_id': f'test_{i}',\n",
" 'subject': subj,\n",
" 'sender': sender,\n",
" 'recipient': 'test@example.com',\n",
" 'date': f'2025-10-{20-i}',\n",
" 'source': 'test'\n",
" }\n",
" )\n",
" for i, (subj, sender, body) in enumerate(topics[:limit])\n",
" ]\n",
" return docs, f\"✓ Fetched {len(docs)} test emails\"\n",
"\n",
" def delete_emails(self, documents):\n",
" return len(documents), 0\n",
"\n",
" test_conn = TestGmailConnection()\n",
" test_conn.connect()\n",
" print(f\" ✓ Connected as: {test_conn.get_auth_info()['email']}\")\n",
"\n",
" # Step 2: Fetch emails\n",
" print(\"\\n2⃣ Fetching mock emails...\")\n",
" emails, diagnostics = test_conn.fetch_emails(max_emails=10)\n",
" print(f\" ✓ Fetched {len(emails)} emails\")\n",
" print(f\" {diagnostics}\")\n",
"\n",
" # Step 3: Process emails\n",
" print(\"\\n3⃣ Processing emails...\")\n",
" processor = EmailProcessor()\n",
" chunks = processor.chunk_documents(emails)\n",
" print(f\" ✓ Created {len(chunks)} chunks\")\n",
"\n",
" stats = processor.get_statistics(emails)\n",
" print(f\" ✓ Statistics: {stats['total_emails']} emails, {stats['unique_senders']} senders\")\n",
"\n",
" # Step 4: Create vector store\n",
" print(\"\\n4⃣ Creating vector store...\")\n",
" vector_mgr = VectorDatabaseManager(db_name=\"test_integration_db\")\n",
" vector_mgr.create_embeddings(\"bert\") # Use BERT to avoid API costs\n",
" vector_mgr.create_vector_store(chunks, recreate=True)\n",
" print(f\" ✓ Vector store created with {vector_mgr.vectorstore._collection.count()} documents\")\n",
"\n",
" # Step 5: Analyze topics (simulated - would normally use LLM)\n",
" print(\"\\n5⃣ Analyzing topics...\")\n",
" processor.topics = \"\"\"\n",
"Based on the email analysis:\n",
"1. Work Projects - Manager communications about planning and budgets\n",
"2. Team Collaboration - Meeting invites and team sync-ups\n",
"3. Client Relations - Important client communications\n",
"4. Professional Development - Training and learning opportunities\n",
"5. Company Announcements - HR and leadership communications\n",
"\"\"\"\n",
" print(\" Topics identified (mock analysis)\")\n",
"\n",
" # Step 6: Classify emails\n",
" print(\"\\n6⃣ Classifying emails...\")\n",
" # Simulate classification based on sender domains\n",
" work_domains = ['company.com', 'business.com']\n",
" spam_domains = ['newsletter.com', 'promo.com', 'spam.com', 'shopping.com']\n",
"\n",
" keep_emails = [email for email in emails if any(domain in email.metadata.get('sender', '') for domain in work_domains)]\n",
" delete_emails = [email for email in emails if any(domain in email.metadata.get('sender', '') for domain in spam_domains)]\n",
"\n",
" processor.classified_emails = {'keep': keep_emails, 'delete': delete_emails}\n",
" print(f\" ✓ Classification complete:\")\n",
" print(f\" - Keep: {len(keep_emails)} emails\")\n",
" print(f\" - Delete: {len(delete_emails)} emails\")\n",
"\n",
" # Step 7: Create archive\n",
" print(\"\\n7⃣ Creating archive...\")\n",
" if delete_emails:\n",
" archive_path = processor.create_archive(delete_emails)\n",
" print(f\" ✓ Archive created: {archive_path}\")\n",
" archive_exists = os.path.exists(archive_path)\n",
" print(f\" ✓ Archive file exists: {archive_exists}\")\n",
"\n",
" # Step 8: Simulate deletion\n",
" print(\"\\n8⃣ Simulating deletion...\")\n",
" success, failed = test_conn.delete_emails(delete_emails)\n",
" print(f\" ✓ Deletion complete: {success} successful, {failed} failed\")\n",
"\n",
" # Step 9: Display results as DataFrame\n",
" print(\"\\n9⃣ Generating reports...\")\n",
" keep_df = processor.emails_to_dataframe(keep_emails)\n",
" delete_df = processor.emails_to_dataframe(delete_emails)\n",
" print(f\" ✓ Keep DataFrame: {len(keep_df)} rows\")\n",
" print(f\" ✓ Delete DataFrame: {len(delete_df)} rows\")\n",
"\n",
" # Cleanup\n",
" print(\"\\n🧹 Cleaning up test files...\")\n",
" if os.path.exists(\"test_integration_db\"):\n",
" shutil.rmtree(\"test_integration_db\")\n",
" if delete_emails and os.path.exists(archive_path):\n",
" os.remove(archive_path)\n",
" print(\" ✓ Cleanup complete\")\n",
"\n",
" print(\"\\n\" + \"=\" * 60)\n",
" print(\"✅ INTEGRATION TEST COMPLETED SUCCESSFULLY!\")\n",
" print(\"=\" * 60)\n",
" print(\"\\n📊 Summary:\")\n",
" print(f\" • Total emails processed: {len(emails)}\")\n",
" print(f\" • Emails to keep: {len(keep_emails)}\")\n",
" print(f\" • Emails to delete: {len(delete_emails)}\")\n",
" print(f\" • Archive created: ✓\")\n",
" print(f\" • Deletion simulated: ✓\")\n",
" print(\"\\n💡 The refactored architecture makes testing easy!\")\n",
"\n",
" return True\n",
"\n",
"try:\n",
" run_integration_test()\n",
"except Exception as e:\n",
" print(f\"\\n❌ INTEGRATION TEST FAILED: {e}\")\n",
" import traceback\n",
" traceback.print_exc()"
],
"metadata": {
"id": "5MBAXKSW-9qp"
},
"id": "5MBAXKSW-9qp",
"execution_count": null,
"outputs": []
},
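{
"cell_type": "markdown",
"source": [
"### Retrieval Sanity Check (Sketch)\n",
"\n",
"A minimal sketch, assuming the `VectorDatabaseManager` API used in the integration test above: after building a store, a quick `similarity_search` (the standard LangChain vector store query) confirms that semantically related emails are retrievable. The database name, documents, and query string are illustrative."
],
"metadata": {
"id": "retrieval_sanity_md"
},
"id": "retrieval_sanity_md"
},
{
"cell_type": "code",
"source": [
"# Retrieval sanity check (sketch): reuses the VectorDatabaseManager API from\n",
"# the integration test; db name, documents, and query are illustrative.\n",
"\n",
"sample_docs = [\n",
"    Document(\n",
"        page_content=\"Quarterly budget planning meeting with the manager.\",\n",
"        metadata={'message_id': 'rq_1', 'subject': 'Budget', 'sender': 'manager@company.com', 'date': '2025-10-20'}\n",
"    ),\n",
"    Document(\n",
"        page_content=\"Flash sale! 50% off all shoes this weekend only!\",\n",
"        metadata={'message_id': 'rq_2', 'subject': 'Sale', 'sender': 'ads@shopping.com', 'date': '2025-10-19'}\n",
"    ),\n",
"]\n",
"\n",
"retrieval_mgr = VectorDatabaseManager(db_name=\"test_retrieval_db\")\n",
"retrieval_mgr.create_embeddings(\"bert\")  # BERT to avoid API costs, as in the tests\n",
"retrieval_mgr.create_vector_store(sample_docs, recreate=True)\n",
"\n",
"# Query for work-related content; the budget email should rank first\n",
"hits = retrieval_mgr.vectorstore.similarity_search(\"work planning and budgets\", k=1)\n",
"print(f\"Top hit: {hits[0].metadata.get('subject')} from {hits[0].metadata.get('sender')}\")\n",
"\n",
"# Cleanup, mirroring the integration test\n",
"if os.path.exists(\"test_retrieval_db\"):\n",
"    shutil.rmtree(\"test_retrieval_db\")"
],
"metadata": {
"id": "retrieval_sanity_code"
},
"id": "retrieval_sanity_code",
"execution_count": null,
"outputs": []
},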
{
"cell_type": "markdown",
"source": [
"##Performance Test"
],
"metadata": {
"id": "zpaJTrOp_BdP"
},
"id": "zpaJTrOp_BdP"
},
{
"cell_type": "code",
"source": [
"\n",
"print(\"\\n\\n\" + \"=\" * 60)\n",
"print(\"PERFORMANCE TEST - Component Benchmarks\")\n",
"print(\"=\" * 60)\n",
"\n",
"import time\n",
"\n",
"def benchmark_component(name, func, *args, **kwargs):\n",
" \"\"\"Benchmark a component function.\"\"\"\n",
" start = time.time()\n",
" result = func(*args, **kwargs)\n",
" elapsed = time.time() - start\n",
" print(f\" {name}: {elapsed:.3f}s\")\n",
" return result, elapsed\n",
"\n",
"def run_performance_tests():\n",
" \"\"\"Run performance benchmarks.\"\"\"\n",
"\n",
" # Generate test data\n",
" print(\"\\n📊 Generating test data...\")\n",
" test_emails = [\n",
" Document(\n",
" page_content=f\"Subject: Test {i}\\nFrom: sender{i % 10}@example.com\\n\\n\" + \" \".join([\"word\"] * 100),\n",
" metadata={\n",
" 'message_id': f'perf_{i}',\n",
" 'subject': f'Test {i}',\n",
" 'sender': f'sender{i % 10}@example.com',\n",
" 'date': f'2025-10-{(i % 30) + 1:02d}'\n",
" }\n",
" )\n",
" for i in range(100)\n",
" ]\n",
" print(f\" ✓ Created {len(test_emails)} test emails\")\n",
"\n",
" # Benchmark EmailProcessor\n",
" print(\"\\n⏱ Benchmarking EmailProcessor...\")\n",
" processor = EmailProcessor()\n",
"\n",
" chunks, t1 = benchmark_component(\"Chunking\", processor.chunk_documents, test_emails)\n",
" stats, t2 = benchmark_component(\"Statistics\", processor.get_statistics, test_emails)\n",
" df, t3 = benchmark_component(\"DataFrame conversion\", processor.emails_to_dataframe, test_emails)\n",
"\n",
" # Benchmark VectorDatabaseManager\n",
" print(\"\\n⏱ Benchmarking VectorDatabaseManager...\")\n",
" vector_mgr = VectorDatabaseManager(db_name=\"test_perf_db\")\n",
"\n",
" emb, t4 = benchmark_component(\"Embedding creation\", vector_mgr.create_embeddings, \"bert\")\n",
" vs, t5 = benchmark_component(\"Vector store creation\", vector_mgr.create_vector_store, chunks[:50]) # Limit for speed\n",
"\n",
" # Cleanup\n",
" if os.path.exists(\"test_perf_db\"):\n",
" shutil.rmtree(\"test_perf_db\")\n",
"\n",
" print(\"\\n\" + \"=\" * 60)\n",
" print(\"✅ PERFORMANCE TEST COMPLETED\")\n",
" print(\"=\" * 60)\n",
" print(f\"\\n📈 Total time: {t1 + t2 + t3 + t4 + t5:.3f}s\")\n",
" print(f\" Fastest operation: DataFrame conversion ({t3:.3f}s)\")\n",
" print(f\" Slowest operation: Vector store creation ({t5:.3f}s)\")\n",
"\n",
"try:\n",
" run_performance_tests()\n",
"except Exception as e:\n",
" print(f\"\\n❌ PERFORMANCE TEST FAILED: {e}\")\n",
"\n"
],
"metadata": {
"id": "41w8FGJ9_CCU"
},
"id": "41w8FGJ9_CCU",
"execution_count": null,
"outputs": []
}
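,
{
"cell_type": "markdown",
"source": [
"### Repeated-Run Benchmarking (Sketch)\n",
"\n",
"A small extension sketch: single `time.time()` measurements are noisy, so repeating each benchmark and reporting the mean and standard deviation gives more stable numbers. The `benchmark_repeated` helper below is hypothetical and reuses only the `EmailProcessor` API benchmarked above."
],
"metadata": {
"id": "repeated_benchmark_md"
},
"id": "repeated_benchmark_md"
},
{
"cell_type": "code",
"source": [
"# Repeated-run benchmark sketch: benchmark_repeated is a hypothetical helper\n",
"# that reuses only the EmailProcessor API benchmarked above.\n",
"\n",
"import statistics\n",
"import time\n",
"\n",
"def benchmark_repeated(name, func, *args, repeats=5, **kwargs):\n",
"    \"\"\"Run func several times and report mean / stdev wall-clock time.\"\"\"\n",
"    times = []\n",
"    result = None\n",
"    for _ in range(repeats):\n",
"        start = time.time()\n",
"        result = func(*args, **kwargs)\n",
"        times.append(time.time() - start)\n",
"    mean = statistics.mean(times)\n",
"    stdev = statistics.stdev(times) if len(times) > 1 else 0.0\n",
"    print(f\"   {name}: {mean:.3f}s ± {stdev:.3f}s over {repeats} runs\")\n",
"    return result, mean\n",
"\n",
"# Example: re-benchmark chunking with a handful of repetitions\n",
"repeat_docs = [\n",
"    Document(\n",
"        page_content=f\"Subject: Repeat {i}\\n\\n\" + \" \".join([\"word\"] * 100),\n",
"        metadata={'message_id': f'rep_{i}', 'subject': f'Repeat {i}', 'sender': 'sender@example.com', 'date': '2025-10-21'}\n",
"    )\n",
"    for i in range(20)\n",
"]\n",
"benchmark_repeated(\"Chunking (repeated)\", EmailProcessor().chunk_documents, repeat_docs)"
],
"metadata": {
"id": "repeated_benchmark_code"
},
"id": "repeated_benchmark_code",
"execution_count": null,
"outputs": []
}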
],
"metadata": {
"language_info": {
"name": "python"
},
"colab": {
"provenance": [],
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}