diff --git a/week5/community-contributions/dkisselev-zz/Week5_Excerise_EmailTerminator.ipynb b/week5/community-contributions/dkisselev-zz/Week5_Excerise_EmailTerminator.ipynb new file mode 100644 index 0000000..fded773 --- /dev/null +++ b/week5/community-contributions/dkisselev-zz/Week5_Excerise_EmailTerminator.ipynb @@ -0,0 +1,1911 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "view-in-github", + "colab_type": "text" + }, + "source": [ + "\"Open" + ] + }, + { + "cell_type": "markdown", + "source": [ + "# Gmail Terminator\n", + "\n", + "## An Intelligent Email Management System\n", + "\n", + "This application uses RAG (Retrieval Augmented Generation) and LLMs to analyze your Gmail inbox, identify important topics and interests, and help you safely delete unimportant emails with archiving.\n", + "\n", + "### Features:\n", + "- **IMAP Authentication**: Secure app-specific password authentication\n", + "- **Vector Embeddings**: OpenAI or BERT/HuggingFace models\n", + "- **Topic Analysis**: LLM-powered identification of your interests\n", + "- **Category Counts**: See breakdown of email categories\n", + "- **Chat-Based Topics Updates**: Use chat to find specific topics of interest\n", + "- **Selective Deletion**: Choose specific emails to delete with checkboxes\n", + "- **Safe Deletion**: Automatic archiving before deletion\n", + "- **Testing Mode**: Process limited emails with debug output\n", + "\n", + "### Architecture:\n", + "1. Connect to Gmail via IMAP\n", + "2. Fetch and parse emails\n", + "3. Chunk text and create embeddings\n", + "4. Store vectors in ChromaDB\n", + "5. Use LLM to identify important topics\n", + "6. Classify emails as keep/delete\n", + "7. Select specific emails to delete\n", + "8. Archive and safely delete selected emails\n", + "\n", + "## Setup Instructions\n", + "\n", + "### IMAP with App-Specific Password\n", + "\n", + "1. **Enable 2-Factor Authentication** on your Google account (required for app passwords)\n", + "2. **Create App-Specific Password**\n", + " - Go to [Google Account Security](https://myaccount.google.com/security)\n", + " - Under \"2-Step Verification\", find \"App passwords\"\n", + " - Generate a new app password for \"Mail\"\n", + "3. **Store Credentials**\n", + " - **Google Colab**: Store as secrets named `EMAIL` and `IMAP_PASSWORD`\n", + " - **Local**: Add to `.env` file:\n", + " ```\n", + " EMAIL=your.email@gmail.com\n", + " IMAP_PASSWORD=your_16_char_app_password\n", + " ```\n", + "4. 
**Connect**: If credentials are stored, they will auto-populate in the UI" ], + "metadata": { + "id": "ANmiUlCxG4Bh" + }, + "id": "ANmiUlCxG4Bh" + }, + { + "cell_type": "markdown", + "source": [ + "## Install and Setup" + ], + "metadata": { + "id": "NzQyA5qmu5fv" + }, + "id": "NzQyA5qmu5fv" + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6f9842a8", + "metadata": { + "id": "6f9842a8" + }, + "outputs": [], + "source": [ + "%pip install -U -q imapclient langchain langchain-openai langchain-chroma langchain-community langchain-core langchain-text-splitters langchain-huggingface chromadb sentence-transformers gradio python-dotenv beautifulsoup4\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "737e1c9e", + "metadata": { + "id": "737e1c9e" + }, + "outputs": [], + "source": [ + "# Standard library imports\n", + "import os\n", + "import json\n", + "import base64\n", + "import zipfile\n", + "import shutil\n", + "from datetime import datetime\n", + "from collections import Counter\n", + "from typing import List, Dict, Optional, Tuple\n", + "from abc import ABC, abstractmethod\n", + "\n", + "# Third-party imports\n", + "import pandas as pd\n", + "import numpy as np\n", + "from tqdm import tqdm\n", + "from bs4 import BeautifulSoup\n", + "\n", + "# IMAP imports\n", + "import imaplib\n", + "import email\n", + "from email.header import decode_header\n", + "\n", + "# LangChain v1.0+ imports\n", + "from langchain_core.documents import Document\n", + "from langchain_core.messages import HumanMessage\n", + "from langchain_text_splitters import CharacterTextSplitter\n", + "from langchain_openai import OpenAIEmbeddings, ChatOpenAI\n", + "from langchain_chroma import Chroma\n", + "from langchain_huggingface import HuggingFaceEmbeddings\n", + "from langchain_core.callbacks import StdOutCallbackHandler\n", + "\n", + "# LLM APIs\n", + "from openai import OpenAI\n", + "\n", + "# HuggingFace\n", + "from huggingface_hub import login\n", + "\n", + "# Gradio\n", + "import gradio as gr\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "191dc787", + "metadata": { + "id": "191dc787" + }, + "outputs": [], + "source": [ + "def setup_api_keys():\n", + " try:\n", + " # Try Colab environment first\n", + " from google.colab import userdata\n", + " api_keys = {\n", + " 'openai': userdata.get('OPENAI_API_KEY'),\n", + " 'anthropic': userdata.get('ANTHROPIC_API_KEY'),\n", + " 'google': userdata.get('GOOGLE_API_KEY'),\n", + " 'hf_token': userdata.get('HF_TOKEN')\n", + " }\n", + " email = userdata.get('EMAIL')\n", + " password = userdata.get('IMAP_PASSWORD')\n", + " print(\"✅ Using Colab secrets\")\n", + " except:\n", + " # Fallback to local environment\n", + " from dotenv import load_dotenv\n", + " load_dotenv()\n", + " api_keys = {\n", + " 'openai': os.getenv('OPENAI_API_KEY'),\n", + " 'anthropic': os.getenv('ANTHROPIC_API_KEY'),\n", + " 'google': os.getenv('GOOGLE_API_KEY'),\n", + " 'hf_token': os.getenv('HF_TOKEN')\n", + " }\n", + "\n", + " email = os.getenv('EMAIL', '')\n", + " password = os.getenv('IMAP_PASSWORD', '')\n", + " print(\"✅ Using local .env file\")\n", + "\n", + " # Initialize API clients\n", + " anthropic_url = \"https://api.anthropic.com/v1/\"\n", + " gemini_url = \"https://generativelanguage.googleapis.com/v1beta/openai/\"\n", + "\n", + " clients = {}\n", + " if api_keys['openai']:\n", + " clients['openai'] = OpenAI(api_key=api_keys['openai'])\n", + " if api_keys['anthropic']:\n", + " clients['anthropic'] = OpenAI(api_key=api_keys['anthropic'], base_url=anthropic_url)\n",
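+ " # Anthropic and Gemini are reached through their OpenAI-compatible endpoints,\n", + " # so a single OpenAI client class covers all three providers\n",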
+ " if api_keys['google']:\n", + " clients['google'] = OpenAI(api_key=api_keys['google'], base_url=gemini_url)\n", + " if api_keys['hf_token']:\n", + " login(api_keys['hf_token'])\n", + "\n", + " # Only export keys that are actually set (os.environ values must be strings)\n", + " if api_keys['openai']:\n", + " os.environ['OPENAI_API_KEY'] = api_keys['openai']\n", + " if api_keys['anthropic']:\n", + " os.environ['ANTHROPIC_API_KEY'] = api_keys['anthropic']\n", + " if api_keys['google']:\n", + " os.environ['GOOGLE_API_KEY'] = api_keys['google']\n", + "\n", + " return api_keys, clients, email, password\n", + "\n", + "# Initialize API keys and clients\n", + "api_keys, clients, default_email, default_password = setup_api_keys()\n", + "\n", + "# Constants\n", + "MODEL_OPENAI = \"gpt-4o-mini\"\n", + "MODEL_GEMINI = \"gemini-2.5-pro\"\n", + "DB_NAME = \"email_vector_db\"\n" + ] + }, + { + "cell_type": "markdown", + "source": [ + "## Helper Functions" + ], + "metadata": { + "id": "hUiNY8_I8ac0" + }, + "id": "hUiNY8_I8ac0" + }, + { + "cell_type": "code", + "source": [ + "def get_header_value(headers, name):\n", + " \"\"\"Get header value from email headers.\"\"\"\n", + " for header in headers:\n", + " if header['name'].lower() == name.lower():\n", + " return header['value']\n", + " return \"\"" + ], + "metadata": { + "id": "Y4MjoYtb8b4i" + }, + "id": "Y4MjoYtb8b4i", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Gmail Connection Classes" + ], + "metadata": { + "id": "g7F4Xgw98jec" + }, + "id": "g7F4Xgw98jec" + }, + { + "cell_type": "code", + "source": [ + "class GmailConnection(ABC):\n", + " \"\"\"Abstract base class for Gmail connections.\"\"\"\n", + "\n", + " def __init__(self):\n", + " self.connection = None\n", + " self.auth_info = None\n", + "\n", + " @abstractmethod\n", + " def connect(self) -> bool:\n", + " pass\n", + "\n", + " @abstractmethod\n", + " def fetch_emails(self, max_emails: Optional[int] = None) -> Tuple[List[Document], str]:\n", + " \"\"\"Fetch emails. Returns (documents, diagnostic_message).\"\"\"\n", + " pass\n", + "\n", + " @abstractmethod\n", + " def delete_emails(self, documents: List[Document]) -> Tuple[int, int]:\n", + " pass\n", + "\n", + " def get_auth_info(self) -> Dict:\n", + " return self.auth_info\n", + "\n", + " def is_connected(self) -> bool:\n", + " return self.connection is not None\n", + "\n", + "\n", + "class IMAPConnection(GmailConnection):\n", + " \"\"\"IMAP Gmail connection.\n", + "\n", + " IMPORTANT: For proper email deletion with Gmail IMAP, configure these settings:\n", + " 1. Go to Gmail Settings → Forwarding and POP/IMAP tab\n", + " 2. Under \"When I mark a message in IMAP as deleted\":\n", + " - Set to \"Auto-Expunge off - Wait for the client to update the server\"\n", + " 3. Under \"When a message is marked as deleted and expunged from the last visible IMAP folder\":\n", + " - Select \"Move the message to the Trash\"\n", + " 4. 
Make sure \"Trash\" label is set to \"Show in IMAP\" under Labels settings\n", + "\n", + " This ensures deleted emails are properly moved to Trash when expunged.\n", + " \"\"\"\n", + "\n", + " def __init__(self, email_address: str, app_password: str):\n", + " super().__init__()\n", + " self.email_address = email_address\n", + " self.app_password = app_password\n", + "\n", + " def connect(self) -> bool:\n", + " \"\"\"Authenticate with Gmail using IMAP.\"\"\"\n", + " try:\n", + " imaplib._MAXLINE = 10000000 # 10MB\n", + "\n", + " self.connection = imaplib.IMAP4_SSL(\"imap.gmail.com\", 993)\n", + " self.connection.login(self.email_address, self.app_password)\n", + "\n", + " status, messages = self.connection.select(\"INBOX\")\n", + " if status == \"OK\":\n", + " self.auth_info = {\n", + " 'email': self.email_address,\n", + " 'total_messages': int(messages[0]),\n", + " 'auth_method': 'IMAP'\n", + " }\n", + "\n", + " print(f\"✓ IMAP connected as: {self.email_address}\")\n", + " print(f\"✓ Total messages in INBOX: {self.auth_info['total_messages']:,}\")\n", + " return True\n", + " else:\n", + " print(f\"❌ Failed to select INBOX: {status}\")\n", + " return False\n", + "\n", + " except Exception as e:\n", + " print(f\"❌ IMAP authentication failed: {e}\")\n", + " print(\"Make sure you're using an app-specific password.\")\n", + " return False\n", + "\n", + " def fetch_emails(self, max_emails: Optional[int] = None) -> Tuple[List[Document], str]:\n", + " \"\"\"Fetch emails using IMAP with UIDs. Returns (documents, diagnostic_message).\"\"\"\n", + " if not self.connection:\n", + " raise RuntimeError(\"Not connected. Call connect() first.\")\n", + "\n", + " diagnostics = [] # Capture diagnostic messages\n", + "\n", + " try:\n", + " self.connection.select(\"INBOX\")\n", + "\n", + " status, messages = self.connection.uid('search', None, \"ALL\")\n", + "\n", + " if status != \"OK\":\n", + " msg = f\"❌ Search failed with status: {status}\"\n", + " diagnostics.append(msg)\n", + " return [], \"\\n\".join(diagnostics)\n", + "\n", + " msg_uids = messages[0].split()\n", + " diagnostics.append(f\"✓ Found {len(msg_uids)} message UIDs\")\n", + "\n", + " if not msg_uids:\n", + " diagnostics.append(\"❌ No message UIDs returned from search\")\n", + " return [], \"\\n\".join(diagnostics)\n", + "\n", + " if max_emails:\n", + " msg_uids = msg_uids[-max_emails:] # Get most recent\n", + " diagnostics.append(f\" → Limited to {len(msg_uids)} most recent emails\")\n", + "\n", + " diagnostics.append(f\"Fetching {len(msg_uids)} emails...\")\n", + " documents = []\n", + " errors = []\n", + "\n", + " for uid in tqdm(msg_uids, desc=\"Processing emails\"):\n", + " try:\n", + " # Fetch using UID to get both UID and the email content\n", + " status, msg_data = self.connection.uid('fetch', uid, \"(RFC822)\")\n", + " if status != \"OK\":\n", + " errors.append(f\"Fetch failed for UID {uid}: {status}\")\n", + " continue\n", + "\n", + " # Check if msg_data is valid\n", + " if not msg_data or not msg_data[0] or len(msg_data[0]) < 2:\n", + " errors.append(f\"Invalid msg_data for UID {uid}\")\n", + " continue\n", + "\n", + " email_message = email.message_from_bytes(msg_data[0][1])\n", + "\n", + " # Extract headers\n", + " subject = email_message.get(\"Subject\", \"\")\n", + " if subject:\n", + " decoded = decode_header(subject)[0]\n", + " if isinstance(decoded[0], bytes):\n", + " subject = decoded[0].decode(decoded[1] or 'utf-8', errors='ignore')\n", + " else:\n", + " subject = decoded[0]\n", + "\n", + " sender = email_message.get(\"From\", 
\"\")\n", + " recipient = email_message.get(\"To\", \"\")\n", + " date_str = email_message.get(\"Date\", \"\")\n", + "\n", + " # Extract body\n", + " body = \"\"\n", + " if email_message.is_multipart():\n", + " for part in email_message.walk():\n", + " if part.get_content_type() == \"text/plain\":\n", + " try:\n", + " payload = part.get_payload(decode=True)\n", + " if payload:\n", + " body = payload.decode('utf-8', errors='ignore')\n", + " break\n", + " except Exception as e:\n", + " continue\n", + " elif part.get_content_type() == \"text/html\" and not body:\n", + " try:\n", + " payload = part.get_payload(decode=True)\n", + " if payload:\n", + " html = payload.decode('utf-8', errors='ignore')\n", + " body = BeautifulSoup(html, 'html.parser').get_text()\n", + " except Exception as e:\n", + " continue\n", + " else:\n", + " try:\n", + " payload = email_message.get_payload(decode=True)\n", + " if payload:\n", + " body = payload.decode('utf-8', errors='ignore')\n", + " if email_message.get_content_type() == \"text/html\":\n", + " body = BeautifulSoup(body, 'html.parser').get_text()\n", + " else:\n", + " # Try without decoding for plain text\n", + " body = str(email_message.get_payload())\n", + " except Exception as e:\n", + " # Last resort: use subject as body\n", + " body = \"\"\n", + "\n", + " # Clean whitespace\n", + " if body:\n", + " body = ' '.join(body.split())\n", + "\n", + " # Use subject if body is empty or too short\n", + " if not body or len(body) < 10:\n", + " body = subject or \"No content\"\n", + "\n", + " content = f\"Subject: {subject}\\nFrom: {sender}\\nTo: {recipient}\\nDate: {date_str}\\n\\n{body}\"\n", + "\n", + " doc = Document(\n", + " page_content=content,\n", + " metadata={\n", + " 'uid': uid.decode(),\n", + " 'message_id': uid.decode(),\n", + " 'subject': subject,\n", + " 'sender': sender,\n", + " 'recipient': recipient,\n", + " 'date': date_str,\n", + " 'source': 'gmail_imap'\n", + " }\n", + " )\n", + " documents.append(doc)\n", + "\n", + " except Exception as e:\n", + " errors.append(f\"Error processing UID {uid}: {str(e)}\")\n", + " continue\n", + "\n", + " diagnostics.append(f\"✓ Successfully fetched {len(documents)} emails out of {len(msg_uids)} attempted\")\n", + "\n", + " if errors:\n", + " diagnostics.append(f\"\\n⚠️ Encountered {len(errors)} errors:\")\n", + " # Show first 5 errors\n", + " for err in errors[:5]:\n", + " diagnostics.append(f\" • {err}\")\n", + " if len(errors) > 5:\n", + " diagnostics.append(f\" ... and {len(errors) - 5} more errors\")\n", + "\n", + " if len(documents) == 0 and len(msg_uids) > 0:\n", + " diagnostics.append(\"\\n⚠️ WARNING: No documents created despite having UIDs\")\n", + "\n", + " return documents, \"\\n\".join(diagnostics)\n", + "\n", + " except Exception as error:\n", + " diagnostics.append(f\"❌ Fetch error: {error}\")\n", + " import traceback\n", + " diagnostics.append(f\"\\nTraceback:\\n{traceback.format_exc()}\")\n", + " return [], \"\\n\".join(diagnostics)\n", + "\n", + " def delete_emails(self, documents: List[Document]) -> Tuple[int, int]:\n", + " \"\"\"Delete emails using IMAP with proper UID handling for Gmail.\n", + "\n", + " This method works with Gmail's \"Auto-Expunge off\" setting by:\n", + " 1. Using UIDs instead of sequence numbers for reliable identification\n", + " 2. Marking emails with \\\\Deleted flag\n", + " 3. Explicitly calling EXPUNGE to permanently remove them\n", + " 4. 
Moving emails to [Gmail]/Trash (Gmail's default behavior)\n", + " \"\"\"\n", + " if not self.connection:\n", + " raise RuntimeError(\"Not connected. Call connect() first.\")\n", + "\n", + " if not documents:\n", + " return 0, 0\n", + "\n", + " successful, failed = 0, 0\n", + " print(f\"Deleting {len(documents)} emails via IMAP...\")\n", + "\n", + " try:\n", + " # Select INBOX in read-write mode (default)\n", + " status, response = self.connection.select(\"INBOX\")\n", + " if status != \"OK\":\n", + " print(f\"❌ Failed to select INBOX: {response}\")\n", + " return 0, len(documents)\n", + "\n", + " for doc in tqdm(documents, desc=\"Marking emails for deletion\"):\n", + " # Try to get UID first, fall back to message_id\n", + " uid = doc.metadata.get('uid') or doc.metadata.get('message_id')\n", + " if not uid:\n", + " print(f\"⚠️ No UID found for email: {doc.metadata.get('subject', 'Unknown')}\")\n", + " failed += 1\n", + " continue\n", + "\n", + " try:\n", + " # Convert to bytes if it's a string\n", + " if isinstance(uid, str):\n", + " uid = uid.encode()\n", + "\n", + " # Use UID STORE to mark the email as deleted\n", + " # This is more reliable than using sequence numbers\n", + " status, response = self.connection.uid('STORE', uid, '+FLAGS', '(\\\\Deleted)')\n", + "\n", + " if status == \"OK\":\n", + " successful += 1\n", + " else:\n", + " print(f\"⚠️ Failed to mark UID {uid.decode()}: {response}\")\n", + " failed += 1\n", + "\n", + " except Exception as e:\n", + " print(f\"❌ Error deleting UID {uid}: {e}\")\n", + " failed += 1\n", + "\n", + " # Expunge to permanently delete all messages marked as \\\\Deleted\n", + " # With Gmail's \"Auto-Expunge off\", this command is required\n", + " print(f\"\\n📤 Expunging {successful} deleted emails...\")\n", + " try:\n", + " status, response = self.connection.expunge()\n", + " if status == \"OK\":\n", + " print(f\"✓ Expunge successful: {response}\")\n", + " else:\n", + " print(f\"⚠️ Expunge response: {status} - {response}\")\n", + " except Exception as e:\n", + " print(f\"❌ Expunge error: {e}\")\n", + "\n", + " # Close and reselect to ensure changes are committed\n", + " try:\n", + " self.connection.close()\n", + " self.connection.select(\"INBOX\")\n", + " except:\n", + " pass # Not critical if this fails\n", + "\n", + " print(f\"\\n✓ Deletion complete: {successful} successful, {failed} failed\")\n", + " if successful > 0:\n", + " print(f\"ℹ️ With Gmail's settings, deleted emails should appear in [Gmail]/Trash\")\n", + "\n", + " return successful, failed\n", + "\n", + " except Exception as error:\n", + " print(f\"❌ Delete operation error: {error}\")\n", + " return successful, failed\n", + "\n", + "\n", + "def create_gmail_connection(email: str, password: str) -> GmailConnection:\n", + " \"\"\"Factory function to create Gmail connection.\"\"\"\n", + " if not email or not password:\n", + " raise ValueError(\"Email and password required for IMAP\")\n", + " return IMAPConnection(email, password)" + ], + "metadata": { + "id": "Mv4m2UqV8i-b" + }, + "id": "Mv4m2UqV8i-b", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "##Vector Database Manager" + ], + "metadata": { + "id": "WI1_7UiU8iy3" + }, + "id": "WI1_7UiU8iy3" + }, + { + "cell_type": "code", + "source": [ + "class VectorDatabaseManager:\n", + " \"\"\"Manages vector database operations for email embeddings.\"\"\"\n", + "\n", + " def __init__(self, db_name: str = DB_NAME):\n", + " self.db_name = db_name\n", + " self.vectorstore = None\n", + " self.embeddings = 
None\n", + "\n", + " def create_embeddings(self, model_type: str = \"openai\"):\n", + " \"\"\"Create embedding function based on model type.\"\"\"\n", + " if model_type.lower() == \"openai\":\n", + " print(\"Using OpenAI embeddings...\")\n", + " self.embeddings = OpenAIEmbeddings()\n", + " elif model_type.lower() == \"bert\":\n", + " print(\"Using BERT (HuggingFace) embeddings...\")\n", + " self.embeddings = HuggingFaceEmbeddings(\n", + " model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n", + " )\n", + " else:\n", + " raise ValueError(f\"Unknown model type: {model_type}. Use 'openai' or 'bert'.\")\n", + "\n", + " return self.embeddings\n", + "\n", + " def create_vector_store(self, chunks: List[Document], recreate: bool = True):\n", + " \"\"\"Chroma vector store from document chunks.\"\"\"\n", + " if not self.embeddings:\n", + " raise RuntimeError(\"Call create_embeddings() first\")\n", + "\n", + " if recreate and os.path.exists(self.db_name):\n", + " print(f\"Deleting existing database: {self.db_name}\")\n", + " try:\n", + " Chroma(persist_directory=self.db_name, embedding_function=self.embeddings).delete_collection()\n", + " except:\n", + " pass\n", + "\n", + " print(f\"Creating vector store with {len(chunks)} chunks\")\n", + " self.vectorstore = Chroma.from_documents(\n", + " documents=chunks,\n", + " embedding=self.embeddings,\n", + " persist_directory=self.db_name\n", + " )\n", + "\n", + " count = self.vectorstore._collection.count()\n", + " print(f\"Vector store created with {count:,} documents\")\n", + "\n", + " return self.vectorstore\n", + "\n", + " def load_vector_store(self):\n", + " \"\"\"Load existing Chroma vector store.\"\"\"\n", + " if not self.embeddings:\n", + " raise RuntimeError(\"Call create_embeddings() first\")\n", + "\n", + " if not os.path.exists(self.db_name):\n", + " raise FileNotFoundError(f\"Vector store not found: {self.db_name}\")\n", + "\n", + " self.vectorstore = Chroma(\n", + " persist_directory=self.db_name,\n", + " embedding_function=self.embeddings\n", + " )\n", + "\n", + " count = self.vectorstore._collection.count()\n", + " print(f\"Loaded vector store with {count:,} documents\")\n", + "\n", + " return self.vectorstore\n", + "\n", + " def get_vectorstore(self):\n", + " \"\"\"Get the vectorstore instance.\"\"\"\n", + " return self.vectorstore" + ], + "metadata": { + "id": "R1S1CEwf9VF7" + }, + "id": "R1S1CEwf9VF7", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Email Processor" + ], + "metadata": { + "id": "LWIukSSu9vl_" + }, + "id": "LWIukSSu9vl_" + }, + { + "cell_type": "code", + "source": [ + "class EmailProcessor:\n", + " \"\"\"Email processor\"\"\"\n", + "\n", + " def __init__(self):\n", + " self.documents = []\n", + " self.chunks = []\n", + " self.llm = None\n", + " self.topics = \"\"\n", + " self.classified_emails = {'keep': [], 'delete': []}\n", + " self.topic_to_emails = {}\n", + " self.email_to_topic = {}\n", + "\n", + " def chunk_documents(self, documents: List[Document], chunk_size: int = 1000, chunk_overlap: int = 200):\n", + " \"\"\"Chunk email documents.\"\"\"\n", + " text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)\n", + "\n", + " self.documents = documents\n", + " self.chunks = text_splitter.split_documents(documents)\n", + " print(f\"Created {len(self.chunks)} chunks from {len(documents)} documents\")\n", + " return self.chunks\n", + "\n", + " def get_statistics(self, documents: List[Document]) -> Dict:\n", + " \"\"\"Calculate 
statistics.\"\"\"\n", + " if not documents:\n", + " return {}\n", + "\n", + " senders = [doc.metadata.get('sender', '') for doc in documents]\n", + " total_chars = sum(len(doc.page_content) for doc in documents)\n", + "\n", + " return {\n", + " 'total_emails': len(documents),\n", + " 'total_chars': total_chars,\n", + " 'avg_email_length': total_chars // len(documents),\n", + " 'unique_senders': len(set(senders)),\n", + " 'top_senders': Counter(senders).most_common(10)\n", + " }\n", + "\n", + " def create_llm(self, model_type: str = \"openai\", temperature: float = 0.7, debug: bool = False):\n", + " \"\"\"Create LLM instance.\"\"\"\n", + " callbacks = [StdOutCallbackHandler()] if debug else []\n", + "\n", + " if model_type.lower() == \"openai\":\n", + " self.llm = ChatOpenAI(\n", + " temperature=temperature,\n", + " model_name=MODEL_OPENAI,\n", + " callbacks=callbacks\n", + " )\n", + " else:\n", + " self.llm = ChatOpenAI(temperature=temperature, model_name=MODEL_OPENAI)\n", + "\n", + " return self.llm\n", + "\n", + " def analyze_personal_interests(self, documents: List[Document]) -> str:\n", + " \"\"\"Analyze personal interests using LLM.\"\"\"\n", + " if not self.llm:\n", + " raise RuntimeError(\"Call create_llm() first\")\n", + "\n", + " prompt = self._generate_topics_prompt(documents)\n", + " response = self.llm.invoke([HumanMessage(content=prompt)])\n", + " self.topics = response.content\n", + " return self.topics\n", + "\n", + " def _generate_topics_prompt(self, documents: List[Document], user_context: Optional[str] = None) -> str:\n", + " \"\"\"Generate LLM prompt for topic identification.\"\"\"\n", + " senders = [doc.metadata.get('sender', '') for doc in documents]\n", + " subjects = [doc.metadata.get('subject', '') for doc in documents]\n", + " sender_counts = Counter(senders).most_common(20)\n", + "\n", + " context_line = f'Based on the user\\'s query: \"{user_context}\"\\n\\n' if user_context else \"\"\n", + "\n", + " prompt = f\"\"\"\n", + "{context_line}I have {len(documents)} emails. Analyze and identify 5-10 important topics/categories.\n", + "\n", + "Top senders:\n", + "{chr(10).join([f\"- {sender}: {count}\" for sender, count in sender_counts])}\n", + "\n", + "Sample subjects (first 30):\n", + "{chr(10).join([f\"- {subj}\" for subj in subjects[:30]])}\n", + "\n", + "IMPORTANT: Format your response as a simple numbered list with ONLY the topic names, one per line.\n", + "Do NOT use markdown formatting (**, *, etc.).\n", + "Do NOT add descriptions or explanations after the topic name.\n", + "Do NOT add blank lines between topics.\n", + "\n", + "Example format:\n", + "1. Work Projects\n", + "2. Family Communications\n", + "3. 
Professional Development\n", + "\"\"\"\n", + "\n", + " if user_context:\n", + " prompt += f\"\\n\\nYour response should list topics that align with the user's query about: {user_context}\"\n", + "\n", + " return prompt\n", + "\n", + " def extract_topics_from_text(self, topics_text: str) -> List[str]:\n", + " \"\"\"Extract topic list from LLM-generated topics text.\"\"\"\n", + " topics = []\n", + " lines = topics_text.strip().split('\\n')\n", + "\n", + " for line in lines:\n", + " line = line.strip()\n", + "\n", + " # Skip empty lines\n", + " if not line or len(line) < 3:\n", + " continue\n", + "\n", + " # Skip lines that are clearly descriptions (start with lowercase, or too long)\n", + " if line[0].islower() or line.startswith(('Emails', 'Topics', 'Information', 'Communications', 'Offers')):\n", + " continue\n", + "\n", + " # Remove markdown formatting (**, *, _)\n", + " line = line.replace('**', '').replace('*', '').replace('_', '')\n", + "\n", + " # Remove numbering and bullet points\n", + " if line and line[0].isdigit():\n", + " # Remove \"1.\" or \"1)\"\n", + " parts = line.split('.', 1)\n", + " if len(parts) > 1:\n", + " line = parts[1].strip()\n", + " else:\n", + " parts = line.split(')', 1)\n", + " if len(parts) > 1:\n", + " line = parts[1].strip()\n", + " elif line.startswith(('-', '•')):\n", + " line = line[1:].strip()\n", + "\n", + " # Take only the topic name (before any dash or colon describing it)\n", + " if ' - ' in line:\n", + " topic = line.split(' - ')[0].strip()\n", + " elif ':' in line:\n", + " topic = line.split(':')[0].strip()\n", + " else:\n", + " topic = line.strip()\n", + "\n", + " # Validate: reasonable length for a topic name (not a full sentence/description)\n", + " # Topic names should be between 5-60 characters\n", + " if topic and 5 < len(topic) < 60 and not topic.lower().startswith('based on'):\n", + " topics.append(topic)\n", + "\n", + " return topics[:10] # Limit to top 10 topics\n", + "\n", + " def categorize_emails_by_topics(self, documents: List[Document], vectorstore) -> Dict[str, List[Document]]:\n", + " \"\"\"Categorize emails by matching them to identified topics using RAG.\"\"\"\n", + " if not self.topics or not vectorstore:\n", + " return {}\n", + "\n", + " # Extract topic list from the topics text\n", + " topic_list = self.extract_topics_from_text(self.topics)\n", + "\n", + " if not topic_list:\n", + " return {}\n", + "\n", + " # For each topic, find matching emails using vector similarity\n", + " topic_to_emails = {topic: [] for topic in topic_list}\n", + " topic_to_emails['Uncategorized'] = []\n", + "\n", + " # Track which emails have been matched to which topic\n", + " matched_email_ids = set()\n", + " email_to_topic = {} # Map message_id to topic name\n", + "\n", + " retriever = vectorstore.as_retriever(search_kwargs={\"k\": len(documents)})\n", + "\n", + " for topic in topic_list:\n", + " # Query vectorstore for emails matching this topic\n", + " query = f\"Emails about: {topic}\"\n", + " relevant_docs = retriever.invoke(query)\n", + "\n", + " # Take top matches (based on proportion of total emails - ~15% per topic)\n", + " num_matches = max(1, int(len(documents) * 0.15))\n", + "\n", + " for doc in relevant_docs[:num_matches]:\n", + " msg_id = doc.metadata.get('message_id')\n", + " if msg_id and msg_id not in matched_email_ids:\n", + " # Find the original document\n", + " original_doc = next((d for d in documents if d.metadata.get('message_id') == msg_id), None)\n", + " if original_doc:\n", + " topic_to_emails[topic].append(original_doc)\n", 
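+ "                         # Record the match so later topics skip this email:\n", + "                         # the first matching topic wins, giving each email one label\n",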
+ " matched_email_ids.add(msg_id)\n", + " email_to_topic[msg_id] = topic\n", + "\n", + " # Add uncategorized emails\n", + " for doc in documents:\n", + " msg_id = doc.metadata.get('message_id')\n", + " if msg_id not in matched_email_ids:\n", + " topic_to_emails['Uncategorized'].append(doc)\n", + " email_to_topic[msg_id] = 'Uncategorized'\n", + "\n", + " # Store the mapping for use in dataframe creation\n", + " self.email_to_topic = email_to_topic\n", + "\n", + " return topic_to_emails\n", + "\n", + " def get_topic_counts_display(self, documents: List[Document], vectorstore) -> str:\n", + " \"\"\"Get formatted topic counts for display.\"\"\"\n", + " if not self.topics or not vectorstore:\n", + " return \"No topics identified yet.\"\n", + "\n", + " topic_to_emails = self.categorize_emails_by_topics(documents, vectorstore)\n", + "\n", + " counts_text = \"Email Counts by Identified Topic:\\n\\n\"\n", + "\n", + " # Sort by count, descending\n", + " sorted_topics = sorted(topic_to_emails.items(), key=lambda x: len(x[1]), reverse=True)\n", + "\n", + " for topic, emails in sorted_topics:\n", + " count = len(emails)\n", + " if count > 0:\n", + " counts_text += f\" {topic}: {count} emails\\n\"\n", + "\n", + " total = sum(len(emails) for emails in topic_to_emails.values())\n", + " counts_text += f\"\\n Total: {total} emails\"\n", + "\n", + " return counts_text\n", + "\n", + " def classify_emails(self, documents: List[Document], vectorstore, threshold: float = 0.5):\n", + " \"\"\"Classify emails based on identified topics.\n", + "\n", + " Emails matching identified topics → KEEP\n", + " Emails not matching any topic → DELETE candidates\n", + " \"\"\"\n", + " if not self.topics:\n", + " raise RuntimeError(\"Call analyze_personal_interests() first\")\n", + "\n", + " # Categorize emails by topics\n", + " topic_to_emails = self.categorize_emails_by_topics(documents, vectorstore)\n", + "\n", + " # Emails matching topics are KEPT\n", + " keep_emails = []\n", + " for topic, emails in topic_to_emails.items():\n", + " if topic != 'Uncategorized':\n", + " keep_emails.extend(emails)\n", + "\n", + " # Uncategorized emails are DELETE candidates\n", + " delete_candidates = topic_to_emails.get('Uncategorized', [])\n", + "\n", + " # Store topic categorization for counts display\n", + " self.topic_to_emails = topic_to_emails\n", + "\n", + " self.classified_emails = {'keep': keep_emails, 'delete': delete_candidates}\n", + "\n", + " print(f\"Classification: {len(keep_emails)} keep, {len(delete_candidates)} delete\")\n", + " print(f\"Matched to {len([t for t in topic_to_emails.keys() if t != 'Uncategorized'])} topics\")\n", + " return self.classified_emails\n", + "\n", + " def create_archive(self, documents: List[Document], archive_name: Optional[str] = None) -> str:\n", + " \"\"\"Create ZIP archive of emails.\"\"\"\n", + " if not documents:\n", + " raise ValueError(\"No documents to archive\")\n", + "\n", + " if not archive_name:\n", + " timestamp = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n", + " archive_name = f\"email_archive_{timestamp}.zip\"\n", + "\n", + " archive_dir = \"email_archive_temp\"\n", + " os.makedirs(archive_dir, exist_ok=True)\n", + "\n", + " for i, doc in enumerate(documents):\n", + " email_data = {'metadata': doc.metadata, 'content': doc.page_content}\n", + " subject = doc.metadata.get('subject', 'no_subject')[:50]\n", + " safe_subject = \"\".join(c for c in subject if c.isalnum() or c in (' ', '-', '_')).strip()\n", + " filename = f\"{i+1:04d}_{safe_subject}.json\"\n", + "\n", + " with 
open(os.path.join(archive_dir, filename), 'w', encoding='utf-8') as f:\n", + " json.dump(email_data, f, indent=2, ensure_ascii=False)\n", + "\n", + " # Create ZIP\n", + " with zipfile.ZipFile(archive_name, 'w', zipfile.ZIP_DEFLATED) as zipf:\n", + " for root, dirs, files in os.walk(archive_dir):\n", + " for file in files:\n", + " zipf.write(os.path.join(root, file), file)\n", + "\n", + " shutil.rmtree(archive_dir)\n", + " print(f\"Archive created: {archive_name}\")\n", + " return archive_name\n", + "\n", + " def emails_to_dataframe(self, documents: List[Document], add_select_column: bool = False) -> pd.DataFrame:\n", + " \"\"\"Convert to DataFrame with Topics column.\"\"\"\n", + " data = [\n", + " {\n", + " 'Topics': self.email_to_topic.get(doc.metadata.get('message_id', ''), 'Unknown'),\n", + " 'Message ID': doc.metadata.get('message_id', ''),\n", + " 'Subject': doc.metadata.get('subject', '')[:100],\n", + " 'Sender': doc.metadata.get('sender', ''),\n", + " 'Length': len(doc.page_content)\n", + " }\n", + " for doc in documents\n", + " ]\n", + " df = pd.DataFrame(data)\n", + "\n", + " if add_select_column:\n", + " # Add Select column as first column\n", + " df.insert(0, 'Select', False)\n", + "\n", + " return df" + ], + "metadata": { + "id": "7fUcjkI79vLa" + }, + "id": "7fUcjkI79vLa", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Application State" + ], + "metadata": { + "id": "VWqZZRLY94ST" + }, + "id": "VWqZZRLY94ST" + }, + { + "cell_type": "code", + "source": [ + "class AppState:\n", + " \"\"\"Global application state.\"\"\"\n", + " def __init__(self):\n", + " self.gmail_conn: Optional[GmailConnection] = None\n", + " self.vector_db_manager = VectorDatabaseManager()\n", + " self.email_processor = EmailProcessor()\n", + " self.testing_mode = False\n", + " self.debug_mode = False\n", + "\n", + "state = AppState()" + ], + "metadata": { + "id": "eHKPF6WB93WZ" + }, + "id": "eHKPF6WB93WZ", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Gradio Callback Functions" + ], + "metadata": { + "id": "yOCw1doE93LH" + }, + "id": "yOCw1doE93LH" + }, + { + "cell_type": "code", + "source": [ + "def connect_imap(email, password):\n", + " try:\n", + " state.gmail_conn = create_gmail_connection(email, password)\n", + " if state.gmail_conn.connect():\n", + " info = state.gmail_conn.get_auth_info()\n", + " return f\"Connected as {info['email']}\\nTotal messages: {info['total_messages']:,}\"\n", + " return \"❌ Authentication failed\"\n", + " except Exception as e:\n", + " return f\"❌ Error: {str(e)}\"\n", + "\n", + "\n", + "def fetch_and_process(testing_mode, embedding_model):\n", + " try:\n", + " if not state.gmail_conn or not state.gmail_conn.is_connected():\n", + " return \"❌ Not authenticated\"\n", + "\n", + " state.testing_mode = testing_mode\n", + " max_emails = 50 if testing_mode else None\n", + "\n", + " documents, fetch_diagnostics = state.gmail_conn.fetch_emails(max_emails)\n", + "\n", + " if not documents:\n", + " return f\"❌ No emails fetched\\n\\n{fetch_diagnostics}\"\n",
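+ "\n", + " # Build the RAG index: gather stats, chunk the emails, embed the chunks,\n", + " # and persist them to ChromaDB for topic matching\n", +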
"\n", + " stats = state.email_processor.get_statistics(documents)\n", + " chunks = state.email_processor.chunk_documents(documents)\n", + "\n", + " state.vector_db_manager.create_embeddings(embedding_model)\n", + " state.vector_db_manager.create_vector_store(chunks)\n", + "\n", + " return f\"\"\"✓ Processing completed!\n", + "\n", + "{fetch_diagnostics}\n", + "\n", + "Total emails: {stats['total_emails']}\n", + "Chunks created: {len(chunks)}\n", + "Top 5 senders:\n", + "{chr(10).join([f\" - {sender}: {count}\" for sender, count in stats['top_senders'][:5]])}\n", + "\"\"\"\n", + " except Exception as e:\n", + " import traceback\n", + " return f\"❌ Error: {str(e)}\\n\\nTraceback:\\n{traceback.format_exc()}\"\n", + "\n", + "\n", + "def analyze_topics(llm_model, threshold):\n", + " try:\n", + " if not state.email_processor.documents:\n", + " return \"❌ No documents loaded\", \"\", None, None\n", + "\n", + " state.email_processor.create_llm(llm_model)\n", + " topics = state.email_processor.analyze_personal_interests(state.email_processor.documents)\n", + "\n", + " # Automatically classify after analysis\n", + " classified = state.email_processor.classify_emails(\n", + " state.email_processor.documents,\n", + " state.vector_db_manager.vectorstore,\n", + " threshold\n", + " )\n", + "\n", + " # Get topic counts after classification (shows which topics emails matched to)\n", + " counts_text = state.email_processor.get_topic_counts_display(\n", + " state.email_processor.documents,\n", + " state.vector_db_manager.vectorstore\n", + " )\n", + "\n", + " # Get the actual topics list that was used for categorization\n", + " topic_list = state.email_processor.extract_topics_from_text(topics)\n", + " formatted_topics = \"Identified Topics:\\n\\n\" + \"\\n\".join([f\"{i+1}. 
{topic}\" for i, topic in enumerate(topic_list)])\n", + "\n", + " keep_df = state.email_processor.emails_to_dataframe(classified['keep'], add_select_column=False)\n", + " delete_df = state.email_processor.emails_to_dataframe(classified['delete'], add_select_column=True)\n", + "\n", + " return formatted_topics, counts_text, keep_df, delete_df\n", + " except Exception as e:\n", + " return f\"❌ Error: {str(e)}\", \"\", None, None\n", + "\n", + "\n", + "def refine_topics_with_chat(chat_query, llm_model, threshold):\n", + " \"\"\"Use LLM to identify topics based on user query about their interests.\"\"\"\n", + " try:\n", + " if not state.email_processor.documents or not state.vector_db_manager.vectorstore:\n", + " return \"❌ Please process emails first\", \"\", None, None\n", + "\n", + " if not chat_query or chat_query.strip() == \"\":\n", + " return \"❌ Please enter a query\", \"\", None, None\n", + "\n", + " # Create LLM if needed\n", + " if not state.email_processor.llm:\n", + " state.email_processor.create_llm(llm_model)\n", + "\n", + " prompt = state.email_processor._generate_topics_prompt(\n", + " state.email_processor.documents,\n", + " user_context=chat_query\n", + " )\n", + "\n", + " response = state.email_processor.llm.invoke([HumanMessage(content=prompt)])\n", + " state.email_processor.topics = response.content\n", + "\n", + " # Automatically classify emails based on the new topics\n", + " classified = state.email_processor.classify_emails(\n", + " state.email_processor.documents,\n", + " state.vector_db_manager.vectorstore,\n", + " threshold\n", + " )\n", + "\n", + " # Get topic counts after classification\n", + " counts_text = state.email_processor.get_topic_counts_display(\n", + " state.email_processor.documents,\n", + " state.vector_db_manager.vectorstore\n", + " )\n", + "\n", + " # Get the actual topics list that was used for categorization\n", + " topic_list = state.email_processor.extract_topics_from_text(state.email_processor.topics)\n", + " formatted_topics = \"Identified Topics:\\n\\n\" + \"\\n\".join([f\"{i+1}. 
{topic}\" for i, topic in enumerate(topic_list)])\n", + "\n", + " keep_df = state.email_processor.emails_to_dataframe(classified['keep'], add_select_column=False)\n", + " delete_df = state.email_processor.emails_to_dataframe(classified['delete'], add_select_column=True)\n", + "\n", + " return formatted_topics, counts_text, keep_df, delete_df\n", + " except Exception as e:\n", + " return f\"❌ Error: {str(e)}\", \"\", None, None\n", + "\n", + "\n", + "def select_all_emails(delete_df):\n", + " \"\"\"Select all delete candidate emails.\"\"\"\n", + " if delete_df is None or len(delete_df) == 0:\n", + " return delete_df\n", + "\n", + " delete_df_copy = delete_df.copy()\n", + " delete_df_copy['Select'] = True\n", + " return delete_df_copy\n", + "\n", + "\n", + "def deselect_all_emails(delete_df):\n", + " \"\"\"Deselect all delete candidate emails.\"\"\"\n", + " if delete_df is None or len(delete_df) == 0:\n", + " return delete_df\n", + "\n", + " delete_df_copy = delete_df.copy()\n", + " delete_df_copy['Select'] = False\n", + " return delete_df_copy\n", + "\n", + "\n", + "def create_archive_file():\n", + " try:\n", + " if not state.email_processor.classified_emails['delete']:\n", + " return \"❌ No emails to archive\", None\n", + "\n", + " archive_path = state.email_processor.create_archive(\n", + " state.email_processor.classified_emails['delete']\n", + " )\n", + " return f\"✓ Archive created: {archive_path}\", archive_path\n", + " except Exception as e:\n", + " return f\"❌ Error: {str(e)}\", None\n", + "\n", + "\n", + "def perform_deletion(confirmation_text, delete_df):\n", + " try:\n", + " if confirmation_text.strip().upper() != \"DELETE\":\n", + " return \"❌ Confirmation failed. Type 'DELETE' to confirm.\"\n", + "\n", + " if delete_df is None or len(delete_df) == 0:\n", + " return \"❌ No emails available for deletion\"\n", + "\n", + " # Get selected emails\n", + " if 'Select' not in delete_df.columns:\n", + " return \"❌ Invalid dataframe format\"\n", + "\n", + " selected_rows = delete_df[delete_df['Select'] == True]\n", + " if len(selected_rows) == 0:\n", + " return \"❌ No emails selected for deletion\"\n", + "\n", + " # Get message IDs of selected emails\n", + " selected_ids = set(selected_rows['Message ID'].tolist())\n", + "\n", + " # Filter documents to only selected ones\n", + " selected_docs = [\n", + " doc for doc in state.email_processor.classified_emails['delete']\n", + " if doc.metadata.get('message_id') in selected_ids\n", + " ]\n", + "\n", + " if not state.gmail_conn:\n", + " return \"❌ Not authenticated\"\n", + "\n", + " success, failed = state.gmail_conn.delete_emails(selected_docs)\n", + "\n", + " return f\"Deletion complete:\\n - Deleted: {success}\\n - Failed: {failed}\\n - Skipped: {len(state.email_processor.classified_emails['delete']) - len(selected_docs)}\"\n", + " except Exception as e:\n", + " return f\"❌ Error: {str(e)}\"" + ], + "metadata": { + "id": "2toGS3_z-dSE" + }, + "id": "2toGS3_z-dSE", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "##Gradio Interface" + ], + "metadata": { + "id": "ja-oFdo8-h6b" + }, + "id": "ja-oFdo8-h6b" + }, + { + "cell_type": "code", + "source": [ + "with gr.Blocks(title=\"Gmail Inbox Terminator\", theme=gr.themes.Soft()) as app:\n", + " gr.Markdown(\"# 🔥 Gmail Inbox Terminator\")\n", + " gr.Markdown(\"### Intelligent Email Management with AI\")\n", + " gr.Markdown(\"Identify important topics, then delete emails OUTSIDE those topics.\")\n", + "\n", + " with gr.Tabs():\n", + " # Tab 1: 
Connection\n", + " with gr.Tab(\"🔌 Connection\"):\n", + " gr.Markdown(\"## Connect to Gmail via IMAP\")\n", + "\n", + " if default_email and default_password:\n", + " gr.Markdown(\"\"\"\n", + "**✅ Credentials loaded**\n", + "\n", + "Use pre-filled credentials or enter different ones.\n", + "\"\"\")\n", + " else:\n", + " gr.Markdown(\"\"\"\n", + "**Requirements:**\n", + "1. Enable 2-Factor Authentication on your Google account\n", + "2. Create an app-specific password at [Google Account Security](https://myaccount.google.com/security)\n", + "3. Use the app password below (not your regular password)\n", + "\"\"\")\n", + "\n", + " with gr.Row():\n", + " imap_email = gr.Textbox(\n", + " label=\"Email Address\",\n", + " placeholder=\"your.email@gmail.com\",\n", + " value=default_email\n", + " )\n", + " imap_password = gr.Textbox(\n", + " label=\"App Password\",\n", + " type=\"password\",\n", + " placeholder=\"16-character app password\",\n", + " value=default_password\n", + " )\n", + "\n", + " imap_btn = gr.Button(\"Connect\", variant=\"primary\")\n", + " imap_status = gr.Textbox(label=\"Connection Status\", lines=3)\n", + "\n", + " gr.Markdown(\"---\")\n", + " gr.Markdown(\"## Process Emails\")\n", + "\n", + " with gr.Row():\n", + " testing_mode_check = gr.Checkbox(label=\"Testing Mode (50 emails only)\", value=True)\n", + " embedding_dropdown = gr.Dropdown(\n", + " choices=[\"openai\", \"bert\"],\n", + " value=\"openai\",\n", + " label=\"Embedding Model\"\n", + " )\n", + "\n", + " process_btn = gr.Button(\"📥 Fetch and Process Emails\", variant=\"primary\", size=\"lg\")\n", + " process_status = gr.Textbox(label=\"Processing Status\", lines=10)\n", + "\n", + " imap_btn.click(connect_imap, inputs=[imap_email, imap_password], outputs=imap_status)\n", + " process_btn.click(\n", + " fetch_and_process,\n", + " inputs=[testing_mode_check, embedding_dropdown],\n", + " outputs=process_status\n", + " )\n", + "\n", + " # Tab 2: Topic Analysis & Configuration\n", + " with gr.Tab(\"🔍 Topic Analysis & Configuration\"):\n", + " gr.Markdown(\"## a) Configuration\")\n", + "\n", + " with gr.Row():\n", + " llm_dropdown = gr.Dropdown(\n", + " choices=[\"openai\", \"gemini\"],\n", + " value=\"openai\",\n", + " label=\"LLM Model\"\n", + " )\n", + "\n", + " classification_threshold = gr.Slider(\n", + " minimum=0.1,\n", + " maximum=0.9,\n", + " value=0.5,\n", + " step=0.1,\n", + " label=\"Relevance Threshold (higher = more strict, fewer kept)\"\n", + " )\n", + "\n", + " gr.Markdown(\"---\")\n", + " gr.Markdown(\"## b) Interest Analysis\")\n", + " gr.Markdown(\"Identify topics that are IMPORTANT to you. Emails matching these topics will be KEPT, others offered for deletion.\")\n", + "\n", + " analyze_btn = gr.Button(\"🤖 Identify My Interests\", variant=\"primary\", size=\"lg\")\n", + " topics_output = gr.Textbox(label=\"Important Topics\", lines=10)\n", + " counts_output = gr.Textbox(label=\"Category Counts\", lines=8)\n", + "\n", + " gr.Markdown(\"---\")\n", + " gr.Markdown(\"### Refine Topics with LLM Query\")\n", + " gr.Markdown(\"Ask the LLM to identify specific topics based on your interests. 
Results replace topics above.\")\n", + "\n", + " with gr.Row():\n", + " chat_query_input = gr.Textbox(\n", + " label=\"Query about your interests\",\n", + " placeholder=\"e.g., 'What are my most important professional topics?'\",\n", + " scale=3\n", + " )\n", + " chat_submit_btn = gr.Button(\"Submit Query\", variant=\"secondary\", scale=1)\n", + "\n", + " gr.Markdown(\"\"\"\n", + "**Example queries:**\n", + "- \"What are my most important professional topics?\"\n", + "- \"Identify topics related to family and personal life\"\n", + "- \"What work-related topics should I keep?\"\n", + "\"\"\")\n", + "\n", + " # Tab 3: Email Management & Deletion\n", + " with gr.Tab(\"📧 Email Management & Deletion\"):\n", + " gr.Markdown(\"## Classified Emails (based on topic analysis)\")\n", + " gr.Markdown(\"Emails matching your important topics are in 'Keep'. Others are deletion candidates.\")\n", + "\n", + " with gr.Row():\n", + " with gr.Column():\n", + " gr.Markdown(\"### 📌 Keep (Important)\")\n", + " keep_df = gr.Dataframe(label=\"Emails to Keep\", interactive=False)\n", + "\n", + " with gr.Column():\n", + " gr.Markdown(\"### 🗑️ Delete Candidates\")\n", + "\n", + " with gr.Row():\n", + " select_all_btn = gr.Button(\"✅ Select All\", size=\"sm\")\n", + " deselect_all_btn = gr.Button(\"❌ Deselect All\", size=\"sm\")\n", + "\n", + " delete_df = gr.Dataframe(\n", + " label=\"Select emails to delete\",\n", + " interactive=True,\n", + " datatype=[\"bool\", \"str\", \"str\", \"str\", \"str\", \"number\"],\n", + " col_count=(6, \"fixed\")\n", + " )\n", + "\n", + " select_all_btn.click(select_all_emails, inputs=delete_df, outputs=delete_df)\n", + " deselect_all_btn.click(deselect_all_emails, inputs=delete_df, outputs=delete_df)\n", + "\n", + " gr.Markdown(\"---\")\n", + " gr.Markdown(\"## Archive & Delete\")\n", + "\n", + " with gr.Row():\n", + " archive_btn = gr.Button(\"📦 Create Archive\", variant=\"secondary\")\n", + " delete_btn = gr.Button(\"🔥 DELETE SELECTED\", variant=\"stop\")\n", + "\n", + " with gr.Row():\n", + " with gr.Column():\n", + " archive_status = gr.Textbox(label=\"Archive Status\", lines=2)\n", + " with gr.Column():\n", + " confirmation_input = gr.Textbox(label=\"Type DELETE to confirm\", placeholder=\"DELETE\")\n", + "\n", + " archive_file = gr.File(label=\"Download Archive\")\n", + " deletion_status = gr.Textbox(label=\"Deletion Result\", lines=3)\n", + "\n", + " analyze_btn.click(\n", + " analyze_topics,\n", + " inputs=[llm_dropdown, classification_threshold],\n", + " outputs=[topics_output, counts_output, keep_df, delete_df]\n", + " )\n", + "\n", + " chat_submit_btn.click(\n", + " refine_topics_with_chat,\n", + " inputs=[chat_query_input, llm_dropdown, classification_threshold],\n", + " outputs=[topics_output, counts_output, keep_df, delete_df]\n", + " )\n", + "\n", + " archive_btn.click(create_archive_file, outputs=[archive_status, archive_file])\n", + " delete_btn.click(perform_deletion, inputs=[confirmation_input, delete_df], outputs=deletion_status)" + ], + "metadata": { + "id": "iKC3MtzX-jVT" + }, + "id": "iKC3MtzX-jVT", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Launch App" + ], + "metadata": { + "id": "rY9Pbte__Kqa" + }, + "id": "rY9Pbte__Kqa" + }, + { + "cell_type": "code", + "source": [ + "app.launch(share=True, inbrowser=True)" + ], + "metadata": { + "id": "YUHF1ZIl_Nv-" + }, + "id": "YUHF1ZIl_Nv-", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Unit Tests for Components\n",
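+ "\n", + "Self-contained checks of the helper functions, the vector database manager, the email processor, and a mock IMAP connection. These run without Gmail credentials; the vector store test uses local BERT embeddings to avoid API calls."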
], + "metadata": { + "id": "jHgVYNTc-tCf" + }, + "id": "jHgVYNTc-tCf" + }, + { + "cell_type": "code", + "source": [ + "\n", + "print(\"=\" * 60)\n", + "print(\"UNIT TESTS - Testing Individual Components\")\n", + "print(\"=\" * 60)\n", + "\n", + "# Test 1: Helper Functions\n", + "print(\"\\n📝 Test 1: Helper Functions\")\n", + "print(\"-\" * 40)\n", + "\n", + "def test_helper_functions():\n", + " \"\"\"Test email parsing helper functions.\"\"\"\n", + " # Test get_header_value\n", + " test_headers = [\n", + " {'name': 'Subject', 'value': 'Test Email'},\n", + " {'name': 'From', 'value': 'sender@example.com'},\n", + " {'name': 'Date', 'value': '2025-10-21'}\n", + " ]\n", + "\n", + " assert get_header_value(test_headers, 'Subject') == 'Test Email'\n", + " assert get_header_value(test_headers, 'From') == 'sender@example.com'\n", + " assert get_header_value(test_headers, 'Missing') == ''\n", + "\n", + " print(\"✓ get_header_value() works correctly\")\n", + " return True\n", + "\n", + "try:\n", + " test_helper_functions()\n", + " print(\"\\n✅ Helper functions test PASSED\")\n", + "except AssertionError as e:\n", + " print(f\"\\n❌ Helper functions test FAILED: {e}\")\n", + "\n", + "# Test 2: VectorDatabaseManager\n", + "print(\"\\n\\n💾 Test 2: VectorDatabaseManager\")\n", + "print(\"-\" * 40)\n", + "\n", + "def test_vector_database_manager():\n", + " \"\"\"Test VectorDatabaseManager class.\"\"\"\n", + " test_docs = [\n", + " Document(\n", + " page_content=\"This is a test email about Python programming and data science.\",\n", + " metadata={'subject': 'Test 1', 'sender': 'test@example.com'}\n", + " ),\n", + " Document(\n", + " page_content=\"Another email discussing machine learning and AI topics.\",\n", + " metadata={'subject': 'Test 2', 'sender': 'ai@example.com'}\n", + " ),\n", + " Document(\n", + " page_content=\"Meeting invitation for tomorrow's project review.\",\n", + " metadata={'subject': 'Test 3', 'sender': 'manager@example.com'}\n", + " )\n", + " ]\n", + "\n", + " test_mgr = VectorDatabaseManager(db_name=\"test_vector_db\")\n", + " embeddings = test_mgr.create_embeddings(\"bert\")\n", + " assert test_mgr.embeddings is not None\n", + " print(\"✓ Embeddings created successfully\")\n", + "\n", + " vectorstore = test_mgr.create_vector_store(test_docs, recreate=True)\n", + " assert vectorstore is not None\n", + " assert test_mgr.vectorstore._collection.count() == len(test_docs)\n", + " print(f\"✓ Vector store created with {len(test_docs)} documents\")\n", + "\n", + " retriever = vectorstore.as_retriever(search_kwargs={\"k\": 2})\n", + " results = retriever.invoke(\"Python programming\")\n", + " assert len(results) > 0\n", + " print(f\"✓ Retrieval works: found {len(results)} relevant documents\")\n", + "\n", + " if os.path.exists(\"test_vector_db\"):\n", + " shutil.rmtree(\"test_vector_db\")\n", + "\n", + " return True\n", + "\n", + "try:\n", + " test_vector_database_manager()\n", + " print(\"\\n✅ VectorDatabaseManager test PASSED\")\n", + "except Exception as e:\n", + " print(f\"\\n❌ VectorDatabaseManager test FAILED: {e}\")\n", + "\n", + "# Test 3: EmailProcessor\n", + "print(\"\\n\\n📧 Test 3: EmailProcessor\")\n", + "print(\"-\" * 40)\n", + "\n", + "def test_email_processor():\n", + " \"\"\"Test EmailProcessor class.\"\"\"\n", + " test_docs = [\n", + " Document(\n", + " page_content=\"Subject: Project Update\\nFrom: boss@company.com\\nTo: me@company.com\\nDate: 2025-10-20\\n\\nPlease review the quarterly report.\",\n", + " metadata={'subject': 'Project Update', 'sender': 'boss@company.com', 
'message_id': '001', 'date': '2025-10-20'}\n", + " ),\n", + " Document(\n", + " page_content=\"Subject: Newsletter\\nFrom: marketing@spam.com\\nTo: me@company.com\\nDate: 2025-10-19\\n\\nCheck out our latest deals!\",\n", + " metadata={'subject': 'Newsletter', 'sender': 'marketing@spam.com', 'message_id': '002', 'date': '2025-10-19'}\n", + " ),\n", + " Document(\n", + " page_content=\"Subject: Team Meeting\\nFrom: colleague@company.com\\nTo: me@company.com\\nDate: 2025-10-21\\n\\nMeeting tomorrow at 10am.\",\n", + " metadata={'subject': 'Team Meeting', 'sender': 'colleague@company.com', 'message_id': '003', 'date': '2025-10-21'}\n", + " )\n", + " ]\n", + "\n", + " processor = EmailProcessor()\n", + "\n", + " chunks = processor.chunk_documents(test_docs, chunk_size=100, chunk_overlap=20)\n", + " assert len(chunks) >= len(test_docs)\n", + " print(f\"✓ Chunking works: created {len(chunks)} chunks from {len(test_docs)} documents\")\n", + "\n", + " stats = processor.get_statistics(test_docs)\n", + " assert stats['total_emails'] == 3\n", + " assert stats['unique_senders'] == 3\n", + " print(f\"✓ Statistics calculation works: {stats['total_emails']} emails, {stats['unique_senders']} unique senders\")\n", + "\n", + " df = processor.emails_to_dataframe(test_docs, add_select_column=True)\n", + " assert len(df) == 3\n", + " assert 'Topics' in df.columns\n", + " assert 'Subject' in df.columns\n", + " assert 'Sender' in df.columns\n", + " assert 'Select' in df.columns\n", + " print(f\"✓ DataFrame conversion works: {len(df)} rows, {len(df.columns)} columns\")\n", + "\n", + " return True\n", + "\n", + "try:\n", + " test_email_processor()\n", + " print(\"\\n✅ EmailProcessor test PASSED\")\n", + "except Exception as e:\n", + " print(f\"\\n❌ EmailProcessor test FAILED: {e}\")\n", + "\n", + "# Test 4: Mock IMAP Connection\n", + "print(\"\\n\\n🔌 Test 4: Mock IMAP Connection\")\n", + "print(\"-\" * 40)\n", + "\n", + "def test_mock_connection():\n", + " \"\"\"Test the connection interface with a mock implementation.\"\"\"\n", + "\n", + " class MockIMAPConnection(GmailConnection):\n", + " \"\"\"Mock implementation for testing.\"\"\"\n", + "\n", + " def connect(self) -> bool:\n", + " self.auth_info = {\n", + " 'email': 'test@example.com',\n", + " 'total_messages': 100,\n", + " 'auth_method': 'Mock'\n", + " }\n", + " self.connection = \"mock_connection\"\n", + " return True\n", + "\n", + " def fetch_emails(self, max_emails: Optional[int] = None) -> Tuple[List[Document], str]:\n", + " limit = max_emails if max_emails else 10\n", + " docs = [\n", + " Document(\n", + " page_content=f\"Mock email {i}\",\n", + " metadata={\n", + " 'message_id': f'mock_{i}',\n", + " 'subject': f'Test Subject {i}',\n", + " 'sender': f'sender{i}@example.com',\n", + " 'date': '2025-10-21'\n", + " }\n", + " )\n", + " for i in range(min(limit, 5))\n", + " ]\n", + " return docs, f\"✓ Fetched {len(docs)} mock emails\"\n", + "\n", + " def delete_emails(self, documents: List[Document]) -> Tuple[int, int]:\n", + " return len(documents), 0\n", + "\n", + " mock_conn = MockIMAPConnection()\n", + "\n", + " assert mock_conn.connect()\n", + " print(\"✓ Mock connection established\")\n", + "\n", + " assert mock_conn.is_connected()\n", + " print(\"✓ Connection status check works\")\n", + "\n", + " info = mock_conn.get_auth_info()\n", + " assert info['email'] == 'test@example.com'\n", + " print(f\"✓ Auth info retrieved: {info['email']}\")\n", + "\n", + " emails, diagnostics = mock_conn.fetch_emails(max_emails=3)\n", + " assert len(emails) == 3\n", + " 
print(f\"✓ Fetched {len(emails)} mock emails\")\n", + " print(f\" Diagnostics: {diagnostics}\")\n", + "\n", + " success, failed = mock_conn.delete_emails(emails)\n", + " assert success == 3 and failed == 0\n", + " print(f\"✓ Mock deletion: {success} successful, {failed} failed\")\n", + "\n", + " return True\n", + "\n", + "try:\n", + " test_mock_connection()\n", + " print(\"\\n✅ Mock connection test PASSED\")\n", + "except Exception as e:\n", + " print(f\"\\n❌ Mock connection test FAILED: {e}\")\n", + "\n", + "print(\"\\n\" + \"=\" * 60)\n", + "print(\"✅ ALL UNIT TESTS COMPLETED\")\n", + "print(\"=\" * 60)\n" + ], + "metadata": { + "id": "NQjxVtZl-sNm" + }, + "id": "NQjxVtZl-sNm", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "##Integration Test (with Mock Data)" + ], + "metadata": { + "id": "sA6A8f2Q-r_2" + }, + "id": "sA6A8f2Q-r_2" + }, + { + "cell_type": "code", + "source": [ + "print(\"\\n\\n\" + \"=\" * 60)\n", + "print(\"INTEGRATION TEST - Full Workflow with Mock Data\")\n", + "print(\"=\" * 60)\n", + "\n", + "def run_integration_test():\n", + " \"\"\"Run a complete workflow test with mock data.\"\"\"\n", + "\n", + " print(\"\\n🚀 Starting integration test...\")\n", + "\n", + " # Step 1: Create mock connection\n", + " print(\"\\n1️⃣ Creating mock Gmail connection...\")\n", + "\n", + " class TestGmailConnection(GmailConnection):\n", + " def connect(self):\n", + " self.connection = True\n", + " self.auth_info = {'email': 'test@example.com', 'total_messages': 20, 'auth_method': 'Test'}\n", + " return True\n", + "\n", + " def fetch_emails(self, max_emails=None):\n", + " # Generate realistic mock emails\n", + " topics = [\n", + " (\"Work Project\", \"manager@company.com\", \"Need your input on Q4 planning and budget allocation.\"),\n", + " (\"Team Meeting\", \"colleague@company.com\", \"Weekly sync tomorrow at 10am to discuss progress.\"),\n", + " (\"Newsletter\", \"marketing@newsletter.com\", \"Top 10 deals this week! Don't miss out!\"),\n", + " (\"Spam Offer\", \"deals@promo.com\", \"You've won a million dollars! 
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Integration Test (with Mock Data)"
+   ],
+   "metadata": {
+    "id": "sA6A8f2Q-r_2"
+   },
+   "id": "sA6A8f2Q-r_2"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "print(\"\\n\\n\" + \"=\" * 60)\n",
+    "print(\"INTEGRATION TEST - Full Workflow with Mock Data\")\n",
+    "print(\"=\" * 60)\n",
+    "\n",
+    "def run_integration_test():\n",
+    "    \"\"\"Run a complete workflow test with mock data.\"\"\"\n",
+    "\n",
+    "    print(\"\\n🚀 Starting integration test...\")\n",
+    "\n",
+    "    # Step 1: Create mock connection\n",
+    "    print(\"\\n1️⃣ Creating mock Gmail connection...\")\n",
+    "\n",
+    "    class TestGmailConnection(GmailConnection):\n",
+    "        def connect(self):\n",
+    "            self.connection = True\n",
+    "            self.auth_info = {'email': 'test@example.com', 'total_messages': 20, 'auth_method': 'Test'}\n",
+    "            return True\n",
+    "\n",
+    "        def fetch_emails(self, max_emails=None):\n",
+    "            # Generate realistic mock emails\n",
+    "            topics = [\n",
+    "                (\"Work Project\", \"manager@company.com\", \"Need your input on Q4 planning and budget allocation.\"),\n",
+    "                (\"Team Meeting\", \"colleague@company.com\", \"Weekly sync tomorrow at 10am to discuss progress.\"),\n",
+    "                (\"Newsletter\", \"marketing@newsletter.com\", \"Top 10 deals this week! Don't miss out!\"),\n",
+    "                (\"Spam Offer\", \"deals@promo.com\", \"You've won a million dollars! Click here now!\"),\n",
+    "                (\"Client Update\", \"client@business.com\", \"Regarding the proposal you sent last week.\"),\n",
+    "                (\"Training Course\", \"learning@company.com\", \"New Python course available for employees.\"),\n",
+    "                (\"Marketing Email\", \"ads@shopping.com\", \"Summer sale - 50% off everything!\"),\n",
+    "                (\"Boss Email\", \"ceo@company.com\", \"Great job on the presentation yesterday!\"),\n",
+    "                (\"Junk\", \"random@spam.com\", \"Make money fast with this one weird trick!\"),\n",
+    "                (\"Important Notice\", \"hr@company.com\", \"Annual review meeting scheduled for next month.\")\n",
+    "            ]\n",
+    "\n",
+    "            limit = min(max_emails if max_emails else 10, len(topics))\n",
+    "\n",
+    "            docs = [\n",
+    "                Document(\n",
+    "                    page_content=f\"Subject: {subj}\\nFrom: {sender}\\nTo: test@example.com\\nDate: 2025-10-{20-i}\\n\\n{body}\",\n",
+    "                    metadata={\n",
+    "                        'message_id': f'test_{i}',\n",
+    "                        'subject': subj,\n",
+    "                        'sender': sender,\n",
+    "                        'recipient': 'test@example.com',\n",
+    "                        'date': f'2025-10-{20-i}',\n",
+    "                        'source': 'test'\n",
+    "                    }\n",
+    "                )\n",
+    "                for i, (subj, sender, body) in enumerate(topics[:limit])\n",
+    "            ]\n",
+    "            return docs, f\"✓ Fetched {len(docs)} test emails\"\n",
+    "\n",
+    "        def delete_emails(self, documents):\n",
+    "            return len(documents), 0\n",
+    "\n",
+    "    test_conn = TestGmailConnection()\n",
+    "    test_conn.connect()\n",
+    "    print(f\"   ✓ Connected as: {test_conn.get_auth_info()['email']}\")\n",
+    "\n",
+    "    # Step 2: Fetch emails\n",
+    "    print(\"\\n2️⃣ Fetching mock emails...\")\n",
+    "    emails, diagnostics = test_conn.fetch_emails(max_emails=10)\n",
+    "    print(f\"   ✓ Fetched {len(emails)} emails\")\n",
+    "    print(f\"   {diagnostics}\")\n",
+    "\n",
+    "    # Step 3: Process emails\n",
+    "    print(\"\\n3️⃣ Processing emails...\")\n",
+    "    processor = EmailProcessor()\n",
+    "    chunks = processor.chunk_documents(emails)\n",
+    "    print(f\"   ✓ Created {len(chunks)} chunks\")\n",
+    "\n",
+    "    stats = processor.get_statistics(emails)\n",
+    "    print(f\"   ✓ Statistics: {stats['total_emails']} emails, {stats['unique_senders']} senders\")\n",
+    "\n",
+    "    # Step 4: Create vector store\n",
+    "    print(\"\\n4️⃣ Creating vector store...\")\n",
+    "    vector_mgr = VectorDatabaseManager(db_name=\"test_integration_db\")\n",
+    "    vector_mgr.create_embeddings(\"bert\")  # Use BERT to avoid API costs\n",
+    "    vector_mgr.create_vector_store(chunks, recreate=True)\n",
+    "    print(f\"   ✓ Vector store created with {vector_mgr.vectorstore._collection.count()} documents\")\n",
+    "\n",
+    "    # Step 5: Analyze topics (simulated - would normally use LLM)\n",
+    "    print(\"\\n5️⃣ Analyzing topics...\")\n",
+    "    processor.topics = \"\"\"\n",
+    "Based on the email analysis:\n",
+    "1. Work Projects - Manager communications about planning and budgets\n",
+    "2. Team Collaboration - Meeting invites and team sync-ups\n",
+    "3. Client Relations - Important client communications\n",
+    "4. Professional Development - Training and learning opportunities\n",
+    "5. Company Announcements - HR and leadership communications\n",
+    "\"\"\"\n",
+    "    print(\"   Topics identified (mock analysis)\")\n",
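+    "\n",
+    "    # For reference, the non-mock path would ask the LLM to summarize topics from\n",
+    "    # retrieved chunks instead of hard-coding them. A sketch, assuming OPENAI_API_KEY\n",
+    "    # is set (the model name is an example, not a project setting):\n",
+    "    #   llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n",
+    "    #   context = \"\\n\\n\".join(doc.page_content for doc in chunks[:20])\n",
+    "    #   prompt = f\"List the main topics of interest in these emails:\\n{context}\"\n",
+    "    #   processor.topics = llm.invoke([HumanMessage(content=prompt)]).content\n",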
+    "\n",
+    "    # Step 6: Classify emails\n",
+    "    print(\"\\n6️⃣ Classifying emails...\")\n",
+    "    # Simulate classification based on sender domains; note the loop variables avoid\n",
+    "    # shadowing the imported `email` module and the delete_emails() method name\n",
+    "    work_domains = ['company.com', 'business.com']\n",
+    "    spam_domains = ['newsletter.com', 'promo.com', 'spam.com', 'shopping.com']\n",
+    "\n",
+    "    keep_emails = [msg for msg in emails if any(domain in msg.metadata.get('sender', '') for domain in work_domains)]\n",
+    "    emails_to_delete = [msg for msg in emails if any(domain in msg.metadata.get('sender', '') for domain in spam_domains)]\n",
+    "\n",
+    "    processor.classified_emails = {'keep': keep_emails, 'delete': emails_to_delete}\n",
+    "    print(\"   ✓ Classification complete:\")\n",
+    "    print(f\"     - Keep: {len(keep_emails)} emails\")\n",
+    "    print(f\"     - Delete: {len(emails_to_delete)} emails\")\n",
+    "\n",
+    "    # Step 7: Create archive\n",
+    "    print(\"\\n7️⃣ Creating archive...\")\n",
+    "    archive_path = None\n",
+    "    if emails_to_delete:\n",
+    "        archive_path = processor.create_archive(emails_to_delete)\n",
+    "        print(f\"   ✓ Archive created: {archive_path}\")\n",
+    "        print(f\"   ✓ Archive file exists: {os.path.exists(archive_path)}\")\n",
+    "\n",
+    "    # Step 8: Simulate deletion\n",
+    "    print(\"\\n8️⃣ Simulating deletion...\")\n",
+    "    success, failed = test_conn.delete_emails(emails_to_delete)\n",
+    "    print(f\"   ✓ Deletion complete: {success} successful, {failed} failed\")\n",
+    "\n",
+    "    # Step 9: Display results as DataFrame\n",
+    "    print(\"\\n9️⃣ Generating reports...\")\n",
+    "    keep_df = processor.emails_to_dataframe(keep_emails)\n",
+    "    delete_df = processor.emails_to_dataframe(emails_to_delete)\n",
+    "    print(f\"   ✓ Keep DataFrame: {len(keep_df)} rows\")\n",
+    "    print(f\"   ✓ Delete DataFrame: {len(delete_df)} rows\")\n",
+    "\n",
+    "    # Cleanup\n",
+    "    print(\"\\n🧹 Cleaning up test files...\")\n",
+    "    if os.path.exists(\"test_integration_db\"):\n",
+    "        shutil.rmtree(\"test_integration_db\")\n",
+    "    if archive_path and os.path.exists(archive_path):\n",
+    "        os.remove(archive_path)\n",
+    "    print(\"   ✓ Cleanup complete\")\n",
+    "\n",
+    "    print(\"\\n\" + \"=\" * 60)\n",
+    "    print(\"✅ INTEGRATION TEST COMPLETED SUCCESSFULLY!\")\n",
+    "    print(\"=\" * 60)\n",
+    "    print(\"\\n📊 Summary:\")\n",
+    "    print(f\"   • Total emails processed: {len(emails)}\")\n",
+    "    print(f\"   • Emails to keep: {len(keep_emails)}\")\n",
+    "    print(f\"   • Emails to delete: {len(emails_to_delete)}\")\n",
+    "    print(\"   • Archive created: ✓\")\n",
+    "    print(\"   • Deletion simulated: ✓\")\n",
+    "    print(\"\\n💡 The refactored architecture makes testing easy!\")\n",
+    "\n",
+    "    return True\n",
+    "\n",
+    "try:\n",
+    "    run_integration_test()\n",
+    "except Exception as e:\n",
+    "    print(f\"\\n❌ INTEGRATION TEST FAILED: {e}\")\n",
+    "    import traceback\n",
+    "    traceback.print_exc()"
+   ],
+   "metadata": {
+    "id": "5MBAXKSW-9qp"
+   },
+   "id": "5MBAXKSW-9qp",
+   "execution_count": null,
+   "outputs": []
+  },
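+  {
+   "cell_type": "markdown",
+   "source": [
+    "As a spot check after an integration run, the archive itself can be inspected. The snippet below is a sketch: it assumes `create_archive()` writes a zip file (the `zipfile` import in the setup cell suggests it does) and uses a placeholder path, since the test above deletes its archive during cleanup."
+   ],
+   "metadata": {
+    "id": "archiveCheck01"
+   },
+   "id": "archiveCheck01"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "# Sketch: list the first few entries of an archive produced by create_archive().\n",
+    "# Replace the placeholder path with a real archive before running.\n",
+    "archive_to_inspect = \"email_archive.zip\"  # placeholder path\n",
+    "if os.path.exists(archive_to_inspect):\n",
+    "    with zipfile.ZipFile(archive_to_inspect) as zf:\n",
+    "        print(f\"{len(zf.namelist())} archived items\")\n",
+    "        for name in zf.namelist()[:10]:\n",
+    "            print(f\"  - {name}\")\n",
+    "else:\n",
+    "    print(f\"No archive found at {archive_to_inspect}\")\n"
+   ],
+   "metadata": {
+    "id": "archiveCheck02"
+   },
+   "id": "archiveCheck02",
+   "execution_count": null,
+   "outputs": []
+  },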
+  {
+   "cell_type": "markdown",
+   "source": [
+    "## Performance Test"
+   ],
+   "metadata": {
+    "id": "zpaJTrOp_BdP"
+   },
+   "id": "zpaJTrOp_BdP"
+  },
+  {
+   "cell_type": "code",
+   "source": [
+    "\n",
+    "print(\"\\n\\n\" + \"=\" * 60)\n",
+    "print(\"PERFORMANCE TEST - Component Benchmarks\")\n",
+    "print(\"=\" * 60)\n",
+    "\n",
+    "import time\n",
+    "\n",
+    "def benchmark_component(name, func, *args, **kwargs):\n",
+    "    \"\"\"Benchmark a component function.\"\"\"\n",
+    "    start = time.perf_counter()  # perf_counter is monotonic, better suited to timing than time.time()\n",
+    "    result = func(*args, **kwargs)\n",
+    "    elapsed = time.perf_counter() - start\n",
+    "    print(f\"   {name}: {elapsed:.3f}s\")\n",
+    "    return result, elapsed\n",
+    "\n",
+    "def run_performance_tests():\n",
+    "    \"\"\"Run performance benchmarks.\"\"\"\n",
+    "\n",
+    "    # Generate test data\n",
+    "    print(\"\\n📊 Generating test data...\")\n",
+    "    test_emails = [\n",
+    "        Document(\n",
+    "            page_content=f\"Subject: Test {i}\\nFrom: sender{i % 10}@example.com\\n\\n\" + \" \".join([\"word\"] * 100),\n",
+    "            metadata={\n",
+    "                'message_id': f'perf_{i}',\n",
+    "                'subject': f'Test {i}',\n",
+    "                'sender': f'sender{i % 10}@example.com',\n",
+    "                'date': f'2025-10-{(i % 30) + 1:02d}'\n",
+    "            }\n",
+    "        )\n",
+    "        for i in range(100)\n",
+    "    ]\n",
+    "    print(f\"   ✓ Created {len(test_emails)} test emails\")\n",
+    "\n",
+    "    # Benchmark EmailProcessor\n",
+    "    print(\"\\n⏱️ Benchmarking EmailProcessor...\")\n",
+    "    processor = EmailProcessor()\n",
+    "\n",
+    "    chunks, t1 = benchmark_component(\"Chunking\", processor.chunk_documents, test_emails)\n",
+    "    stats, t2 = benchmark_component(\"Statistics\", processor.get_statistics, test_emails)\n",
+    "    df, t3 = benchmark_component(\"DataFrame conversion\", processor.emails_to_dataframe, test_emails)\n",
+    "\n",
+    "    # Benchmark VectorDatabaseManager\n",
+    "    print(\"\\n⏱️ Benchmarking VectorDatabaseManager...\")\n",
+    "    vector_mgr = VectorDatabaseManager(db_name=\"test_perf_db\")\n",
+    "\n",
+    "    emb, t4 = benchmark_component(\"Embedding creation\", vector_mgr.create_embeddings, \"bert\")\n",
+    "    vs, t5 = benchmark_component(\"Vector store creation\", vector_mgr.create_vector_store, chunks[:50])  # Limit for speed\n",
+    "\n",
+    "    # Cleanup\n",
+    "    if os.path.exists(\"test_perf_db\"):\n",
+    "        shutil.rmtree(\"test_perf_db\")\n",
+    "\n",
+    "    print(\"\\n\" + \"=\" * 60)\n",
+    "    print(\"✅ PERFORMANCE TEST COMPLETED\")\n",
+    "    print(\"=\" * 60)\n",
+    "    # Summarize the measured timings\n",
+    "    timings = {\n",
+    "        \"Chunking\": t1,\n",
+    "        \"Statistics\": t2,\n",
+    "        \"DataFrame conversion\": t3,\n",
+    "        \"Embedding creation\": t4,\n",
+    "        \"Vector store creation\": t5\n",
+    "    }\n",
+    "    print(f\"\\n📈 Total time: {sum(timings.values()):.3f}s\")\n",
+    "    fastest = min(timings, key=timings.get)\n",
+    "    slowest = max(timings, key=timings.get)\n",
+    "    print(f\"   Fastest operation: {fastest} ({timings[fastest]:.3f}s)\")\n",
+    "    print(f\"   Slowest operation: {slowest} ({timings[slowest]:.3f}s)\")\n",
+    "\n",
+    "try:\n",
+    "    run_performance_tests()\n",
+    "except Exception as e:\n",
+    "    print(f\"\\n❌ PERFORMANCE TEST FAILED: {e}\")\n",
+    "\n"
+   ],
+   "metadata": {
+    "id": "41w8FGJ9_CCU"
+   },
+   "id": "41w8FGJ9_CCU",
+   "execution_count": null,
+   "outputs": []
+  }
+ ],
+ "metadata": {
+  "language_info": {
+   "name": "python"
+  },
+  "colab": {
+   "provenance": [],
+   "include_colab_link": true
+  },
+  "kernelspec": {
+   "name": "python3",
+   "display_name": "Python 3"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
\ No newline at end of file