Merge branch 'ed-donner:main' into main

This commit is contained in:
Tochi Nwachukwu
2025-10-30 06:26:32 +01:00
committed by GitHub
88 changed files with 26824 additions and 0 deletions


@@ -0,0 +1,115 @@
# 🎩 Tuxedo Link
**AI-Powered Cat Adoption Search Engine**
Find your perfect feline companion using AI, semantic search, and multi-platform aggregation.
*In loving memory of Kyra 🐱*
---
## 🌟 Overview
Tuxedo Link is an intelligent cat adoption platform that combines:
- **Natural Language Understanding** - Describe your ideal cat in plain English
- **Semantic Search with RAG** - ChromaDB + SentenceTransformers for personality-based matching
- **Multi-Modal Deduplication** - Uses CLIP for image similarity + text analysis
- **Hybrid Scoring** - 60% vector similarity + 40% attribute matching
- **Multi-Platform Aggregation** - Searches Petfinder and RescueGroups APIs
- **Serverless Architecture** - Optional Modal deployment with scheduled email alerts
**Tech Stack**: OpenAI GPT-4 • ChromaDB • CLIP • Gradio • Modal
---
## 📸 Application Screenshots
### 🔍 Search Interface
Natural language search with semantic matching and personality-based results:
![Search Interface](assets/1.%20search.png)
### 🔔 Email Alerts
Save your search and get notified when new matching cats are available:
![Alerts Management](assets/2.%20Alerts.png)
### 📖 About Page
Learn about the technology and inspiration behind Tuxedo Link:
![About Page](assets/3.%20About.png)
### 📧 Email Notifications
Receive beautiful email alerts with your perfect matches:
![Email Notification](assets/4.%20Email.png)
---
## 🚀 Full Project & Source Code
The complete source code, documentation, and setup instructions are available at:
### **[👉 GitHub Repository: dkisselev-zz/tuxedo-link](https://github.com/dkisselev-zz/tuxedo-link)**
The repository includes:
- ✅ Complete source code with 92 passing tests
- ✅ Comprehensive technical documentation (3,400+ lines)
- ✅ Agentic architecture with 7 specialized agents
- ✅ Dual vector store implementation (main + metadata)
- ✅ Modal deployment guide for production
- ✅ Setup scripts and configuration examples
- ✅ LLM techniques documentation (structured output, RAG, hybrid search)
---
## 🧠 Key LLM/RAG Techniques
### 1. Structured Output with GPT-4 Function Calling
Extracts search preferences from natural language into Pydantic models
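A minimal sketch of the extraction step, assuming a hypothetical `CatPreferences` model (field names and the model id are illustrative, not the repo's actual schema):

```python
# Sketch only: CatPreferences and its fields are assumptions, not the repo's schema.
from typing import List, Optional
from openai import OpenAI
from pydantic import BaseModel

class CatPreferences(BaseModel):
    age_range: Optional[str] = None
    color: Optional[str] = None
    personality_keywords: List[str] = []

client = OpenAI()
completion = client.beta.chat.completions.parse(
    model="gpt-4o",  # illustrative model id
    messages=[
        {"role": "system", "content": "Extract cat adoption preferences from the request."},
        {"role": "user", "content": "A calm, senior tuxedo cat that likes laps"},
    ],
    response_format=CatPreferences,  # the SDK validates the reply against the Pydantic model
)
preferences = completion.choices[0].message.parsed
```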
### 2. Dual Vector Store Architecture
- **Main ChromaDB** - Cat profile semantic embeddings
- **Metadata DB** - Fuzzy color/breed matching with typo tolerance
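A minimal sketch of the dual-store idea with ChromaDB (collection names, ids, and metadata fields are assumptions):

```python
# Sketch: one collection for semantic cat profiles, one for attribute terms.
import chromadb

client = chromadb.PersistentClient(path="data/vectorstore")
profiles = client.get_or_create_collection("cat_profiles")       # main semantic store
attributes = client.get_or_create_collection("attribute_terms")  # fuzzy color/breed lookup

profiles.add(
    ids=["cat-1"],
    documents=["Gentle tuxedo cat who loves laps and quiet evenings"],
    metadatas=[{"color": "black and white", "age": "senior"}],
)
attributes.add(
    ids=["color-tuxedo"],
    documents=["tuxedo"],
    metadatas=[{"canonical": "black and white"}],
)
```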
### 3. Hybrid Search Strategy
Combines vector similarity (60%) with structured metadata filtering (40%)
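A toy version of the 60/40 blend, assuming a similarity already normalized to [0, 1] and attribute matching simplified to equality checks:

```python
# Sketch of the weighting; attribute comparison is simplified to equality.
def hybrid_score(vector_similarity: float, cat: dict, prefs: dict) -> float:
    wanted = {k: v for k, v in prefs.items() if v is not None}
    matched = sum(1 for k, v in wanted.items() if cat.get(k) == v)
    attribute_score = matched / len(wanted) if wanted else 0.0
    return 0.6 * vector_similarity + 0.4 * attribute_score

# 0.6 * 0.8 + 0.4 * 1.0 = 0.88
print(hybrid_score(0.8, {"color": "tuxedo", "age": "senior"}, {"color": "tuxedo"}))
```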
### 4. 3-Tier Semantic Normalization
Dictionary → Vector DB → Fuzzy fallback for robust term mapping
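A sketch of the cascade with a stand-in for the vector tier (all terms, cutoffs, and names are illustrative):

```python
# Sketch: dictionary -> vector DB -> fuzzy fallback.
from difflib import get_close_matches

SYNONYMS = {"grey": "gray", "tux": "tuxedo"}   # tier 1: exact dictionary
CANONICAL = ["gray", "tuxedo", "tabby", "calico"]

def normalize(term: str, vector_lookup=None) -> str:
    term = term.lower().strip()
    if term in SYNONYMS:                        # tier 1
        return SYNONYMS[term]
    if term in CANONICAL:
        return term
    if vector_lookup:                           # tier 2: embedding search (metadata DB)
        hit = vector_lookup(term)
        if hit:
            return hit
    close = get_close_matches(term, CANONICAL, n=1, cutoff=0.75)  # tier 3: fuzzy
    return close[0] if close else term

print(normalize("tabyy"))  # -> "tabby" via the fuzzy fallback
```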
### 5. Multi-Modal Deduplication
Fingerprint + text (Levenshtein) + image (CLIP) similarity scoring
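A hedged sketch of the combined score; `rapidfuzz` stands in for the Levenshtein leg, weights and threshold are illustrative, and the CLIP image comparison is stubbed as a precomputed cosine similarity:

```python
# Sketch: the image similarity would come from cosine similarity of CLIP
# embeddings in the real pipeline; here it is passed in precomputed.
from rapidfuzz import fuzz  # assumed dependency for Levenshtein-style ratios

def duplicate_score(name_a: str, name_b: str, image_similarity: float) -> float:
    text_similarity = fuzz.ratio(name_a.lower(), name_b.lower()) / 100.0
    return 0.5 * text_similarity + 0.5 * image_similarity

# Same cat listed with a typo on a second platform
if duplicate_score("Whiskers", "Wiskers", image_similarity=0.93) > 0.85:
    print("Likely the same cat on two platforms")
```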
---
## 🏆 Project Highlights
- **92 Tests** - 81 unit + 11 integration tests (100% passing)
- **Production Ready** - Serverless Modal deployment with volumes
- **Email Alerts** - Scheduled background jobs for new match notifications
- **95%+ Accuracy** - Multi-modal deduplication across platforms
- **85-90% Match Quality** - Hybrid scoring algorithm
---
## 📚 Documentation
- **TECHNICAL_REFERENCE.md** - Complete API documentation
- **MODAL_DEPLOYMENT.md** - Cloud deployment guide
- **ARCHITECTURE_DIAGRAM.md** - System architecture visuals
- **tests/README.md** - Testing guide and coverage
---
<div align="center">
**Made with ❤️ in memory of Kyra**
*May every cat find their perfect home* 🐾
**[View Full Project on GitHub →](https://github.com/dkisselev-zz/tuxedo-link)**
</div>


@@ -0,0 +1,2 @@
*.sqlite3
memory.json


@@ -0,0 +1,122 @@
# Price Is Right - Host-Based Setup
A simplified host-based microservices implementation of "The Price is Right" deal hunting system.
## Overview
This setup runs all services directly on the host without Docker containers, using a shared Python virtual environment and a direct Ollama connection.
## Prerequisites
- Python 3.11+
- Ollama running on port 11434
- Required Ollama models: `llama3.2` and `llama3.2:3b-instruct-q4_0`
## Quick Start
1. **Install dependencies:**
```bash
pip install -r requirements.txt
# or with uv:
uv pip install -r requirements.txt
```
2. **Start all services:**
```bash
python service_manager.py start
```
3. **Access the UI:**
- Main UI: http://localhost:7860
- Notification Receiver: http://localhost:7861
4. **Stop all services:**
```bash
python service_manager.py stop
```
## Service Architecture
| Service | Port | Description |
|---------|------|-------------|
| Scanner Agent | 8001 | Scans for deals from RSS feeds |
| Specialist Agent | 8002 | Fine-tuned LLM price estimation |
| Frontier Agent | 8003 | RAG-based price estimation |
| Random Forest Agent | 8004 | ML model price prediction |
| Ensemble Agent | 8005 | Combines all price estimates |
| Planning Agent | 8006 | Orchestrates deal evaluation |
| Notification Service | 8007 | Sends deal alerts |
| Notification Receiver | 8008 | Receives and displays alerts |
| UI | 7860 | Main web interface |
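Every service exposes a `/health` endpoint (see the service sources). A quick liveness check against the Ensemble Agent, for example:

```bash
curl http://localhost:8005/health
# {"status": "healthy", "service": "ensemble-agent"}
```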
## Service Management
### Start Services
```bash
# Start all services
python service_manager.py start
# Start specific service
python service_manager.py start scanner
```
### Stop Services
```bash
# Stop all services
python service_manager.py stop
# Stop specific service
python service_manager.py stop scanner
```
### Check Status
```bash
python service_manager.py status
```
### Restart Service
```bash
python service_manager.py restart scanner
```
## Data Files
- `data/models/` - Pre-trained `.pkl` model files (loaded directly from disk)
- `data/vectorstore/` - ChromaDB vector store
- `data/memory.json` - Deal memory storage
- `logs/` - Service log files
## Key Features
- **No Docker overhead** - Services start instantly
- **Direct file access** - .pkl files load immediately
- **Single environment** - All services share the same Python environment
- **Direct Ollama access** - No proxy needed
- **Easy debugging** - Direct process access and logs
## Troubleshooting
1. **Port conflicts**: Check if ports are already in use
```bash
python service_manager.py status
```
2. **Ollama connection issues**: Ensure Ollama is running on port 11434
```bash
ollama list
```
3. **Service logs**: Check individual service logs in `logs/` directory
4. **Model loading**: Ensure required models are available
```bash
ollama pull llama3.2
ollama pull llama3.2:3b-instruct-q4_0
```
## Development
- All services are in `services/` directory
- Shared code is in `shared/` directory
- Service manager handles process lifecycle
- Logs are written to `logs/` directory


@@ -0,0 +1 @@
[{"deal": {"product_description": "Test Product", "price": 100.0, "url": "https://test.com"}, "estimate": 150.0, "discount": 50.0}]


@@ -0,0 +1,137 @@
#!/usr/bin/env python3
import sys
import time
import signal
from service_manager import ServiceManager
def show_usage():
print("Usage: python main.py <command> [service_name]")
print("Commands:")
print(" start [service] - Start all services or specific service")
print(" stop [service] - Stop all services or specific service")
print(" restart [service] - Restart specific service")
print(" status - Show status of all services")
print(" run - Start all services and launch UI (default)")
print(" ui - Launch UI only (assumes services are running)")
print(" kill - Force kill all services (use if stop doesn't work)")
print("\nService names: scanner, specialist, frontier, random-forest, ensemble, planning, notification-service, notification-receiver, ui")
print("\nExamples:")
print(" python main.py run # Start everything and launch UI")
print(" python main.py start # Start all services")
print(" python main.py start scanner # Start only scanner service")
print(" python main.py status # Check service status")
print(" python main.py stop # Stop all services")
print(" python main.py kill # Force kill all services")
def launch_ui():
"""Launch the UI assuming services are already running"""
print("Launching UI...")
try:
from services.ui import App
app = App()
app.run()
except Exception as e:
print(f"Failed to launch UI: {e}")
print("Make sure all services are running first. Use 'python main.py status' to check.")
def run_full_app():
"""Start all services and launch the UI"""
print("Starting The Price is Right - Full Application")
print("=" * 50)
# Initialize service manager
manager = ServiceManager()
# Handle Ctrl+C gracefully
def signal_handler(sig, frame):
print("\nReceived interrupt signal. Cleaning up...")
manager.cleanup()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
# Start all services first
print("Starting microservices...")
if not manager.start_all():
print("Failed to start some services. Check logs/ directory for details.")
return
print("\nWaiting for services to initialize...")
time.sleep(3) # Give services time to start
# Now launch the UI
launch_ui()
except KeyboardInterrupt:
print("\nInterrupted by user")
except Exception as e:
print(f"Error: {e}")
finally:
manager.cleanup()
def main():
if len(sys.argv) < 2:
# Default behavior: run the full app
run_full_app()
return
command = sys.argv[1].lower()
service_name = sys.argv[2] if len(sys.argv) > 2 else None
# Initialize service manager
manager = ServiceManager()
# Handle Ctrl+C gracefully
def signal_handler(sig, frame):
print("\nReceived interrupt signal. Cleaning up...")
manager.cleanup()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
if command == 'run':
run_full_app()
elif command == 'ui':
launch_ui()
elif command == 'start':
if service_name:
manager.start_service(service_name)
else:
manager.start_all()
elif command == 'stop':
if service_name:
manager.stop_service(service_name)
else:
manager.stop_all()
elif command == 'restart':
if service_name:
manager.restart(service_name)
else:
print("Please specify a service name to restart")
elif command == 'status':
manager.status()
elif command == 'kill':
manager.force_kill_all()
elif command in ['help', '-h', '--help']:
show_usage()
else:
print(f"Unknown command: {command}")
show_usage()
sys.exit(1)
except KeyboardInterrupt:
print("\nInterrupted by user")
manager.cleanup()
except Exception as e:
print(f"Error: {e}")
manager.cleanup()
sys.exit(1)
if __name__ == "__main__":
main()


@@ -0,0 +1,24 @@
fastapi
uvicorn
httpx
ollama
pydantic
python-dotenv
feedparser
beautifulsoup4
requests
tqdm
gradio
plotly
numpy
scikit-learn
chromadb
sentence-transformers
pandas
joblib
transformers
psutil
twilio
openai
datasets
modal


@@ -0,0 +1,346 @@
#!/usr/bin/env python3
import subprocess
import sys
import os
import time
import signal
from typing import Dict, Optional
class ServiceManager:
def __init__(self):
self.services = {
'scanner': {'port': 8001, 'script': 'services/scanner_agent.py'},
'specialist': {'port': 8002, 'script': 'services/specialist_agent.py'},
'frontier': {'port': 8003, 'script': 'services/frontier_agent.py'},
'random-forest': {'port': 8004, 'script': 'services/random_forest_agent.py'},
'ensemble': {'port': 8005, 'script': 'services/ensemble_agent.py'},
'planning': {'port': 8006, 'script': 'services/planning_agent.py'},
'notification-service': {'port': 8007, 'script': 'services/notification_service.py'},
'notification-receiver': {'port': 8008, 'script': 'services/notification_receiver.py'},
'ui': {'port': 7860, 'script': 'services/ui.py'},
}
self.processes: Dict[str, subprocess.Popen] = {}
self.logs_dir = 'logs'
# Create logs directory if it doesn't exist
os.makedirs(self.logs_dir, exist_ok=True)
def _find_process_by_port(self, port: int) -> Optional[int]:
"""Find the PID of the process using the specified port"""
try:
# Use lsof command as psutil has permission issues on macOS
result = subprocess.run(['lsof', '-ti', f':{port}'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout.strip():
return int(result.stdout.strip().split('\n')[0])
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, ValueError):
pass
return None
def is_port_in_use(self, port: int) -> bool:
"""Check if a port is already in use"""
try:
# Use lsof command as psutil has permission issues on macOS
result = subprocess.run(['lsof', '-ti', f':{port}'],
capture_output=True, text=True, timeout=5)
return result.returncode == 0 and bool(result.stdout.strip())
except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
return False
def start_service(self, service_name: str) -> bool:
"""Start a specific service"""
if service_name not in self.services:
print(f"Unknown service: {service_name}")
return False
if service_name in self.processes:
print(f"Service {service_name} is already running")
return True
service_info = self.services[service_name]
script_path = service_info['script']
port = service_info['port']
if not os.path.exists(script_path):
print(f"Service script not found: {script_path}")
return False
if self.is_port_in_use(port):
print(f"Port {port} is already in use")
return False
try:
log_file = open(f"{self.logs_dir}/{service_name}.log", "w")
# Use virtual environment Python if available
python_executable = sys.executable
venv_python = os.path.join(os.getcwd(), '.venv', 'bin', 'python')
if os.path.exists(venv_python):
python_executable = venv_python
process = subprocess.Popen(
[python_executable, script_path],
stdout=log_file,
stderr=subprocess.STDOUT,
cwd=os.getcwd(),
bufsize=1, # Line buffered
universal_newlines=True
)
self.processes[service_name] = process
print(f"Started {service_name} (PID: {process.pid}) on port {port}")
return True
except Exception as e:
print(f"Failed to start {service_name}: {e}")
return False
def stop_service(self, service_name: str) -> bool:
"""Stop a specific service"""
if service_name not in self.services:
print(f"Unknown service: {service_name}")
return False
service_info = self.services[service_name]
port = service_info['port']
# First try to stop tracked process
if service_name in self.processes:
process = self.processes[service_name]
try:
process.terminate()
process.wait(timeout=5)
del self.processes[service_name]
print(f"Stopped {service_name} (tracked process)")
return True
except subprocess.TimeoutExpired:
process.kill()
del self.processes[service_name]
print(f"Force killed {service_name} (tracked process)")
return True
except Exception as e:
print(f"Failed to stop tracked process for {service_name}: {e}")
# If no tracked process or it failed, try to find and kill by port
if self.is_port_in_use(port):
pid = self._find_process_by_port(port)
if pid:
try:
# Try graceful termination first
os.kill(pid, signal.SIGTERM)
time.sleep(2)
# Check if still running
try:
os.kill(pid, 0) # Check if process exists
# Still running, force kill
os.kill(pid, signal.SIGKILL)
print(f"Force killed {service_name} (PID: {pid})")
except ProcessLookupError:
# Process already terminated
print(f"Stopped {service_name} (PID: {pid})")
return True
except ProcessLookupError:
print(f"Process {service_name} (PID: {pid}) already stopped")
return True
except PermissionError:
print(f"Permission denied to stop {service_name} (PID: {pid})")
return False
except Exception as e:
print(f"Failed to stop {service_name} (PID: {pid}): {e}")
return False
else:
print(f"Port {port} is in use but couldn't find process for {service_name}")
return False
else:
print(f"Service {service_name} is not running (port {port} not in use)")
return True
def start_all(self) -> bool:
"""Start all services"""
print("Starting all services...")
success = True
# Start services in dependency order
start_order = [
'scanner', 'specialist', 'frontier', 'random-forest',
'ensemble', 'planning', 'notification-service',
'notification-receiver', 'ui'
]
for service_name in start_order:
if not self.start_service(service_name):
success = False
time.sleep(1) # Small delay between starts
if success:
print("All services started successfully!")
print("\nService URLs:")
print("- Scanner Agent: http://localhost:8001")
print("- Specialist Agent: http://localhost:8002")
print("- Frontier Agent: http://localhost:8003")
print("- Random Forest Agent: http://localhost:8004")
print("- Ensemble Agent: http://localhost:8005")
print("- Planning Agent: http://localhost:8006")
print("- Notification Service: http://localhost:8007")
print("- Notification Receiver: http://localhost:8008")
print("- UI: http://localhost:7860")
else:
print("Some services failed to start. Check logs/ directory for details.")
return success
def stop_all(self) -> bool:
"""Stop all services"""
print("Stopping all services...")
success = True
# Stop tracked processes first
for service_name in reversed(list(self.processes.keys())):
if not self.stop_service(service_name):
success = False
# Clear the processes dict
self.processes.clear()
# Now stop any remaining services by port
for service_name, service_info in self.services.items():
port = service_info['port']
if self.is_port_in_use(port):
print(f"Found orphaned service on port {port}, stopping {service_name}...")
if not self.stop_service(service_name):
success = False
if success:
print("All services stopped successfully!")
else:
print("Some services failed to stop properly.")
return success
def status(self) -> None:
"""Show status of all services"""
print("Service Status:")
print("-" * 50)
for service_name, service_info in self.services.items():
port = service_info['port']
try:
# First check if we have a tracked process
if service_name in self.processes:
process = self.processes[service_name]
if process.poll() is None:
print(f"{service_name:20} | Running (PID: {process.pid}) | Port: {port}")
else:
print(f"{service_name:20} | Stopped (exit code: {process.returncode}) | Port: {port}")
del self.processes[service_name]
else:
# Check if port is in use and try to find the actual process
if self.is_port_in_use(port):
# Try to find the process using this port
pid = self._find_process_by_port(port)
if pid:
print(f"{service_name:20} | Running (PID: {pid}) | Port: {port}")
else:
print(f"{service_name:20} | Port {port} in use (external process)")
else:
print(f"{service_name:20} | Stopped | Port: {port}")
except Exception as e:
print(f"{service_name:20} | Error checking status: {e}")
def restart(self, service_name: str) -> bool:
"""Restart a specific service"""
print(f"Restarting {service_name}...")
self.stop_service(service_name)
time.sleep(1)
return self.start_service(service_name)
def force_kill_all(self) -> bool:
"""Force kill all processes using service ports"""
print("Force killing all services...")
success = True
for service_name, service_info in self.services.items():
port = service_info['port']
if self.is_port_in_use(port):
pid = self._find_process_by_port(port)
if pid:
try:
os.kill(pid, signal.SIGKILL)
print(f"Force killed {service_name} (PID: {pid})")
except ProcessLookupError:
print(f"Process {service_name} (PID: {pid}) already stopped")
except PermissionError:
print(f"Permission denied to kill {service_name} (PID: {pid})")
success = False
except Exception as e:
print(f"Failed to kill {service_name} (PID: {pid}): {e}")
success = False
# Clear tracked processes
self.processes.clear()
if success:
print("All services force killed!")
else:
print("Some services could not be killed.")
return success
def cleanup(self):
"""Clean up on exit"""
if self.processes:
print("\nCleaning up running processes...")
self.stop_all()
def main():
manager = ServiceManager()
# Handle Ctrl+C gracefully
def signal_handler(sig, frame):
print("\nReceived interrupt signal. Cleaning up...")
manager.cleanup()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if len(sys.argv) < 2:
print("Usage: python service_manager.py <command> [service_name]")
print("Commands: start, stop, restart, status")
print("Service names: scanner, specialist, frontier, random-forest, ensemble, planning, notification-service, notification-receiver, ui")
sys.exit(1)
command = sys.argv[1].lower()
service_name = sys.argv[2] if len(sys.argv) > 2 else None
try:
if command == 'start':
if service_name:
manager.start_service(service_name)
else:
manager.start_all()
elif command == 'stop':
if service_name:
manager.stop_service(service_name)
else:
manager.stop_all()
elif command == 'restart':
if service_name:
manager.restart(service_name)
else:
print("Please specify a service name to restart")
elif command == 'status':
manager.status()
else:
print(f"Unknown command: {command}")
sys.exit(1)
except KeyboardInterrupt:
print("\nInterrupted by user")
manager.cleanup()
except Exception as e:
print(f"Error: {e}")
manager.cleanup()
sys.exit(1)
if __name__ == "__main__":
main()


@@ -0,0 +1,84 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging
import httpx

logging.basicConfig(level=logging.INFO)
app = FastAPI(title="Ensemble Agent Service", version="1.0.0")
class PriceRequest(BaseModel):
description: str
class PriceResponse(BaseModel):
price: float
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "ensemble-agent"}
@app.post("/price", response_model=PriceResponse)
async def estimate_price(request: PriceRequest):
try:
prices = []
errors = []
async with httpx.AsyncClient() as client:
try:
specialist_resp = await client.post("http://localhost:8002/price", json={"description": request.description}, timeout=10)
if specialist_resp.status_code == 200:
specialist_data = specialist_resp.json()
specialist = specialist_data.get("price", 0.0)
prices.append(specialist)
logging.info(f"Specialist agent price: ${specialist:.2f}")
else:
errors.append(f"Specialist agent returned status {specialist_resp.status_code}")
except Exception as e:
errors.append(f"Specialist agent error: {str(e)}")
try:
frontier_resp = await client.post("http://localhost:8003/price", json={"description": request.description}, timeout=10)
if frontier_resp.status_code == 200:
frontier_data = frontier_resp.json()
frontier = frontier_data.get("price", 0.0)
prices.append(frontier)
logging.info(f"Frontier agent price: ${frontier:.2f}")
else:
errors.append(f"Frontier agent returned status {frontier_resp.status_code}")
except Exception as e:
errors.append(f"Frontier agent error: {str(e)}")
try:
rf_resp = await client.post("http://localhost:8004/price", json={"description": request.description}, timeout=10)
if rf_resp.status_code == 200:
rf_data = rf_resp.json()
random_forest = rf_data.get("price", 0.0)
prices.append(random_forest)
logging.info(f"Random forest agent price: ${random_forest:.2f}")
else:
errors.append(f"Random forest agent returned status {rf_resp.status_code}")
except Exception as e:
errors.append(f"Random forest agent error: {str(e)}")
valid_prices = [p for p in prices if 0 < p < 10000]
if valid_prices:
y = sum(valid_prices) / len(valid_prices)
logging.info(f"Ensemble price (from {len(valid_prices)} agents): ${y:.2f}")
else:
y = 100.0
logging.warning(f"No valid prices received, using fallback: ${y:.2f}")
logging.warning(f"Errors: {errors}")
return PriceResponse(price=y)
except Exception as e:
logging.error(f"Error in estimate_price: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8005)


@@ -0,0 +1,35 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging
from shared.services.frontier_wrapper import FrontierAgentWrapper

logging.basicConfig(level=logging.INFO)
app = FastAPI(title="Frontier Agent Service", version="1.0.0")
frontier_agent = FrontierAgentWrapper()
class PriceRequest(BaseModel):
description: str
class PriceResponse(BaseModel):
price: float
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "frontier-agent"}
@app.post("/price", response_model=PriceResponse)
async def estimate_price(request: PriceRequest):
try:
price = frontier_agent.price(request.description)
return PriceResponse(price=price)
except Exception as e:
logging.error(f"Error in estimate_price: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8003)


@@ -0,0 +1,113 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from fastapi import FastAPI
from pydantic import BaseModel
import gradio as gr
import httpx
import logging
import asyncio
import socket
app = FastAPI(title="Notification Receiver", version="1.0.0")
notifications = []
class NotificationRequest(BaseModel):
message: str
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "notification-receiver"}
@app.post("/notification")
async def receive_notification(request: NotificationRequest):
notifications.append(request.message)
return {"status": "received"}
def get_notifications():
return "\n".join(notifications[-10:])
def find_available_port(start_port: int, max_attempts: int = 10) -> int:
"""Find an available port starting from start_port"""
for port in range(start_port, start_port + max_attempts):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('0.0.0.0', port))
return port
except OSError:
continue
raise RuntimeError(f"No available port found in range {start_port}-{start_port + max_attempts - 1}")
def create_gradio_interface():
with gr.Blocks(title="Deal Notifications") as interface:
gr.Markdown("# Deal Notifications")
output = gr.Textbox(label="Recent Notifications", lines=10, interactive=False)
def update():
return get_notifications()
interface.load(update, outputs=output)
gr.Timer(value=5).tick(update, outputs=output)
return interface
if __name__ == "__main__":
import uvicorn
import threading
import signal
# Find available ports
try:
fastapi_port = find_available_port(8008)
gradio_port = find_available_port(7861)
print(f"Using FastAPI port: {fastapi_port}")
print(f"Using Gradio port: {gradio_port}")
except RuntimeError as e:
print(f"Failed to find available ports: {e}")
sys.exit(1)
async def subscribe_to_notifications():
try:
async with httpx.AsyncClient() as client:
await client.post("http://localhost:8007/subscribe", json={"url": f"http://localhost:{fastapi_port}"})
print(f"Successfully subscribed to notifications on port {fastapi_port}")
except Exception as e:
print(f"Failed to subscribe to notifications: {e}")
def run_fastapi():
try:
uvicorn.run(app, host="0.0.0.0", port=fastapi_port)
except Exception as e:
print(f"FastAPI server error: {e}")
def signal_handler(signum, frame):
print("\nReceived interrupt signal. Shutting down gracefully...")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
# Start FastAPI server in background thread
fastapi_thread = threading.Thread(target=run_fastapi, daemon=True)
fastapi_thread.start()
# Start subscription in background thread
subscription_thread = threading.Thread(target=lambda: asyncio.run(subscribe_to_notifications()), daemon=True)
subscription_thread.start()
# Give services time to start
import time
time.sleep(2)
# Start Gradio interface
interface = create_gradio_interface()
interface.launch(server_name="0.0.0.0", server_port=gradio_port, share=False)
except KeyboardInterrupt:
print("\nShutting down...")
except Exception as e:
print(f"Error starting services: {e}")
sys.exit(1)


@@ -0,0 +1,86 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging
import socket
import httpx
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
app = FastAPI(title="Notification Service", version="1.0.0")
subscribers = []
class AlertRequest(BaseModel):
deal: dict
estimate: float
discount: float
class SubscriberRequest(BaseModel):
url: str
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "notification-service"}
@app.post("/subscribe")
async def subscribe(request: SubscriberRequest):
subscribers.append(request.url)
return {"status": "subscribed"}
@app.post("/alert")
async def send_alert(request: AlertRequest):
message = f"Deal Alert! Price=${request.deal['price']:.2f}, Estimate=${request.estimate:.2f}, Discount=${request.discount:.2f} : {request.deal['product_description'][:10]}... {request.deal['url']}"
logger.info(f"Sending alert to {len(subscribers)} subscribers")
for subscriber in subscribers:
try:
async with httpx.AsyncClient() as client:
await client.post(f"{subscriber}/notification", json={"message": message})
logger.info(f"Successfully notified {subscriber}")
except Exception as e:
logger.error(f"Failed to notify {subscriber}: {e}")
return {"status": "alert_sent"}
def is_port_available(port: int) -> bool:
"""Check if a port is available for binding"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('0.0.0.0', port))
return True
except OSError:
return False
if __name__ == "__main__":
import uvicorn
port = 8007
# Check if port is available before starting
if not is_port_available(port):
logger.error(f"Port {port} is already in use. Please stop the existing service or use a different port.")
sys.exit(1)
logger.info(f"Starting Notification Service on port {port}")
try:
uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")
except Exception as e:
logger.error(f"Failed to start service: {e}")
sys.exit(1)


@@ -0,0 +1,79 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import logging
import httpx

logging.basicConfig(level=logging.INFO)
app = FastAPI(title="Planning Agent Service", version="1.0.0")
class MemoryRequest(BaseModel):
memory: List[str] = []
class OpportunityResponse(BaseModel):
deal: dict
estimate: float
discount: float
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "planning-agent"}
@app.post("/plan", response_model=Optional[OpportunityResponse])
async def plan_deals(request: MemoryRequest):
try:
async with httpx.AsyncClient() as client:
try:
scanner_resp = await client.post("http://localhost:8001/scan", json={"memory": request.memory}, timeout=30)
scanner_data = scanner_resp.json()
except Exception as e:
logging.error(f"Error calling scanner agent: {str(e)}")
return None
if not scanner_data.get("deals"):
logging.info("No deals found by scanner agent")
return None
best_deal = None
best_discount = 0
for deal in scanner_data["deals"][:5]:
try:
ensemble_resp = await client.post("http://localhost:8005/price", json={"description": deal["product_description"]}, timeout=30)
estimate = ensemble_resp.json()["price"]
discount = estimate - deal["price"]
if discount > best_discount:
best_discount = discount
best_deal = {
"deal": deal,
"estimate": estimate,
"discount": discount
}
except Exception as e:
logging.error(f"Error calling ensemble agent for deal {deal.get('product_description', 'unknown')}: {str(e)}")
continue
if best_discount > 50:
try:
await client.post("http://localhost:8007/alert", json=best_deal, timeout=10)
logging.info(f"Sent notification for deal with ${best_discount:.2f} discount")
except Exception as e:
logging.error(f"Error sending notification: {str(e)}")
return OpportunityResponse(**best_deal)
logging.info(f"Best deal discount ${best_discount:.2f} is not significant enough")
return None
except Exception as e:
logging.error(f"Error in plan_deals: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8006)


@@ -0,0 +1,79 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging
import traceback
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Random Forest Agent Service", version="1.0.0")
try:
logger.info("Initializing Random Forest Agent...")
from shared.services.random_forest_wrapper import RandomForestAgentWrapper
random_forest_agent = RandomForestAgentWrapper()
logger.info("Random Forest Agent initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Random Forest Agent: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
random_forest_agent = None
class PriceRequest(BaseModel):
description: str
class PriceResponse(BaseModel):
price: float
@app.get("/health")
async def health_check():
if random_forest_agent is None:
return {"status": "unhealthy", "service": "random-forest-agent", "error": "Agent not initialized"}
return {"status": "healthy", "service": "random-forest-agent"}
@app.post("/price", response_model=PriceResponse)
async def estimate_price(request: PriceRequest):
try:
if random_forest_agent is None:
logger.error("Random Forest Agent not initialized")
raise HTTPException(status_code=500, detail="Agent not initialized")
logger.info(f"Processing price request for: {request.description}")
price = random_forest_agent.price(request.description)
logger.info(f"Price estimate: ${price:.2f}")
return PriceResponse(price=price)
except Exception as e:
logger.error(f"Error in estimate_price: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
import socket
def is_port_available(port):
"""Check if a port is available"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(('0.0.0.0', port))
return True
except OSError:
return False
port = 8004
if not is_port_available(port):
logger.warning(f"Port {port} is already in use. Trying alternative ports...")
for alt_port in range(8004, 8010):
if is_port_available(alt_port):
port = alt_port
logger.info(f"Using alternative port: {port}")
break
else:
logger.error("No available ports found in range 8004-8009")
sys.exit(1)
logger.info(f"Starting Random Forest Agent service on port {port}")
uvicorn.run(app, host="0.0.0.0", port=port)


@@ -0,0 +1,64 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import logging
from shared.services.scanner_wrapper import ScannerAgentWrapper

logging.basicConfig(level=logging.INFO)
app = FastAPI(title="Scanner Agent Service", version="1.0.0")
scanner_agent = ScannerAgentWrapper()
class MemoryRequest(BaseModel):
memory: List[str] = []
class DealSelectionResponse(BaseModel):
deals: List[dict]
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "scanner-agent"}
@app.post("/scan", response_model=DealSelectionResponse)
async def scan_deals(request: MemoryRequest):
try:
result = scanner_agent.scan(request.memory)
if result:
return DealSelectionResponse(deals=[deal.model_dump() for deal in result.deals])
else:
return DealSelectionResponse(deals=[])
except Exception as e:
logging.error(f"Error in scan_deals: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
import socket
def is_port_available(port):
"""Check if a port is available"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(('0.0.0.0', port))
return True
except OSError:
return False
port = 8001
if not is_port_available(port):
logging.warning(f"Port {port} is already in use. Trying alternative ports...")
for alt_port in range(8001, 8010):
if is_port_available(alt_port):
port = alt_port
logging.info(f"Using alternative port: {port}")
break
else:
logging.error("No available ports found in range 8001-8009")
sys.exit(1)
logging.info(f"Starting Scanner Agent service on port {port}")
uvicorn.run(app, host="0.0.0.0", port=port)


@@ -0,0 +1,80 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging
import traceback
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Specialist Agent Service", version="1.0.0")
try:
logger.info("Initializing Specialist Agent...")
from shared.services.specialist_wrapper import SpecialistAgentWrapper
specialist_agent = SpecialistAgentWrapper()
logger.info("Specialist Agent initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Specialist Agent: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
specialist_agent = None
class PriceRequest(BaseModel):
description: str
class PriceResponse(BaseModel):
price: float
@app.get("/health")
async def health_check():
if specialist_agent is None:
return {"status": "unhealthy", "service": "specialist-agent", "error": "Agent not initialized"}
return {"status": "healthy", "service": "specialist-agent"}
@app.post("/price", response_model=PriceResponse)
async def estimate_price(request: PriceRequest):
try:
if specialist_agent is None:
logger.error("Specialist Agent not initialized")
raise HTTPException(status_code=500, detail="Agent not initialized")
logger.info(f"Processing price request for: {request.description}")
price = specialist_agent.price(request.description)
logger.info(f"Price estimate: ${price:.2f}")
return PriceResponse(price=price)
except Exception as e:
logger.error(f"Error in estimate_price: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
import socket
def is_port_available(port):
"""Check if a port is available"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(('0.0.0.0', port))
return True
except OSError:
return False
port = 8002
if not is_port_available(port):
logger.warning(f"Port {port} is already in use. Trying alternative ports...")
for alt_port in range(8002, 8010):
if is_port_available(alt_port):
port = alt_port
logger.info(f"Using alternative port: {port}")
break
else:
logger.error("No available ports found in range 8002-8009")
sys.exit(1)
logger.info(f"Starting Specialist Agent service on port {port}")
uvicorn.run(app, host="0.0.0.0", port=port)


@@ -0,0 +1,299 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import logging
import queue
import threading
import time
import asyncio
import gradio as gr
import httpx
import plotly.graph_objects as go
import numpy as np
from sklearn.manifold import TSNE
try:
import chromadb
CHROMADB_AVAILABLE = True
except ImportError:
CHROMADB_AVAILABLE = False
logging.warning("ChromaDB not available - plots will show sample data")
from shared.log_utils import reformat
class MockAgentFramework:
"""Mock agent framework to prevent NoneType errors when real framework fails to initialize"""
def __init__(self):
self.memory = []
async def run(self):
return []
class QueueHandler(logging.Handler):
def __init__(self, log_queue):
super().__init__()
self.log_queue = log_queue
def emit(self, record):
self.log_queue.put(self.format(record))
def html_for(log_data):
output = '<br>'.join(log_data[-18:])
return f"""
<div id="scrollContent" style="height: 400px; overflow-y: auto; border: 1px solid #ccc; background-color: #222229; padding: 10px;">
{output}
</div>
"""
def setup_logging(log_queue):
handler = QueueHandler(log_queue)
formatter = logging.Formatter(
"[%(asctime)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S %z",
)
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class App:
def __init__(self):
self.agent_framework = None
def get_agent_framework(self):
if not self.agent_framework:
try:
# Add the shared directory to the path
import sys
import os
shared_path = os.path.join(os.path.dirname(__file__), '..', 'shared')
if shared_path not in sys.path:
sys.path.insert(0, shared_path)
from deal_agent_framework_client import DealAgentFrameworkClient
self.agent_framework = DealAgentFrameworkClient()
except Exception as e:
logging.error(f"Failed to initialize agent framework: {e}")
# Create a mock framework to prevent NoneType errors
self.agent_framework = MockAgentFramework()
return self.agent_framework
def table_for(self, opps):
if not opps:
return []
try:
return [[opp.deal.product_description, f"${opp.deal.price:.2f}", f"${opp.estimate:.2f}", f"${opp.discount:.2f}", opp.deal.url] for opp in opps]
except Exception as e:
logging.error(f"Error formatting opportunities table: {e}")
return []
def update_output(self, log_data, log_queue, result_queue):
initial_result = self.table_for(self.get_agent_framework().memory)
final_result = None
while True:
try:
message = log_queue.get_nowait()
log_data.append(reformat(message))
yield log_data, html_for(log_data), final_result or initial_result
except queue.Empty:
try:
final_result = result_queue.get_nowait()
yield log_data, html_for(log_data), final_result or initial_result
except queue.Empty:
if final_result is not None:
break
time.sleep(0.1)
def get_initial_plot(self):
fig = go.Figure()
fig.update_layout(
title='Loading vector DB...',
height=400,
)
return fig
def get_sample_plot(self):
"""Create a sample plot when vector database is not available"""
fig = go.Figure()
# Create some sample data points
x = np.random.randn(50)
y = np.random.randn(50)
z = np.random.randn(50)
fig.add_trace(go.Scatter3d(
x=x, y=y, z=z,
mode='markers',
marker=dict(
size=5,
color=z,
colorscale='Viridis',
opacity=0.7
),
name='Sample Data'
))
fig.update_layout(
title='Sample 3D Plot (Vector DB not available)',
scene=dict(
xaxis_title='X',
yaxis_title='Y',
zaxis_title='Z'
),
height=400,
margin=dict(r=5, b=1, l=5, t=2)
)
return fig
def get_plot(self):
if not CHROMADB_AVAILABLE:
logging.warning("ChromaDB not available - showing sample plot")
return self.get_sample_plot()
try:
client = chromadb.PersistentClient(path='data/vectorstore')
collections = client.list_collections()
if not collections:
logging.warning("No collections found in vectorstore - creating sample plot")
return self.get_sample_plot()
collection = client.get_collection('products')
count = collection.count()
if count == 0:
logging.warning("Products collection is empty - creating sample plot")
return self.get_sample_plot()
result = collection.get(include=['embeddings', 'documents', 'metadatas'], limit=1000)
vectors = np.array(result['embeddings'])
documents = result['documents']
categories = [metadata['category'] for metadata in result['metadatas']]
CATEGORIES = ['Appliances', 'Automotive', 'Cell_Phones_and_Accessories', 'Electronics','Musical_Instruments', 'Office_Products', 'Tools_and_Home_Improvement', 'Toys_and_Games']
COLORS = ['red', 'blue', 'brown', 'orange', 'yellow', 'green' , 'purple', 'cyan']
colors = [COLORS[CATEGORIES.index(c)] if c in CATEGORIES else 'gray' for c in categories]
tsne = TSNE(n_components=3, random_state=42, n_jobs=-1)
reduced_vectors = tsne.fit_transform(vectors)
fig = go.Figure(data=[go.Scatter3d(
x=reduced_vectors[:, 0],
y=reduced_vectors[:, 1],
z=reduced_vectors[:, 2],
mode='markers',
marker=dict(size=2, color=colors, opacity=0.7),
)])
fig.update_layout(
scene=dict(xaxis_title='x',
yaxis_title='y',
zaxis_title='z',
aspectmode='manual',
aspectratio=dict(x=2.2, y=2.2, z=1),
camera=dict(
eye=dict(x=1.6, y=1.6, z=0.8)
)),
height=400,
margin=dict(r=5, b=1, l=5, t=2)
)
return fig
except Exception as e:
logging.error(f"Error creating plot: {e}")
return self.get_sample_plot()
def do_run(self):
        framework = self.get_agent_framework()
        try:
            # asyncio is imported at module level; run the async framework call synchronously
            new_opportunities = asyncio.run(framework.run())
table = self.table_for(new_opportunities)
return table
except Exception as e:
logging.error(f"Error in do_run: {e}")
return []
def run_with_logging(self, initial_log_data):
log_queue = queue.Queue()
result_queue = queue.Queue()
setup_logging(log_queue)
def worker():
result = self.do_run()
result_queue.put(result)
thread = threading.Thread(target=worker)
thread.start()
for log_data, output, final_result in self.update_output(initial_log_data, log_queue, result_queue):
yield log_data, output, final_result
def do_select(self, selected_index: gr.SelectData):
opportunities = self.get_agent_framework().memory
row = selected_index.index[0]
opportunity = opportunities[row]
# Send alert via HTTP to the notification service
try:
            # Convert the opportunity to the format expected by the notification service
            alert_data = {
                "deal": opportunity.deal.model_dump(),
                "estimate": opportunity.estimate,
                "discount": opportunity.discount
            }
            # httpx.post is synchronous (httpx is imported at module level);
            # wrapping it in asyncio.run would fail, since it is not a coroutine
            httpx.post("http://localhost:8007/alert", json=alert_data, timeout=10)
except Exception as e:
logging.error(f"Failed to send alert: {e}")
def run(self):
with gr.Blocks(title="The Price is Right", fill_width=True) as ui:
log_data = gr.State([])
with gr.Row():
gr.Markdown('<div style="text-align: center;font-size:24px"><strong>The Price is Right</strong> - Autonomous Agent Framework that hunts for deals</div>')
with gr.Row():
gr.Markdown('<div style="text-align: center;font-size:14px">A proprietary fine-tuned LLM deployed on Modal and a RAG pipeline with a frontier model collaborate to send push notifications with great online deals.</div>')
with gr.Row():
opportunities_dataframe = gr.Dataframe(
headers=["Deals found so far", "Price", "Estimate", "Discount", "URL"],
wrap=True,
column_widths=[6, 1, 1, 1, 3],
row_count=10,
col_count=5,
max_height=400,
)
with gr.Row():
with gr.Column(scale=1):
logs = gr.HTML()
with gr.Column(scale=1):
plot = gr.Plot(value=self.get_plot(), show_label=False)
ui.load(self.run_with_logging, inputs=[log_data], outputs=[log_data, logs, opportunities_dataframe])
timer = gr.Timer(value=300, active=True)
timer.tick(self.run_with_logging, inputs=[log_data], outputs=[log_data, logs, opportunities_dataframe])
opportunities_dataframe.select(self.do_select)
# Try to launch on port 7860, fallback to other ports if needed
ports_to_try = [7860, 7861, 7862, 7863, 7864]
for port in ports_to_try:
try:
ui.launch(share=False, inbrowser=True, server_name="0.0.0.0", server_port=port)
break
except OSError as e:
if "address already in use" in str(e) and port < ports_to_try[-1]:
logging.warning(f"Port {port} is already in use, trying next port...")
continue
else:
raise e
if __name__ == "__main__":
    App().run()


@@ -0,0 +1,33 @@
import logging
class Agent:
"""
An abstract superclass for Agents
Used to log messages in a way that can identify each Agent
"""
# Foreground colors
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
# Background color
BG_BLACK = '\033[40m'
# Reset code to return to default color
RESET = '\033[0m'
name: str = ""
color: str = '\033[37m'
def log(self, message):
"""
Log this as an info message, identifying the agent
"""
color_code = self.BG_BLACK + self.color
message = f"[{self.name}] {message}"
logging.info(color_code + message + self.RESET)


@@ -0,0 +1,109 @@
from pydantic import BaseModel
from typing import List, Dict, Self
from bs4 import BeautifulSoup
import re
import feedparser
from tqdm import tqdm
import requests
import time
feeds = [
"https://www.dealnews.com/c142/Electronics/?rss=1",
"https://www.dealnews.com/c39/Computers/?rss=1",
"https://www.dealnews.com/c238/Automotive/?rss=1",
"https://www.dealnews.com/f1912/Smart-Home/?rss=1",
"https://www.dealnews.com/c196/Home-Garden/?rss=1",
]
def extract(html_snippet: str) -> str:
"""
Use Beautiful Soup to clean up this HTML snippet and extract useful text
"""
soup = BeautifulSoup(html_snippet, 'html.parser')
snippet_div = soup.find('div', class_='snippet summary')
if snippet_div:
description = snippet_div.get_text(strip=True)
description = BeautifulSoup(description, 'html.parser').get_text()
description = re.sub('<[^<]+?>', '', description)
result = description.strip()
else:
result = html_snippet
return result.replace('\n', ' ')
class ScrapedDeal:
"""
A class to represent a Deal retrieved from an RSS feed
"""
category: str
title: str
summary: str
url: str
details: str
features: str
def __init__(self, entry: Dict[str, str]):
"""
Populate this instance based on the provided dict
"""
self.title = entry['title']
self.summary = extract(entry['summary'])
self.url = entry['links'][0]['href']
stuff = requests.get(self.url).content
soup = BeautifulSoup(stuff, 'html.parser')
content = soup.find('div', class_='content-section').get_text()
content = content.replace('\nmore', '').replace('\n', ' ')
if "Features" in content:
self.details, self.features = content.split("Features")
else:
self.details = content
self.features = ""
def __repr__(self):
"""
Return a string to describe this deal
"""
return f"<{self.title}>"
def describe(self):
"""
Return a longer string to describe this deal for use in calling a model
"""
return f"Title: {self.title}\nDetails: {self.details.strip()}\nFeatures: {self.features.strip()}\nURL: {self.url}"
@classmethod
def fetch(cls, show_progress : bool = False) -> List[Self]:
"""
Retrieve all deals from the selected RSS feeds
"""
deals = []
feed_iter = tqdm(feeds) if show_progress else feeds
for feed_url in feed_iter:
feed = feedparser.parse(feed_url)
for entry in feed.entries[:10]:
deals.append(cls(entry))
time.sleep(0.5)
return deals
class Deal(BaseModel):
"""
A class to Represent a Deal with a summary description
"""
product_description: str
price: float
url: str
class DealSelection(BaseModel):
"""
A class to Represent a list of Deals
"""
deals: List[Deal]
class Opportunity(BaseModel):
"""
A class to represent a possible opportunity: a Deal where we estimate
it should cost more than it's being offered
"""
deal: Deal
estimate: float
discount: float


@@ -0,0 +1,48 @@
import pandas as pd
import joblib
from agents.agent import Agent
from agents.specialist_agent import SpecialistAgent
from agents.frontier_agent import FrontierAgent
from agents.random_forest_agent import RandomForestAgent
class EnsembleAgent(Agent):
name = "Ensemble Agent"
color = Agent.YELLOW
def __init__(self, collection):
"""
Create an instance of Ensemble, by creating each of the models
And loading the weights of the Ensemble
"""
self.log("Initializing Ensemble Agent")
self.specialist = SpecialistAgent()
self.frontier = FrontierAgent(collection)
self.random_forest = RandomForestAgent()
        self.model = joblib.load('data/models/ensemble_model.pkl')  # relative to the repo root, per the host-based layout
self.log("Ensemble Agent is ready")
def price(self, description: str) -> float:
"""
Run this ensemble model
Ask each of the models to price the product
Then use the Linear Regression model to return the weighted price
:param description: the description of a product
:return: an estimate of its price
"""
self.log("Running Ensemble Agent - collaborating with specialist, frontier and random forest agents")
specialist = self.specialist.price(description)
frontier = self.frontier.price(description)
random_forest = self.random_forest.price(description)
X = pd.DataFrame({
'Specialist': [specialist],
'Frontier': [frontier],
'RandomForest': [random_forest],
'Min': [min(specialist, frontier, random_forest)],
'Max': [max(specialist, frontier, random_forest)],
})
y = max(0, self.model.predict(X)[0])
self.log(f"Ensemble Agent complete - returning ${y:.2f}")
return y


@@ -0,0 +1,113 @@
# imports
import os
import re
from typing import List, Dict
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from agents.agent import Agent
class FrontierAgent(Agent):
name = "Frontier Agent"
color = Agent.BLUE
MODEL = "gpt-4o-mini"
def __init__(self, collection):
"""
Set up this instance by connecting to OpenAI or DeepSeek, to the Chroma Datastore,
And setting up the vector encoding model
"""
self.log("Initializing Frontier Agent")
deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
if deepseek_api_key:
self.client = OpenAI(api_key=deepseek_api_key, base_url="https://api.deepseek.com")
self.MODEL = "deepseek-chat"
self.log("Frontier Agent is set up with DeepSeek")
else:
self.client = OpenAI()
self.MODEL = "gpt-4o-mini"
self.log("Frontier Agent is setting up with OpenAI")
self.collection = collection
self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
self.log("Frontier Agent is ready")
def make_context(self, similars: List[str], prices: List[float]) -> str:
"""
Create context that can be inserted into the prompt
:param similars: similar products to the one being estimated
:param prices: prices of the similar products
:return: text to insert in the prompt that provides context
"""
message = "To provide some context, here are some other items that might be similar to the item you need to estimate.\n\n"
for similar, price in zip(similars, prices):
message += f"Potentially related product:\n{similar}\nPrice is ${price:.2f}\n\n"
return message
def messages_for(self, description: str, similars: List[str], prices: List[float]) -> List[Dict[str, str]]:
"""
Create the message list to be included in a call to OpenAI
With the system and user prompt
:param description: a description of the product
:param similars: similar products to this one
:param prices: prices of similar products
:return: the list of messages in the format expected by OpenAI
"""
system_message = "You estimate prices of items. Reply only with the price, no explanation"
user_prompt = self.make_context(similars, prices)
user_prompt += "And now the question for you:\n\n"
user_prompt += "How much does this cost?\n\n" + description
return [
{"role": "system", "content": system_message},
{"role": "user", "content": user_prompt},
{"role": "assistant", "content": "Price is $"}
]
def find_similars(self, description: str):
"""
Return a list of items similar to the given one by looking in the Chroma datastore
"""
self.log("Frontier Agent is performing a RAG search of the Chroma datastore to find 5 similar products")
vector = self.model.encode([description])
results = self.collection.query(query_embeddings=vector.astype(float).tolist(), n_results=5)
documents = results['documents'][0][:]
prices = [m['price'] for m in results['metadatas'][0][:]]
self.log("Frontier Agent has found similar products")
return documents, prices
def get_price(self, s) -> float:
"""
A utility that plucks a floating point number out of a string
"""
s = s.replace('$','').replace(',','')
match = re.search(r"[-+]?\d*\.\d+|\d+", s)
return float(match.group()) if match else 0.0
def price(self, description: str) -> float:
"""
Make a call to OpenAI or DeepSeek to estimate the price of the described product,
by looking up 5 similar products and including them in the prompt to give context
:param description: a description of the product
:return: an estimate of the price
"""
documents, prices = self.find_similars(description)
self.log(f"Frontier Agent is about to call {self.MODEL} with context including 5 similar products")
response = self.client.chat.completions.create(
model=self.MODEL,
messages=self.messages_for(description, documents, prices),
seed=42,
max_tokens=5
)
reply = response.choices[0].message.content
result = self.get_price(reply)
self.log(f"Frontier Agent completed - predicting ${result:.2f}")
return result


@@ -0,0 +1,79 @@
import os
# from twilio.rest import Client
from agents.deals import Opportunity
import http.client
import urllib.parse
from agents.agent import Agent
# Uncomment the Twilio lines if you wish to use Twilio
DO_TEXT = False
DO_PUSH = True
class MessagingAgent(Agent):
name = "Messaging Agent"
color = Agent.WHITE
def __init__(self):
"""
Set up this object to either do push notifications via Pushover,
or SMS via Twilio,
whichever is specified in the constants
"""
self.log(f"Messaging Agent is initializing")
if DO_TEXT:
account_sid = os.getenv('TWILIO_ACCOUNT_SID', 'your-sid-if-not-using-env')
auth_token = os.getenv('TWILIO_AUTH_TOKEN', 'your-auth-if-not-using-env')
self.me_from = os.getenv('TWILIO_FROM', 'your-phone-number-if-not-using-env')
self.me_to = os.getenv('MY_PHONE_NUMBER', 'your-phone-number-if-not-using-env')
# self.client = Client(account_sid, auth_token)
self.log("Messaging Agent has initialized Twilio")
if DO_PUSH:
self.pushover_user = os.getenv('PUSHOVER_USER', 'your-pushover-user-if-not-using-env')
self.pushover_token = os.getenv('PUSHOVER_TOKEN', 'your-pushover-user-if-not-using-env')
self.log("Messaging Agent has initialized Pushover")
def message(self, text):
"""
Send an SMS message using the Twilio API
"""
self.log("Messaging Agent is sending a text message")
# Note: requires the Twilio import and the self.client line above to be uncommented
self.client.messages.create(
from_=self.me_from,
body=text,
to=self.me_to
)
def push(self, text):
"""
Send a Push Notification using the Pushover API
"""
self.log("Messaging Agent is sending a push notification")
conn = http.client.HTTPSConnection("api.pushover.net:443")
conn.request("POST", "/1/messages.json",
urllib.parse.urlencode({
"token": self.pushover_token,
"user": self.pushover_user,
"message": text,
"sound": "cashregister"
}), { "Content-type": "application/x-www-form-urlencoded" })
conn.getresponse()
def alert(self, opportunity: Opportunity):
"""
Make an alert about the specified Opportunity
"""
text = f"Deal Alert! Price=${opportunity.deal.price:.2f}, "
text += f"Estimate=${opportunity.estimate:.2f}, "
text += f"Discount=${opportunity.discount:.2f}: "
text += opportunity.deal.product_description[:10] + '... '
text += opportunity.deal.url
if DO_TEXT:
self.message(text)
if DO_PUSH:
self.push(text)
self.log("Messaging Agent has completed")
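# Minimal usage sketch (assumes PUSHOVER_USER and PUSHOVER_TOKEN are set in
# the environment; the Deal below is illustrative only):
#   from agents.deals import Deal, Opportunity
#   deal = Deal(product_description="Example product", price=99.99,
#               url="https://example.com/deal")
#   MessagingAgent().alert(Opportunity(deal=deal, estimate=149.99, discount=50.00))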

View File

@@ -0,0 +1,57 @@
from typing import Optional, List
from agents.agent import Agent
from agents.deals import ScrapedDeal, DealSelection, Deal, Opportunity
from agents.scanner_agent import ScannerAgent
from agents.ensemble_agent import EnsembleAgent
from agents.messaging_agent import MessagingAgent
class PlanningAgent(Agent):
name = "Planning Agent"
color = Agent.GREEN
DEAL_THRESHOLD = 50
def __init__(self, collection):
"""
Create instances of the 3 Agents that this planner coordinates across
"""
self.log("Planning Agent is initializing")
self.scanner = ScannerAgent()
self.ensemble = EnsembleAgent(collection)
self.messenger = MessagingAgent()
self.log("Planning Agent is ready")
def run(self, deal: Deal) -> Opportunity:
"""
Run the workflow for a particular deal
:param deal: the deal, summarized from an RSS scrape
:returns: an opportunity including the discount
"""
self.log("Planning Agent is pricing up a potential deal")
estimate = self.ensemble.price(deal.product_description)
discount = estimate - deal.price
self.log(f"Planning Agent has processed a deal with discount ${discount:.2f}")
return Opportunity(deal=deal, estimate=estimate, discount=discount)
def plan(self, memory: List[Opportunity] = []) -> Optional[Opportunity]:
"""
Run the full workflow:
1. Use the ScannerAgent to find deals from RSS feeds
2. Use the EnsembleAgent to estimate them
3. Use the MessagingAgent to send a notification of deals
:param memory: Opportunities already surfaced in past runs; their URLs are used to avoid duplicates
:return: an Opportunity if one was surfaced, otherwise None
"""
self.log("Planning Agent is kicking off a run")
selection = self.scanner.scan(memory=memory)
if selection:
opportunities = [self.run(deal) for deal in selection.deals[:5]]
opportunities.sort(key=lambda opp: opp.discount, reverse=True)
best = opportunities[0]
self.log(f"Planning Agent has identified the best deal has discount ${best.discount:.2f}")
if best.discount > self.DEAL_THRESHOLD:
self.messenger.alert(best)
self.log("Planning Agent has completed a run")
return best if best.discount > self.DEAL_THRESHOLD else None
return None
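# Usage sketch (assumes a populated Chroma 'products' collection in the
# products_vectorstore directory, as created elsewhere in this project):
#   import chromadb
#   client = chromadb.PersistentClient(path="products_vectorstore")
#   planner = PlanningAgent(client.get_or_create_collection("products"))
#   best = planner.plan(memory=[])  # an Opportunity above the threshold, or None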

View File

@@ -0,0 +1,37 @@
# imports
from sentence_transformers import SentenceTransformer
import joblib
from agents.agent import Agent
class RandomForestAgent(Agent):
name = "Random Forest Agent"
color = Agent.MAGENTA
def __init__(self):
"""
Initialize this object by loading in the saved model weights
and the SentenceTransformer vector encoding model
"""
self.log("Random Forest Agent is initializing")
self.vectorizer = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
self.model = joblib.load('/app/data/models/random_forest_model.pkl')  # expects the trained model at this absolute path
self.log("Random Forest Agent is ready")
def price(self, description: str) -> float:
"""
Use a Random Forest model to estimate the price of the described item
:param description: the product to be estimated
:return: the price as a float
"""
self.log("Random Forest Agent is starting a prediction")
vector = self.vectorizer.encode([description])
result = max(0, self.model.predict(vector)[0])
self.log(f"Random Forest Agent completed - predicting ${result:.2f}")
return result
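# Usage sketch (assumes the trained model file exists at the absolute path above):
#   agent = RandomForestAgent()
#   print(agent.price("Cordless drill with two lithium-ion batteries"))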

View File

@@ -0,0 +1,94 @@
from typing import Optional, List
from openai import OpenAI
from agents.deals import ScrapedDeal, DealSelection, Opportunity
from agents.agent import Agent
class ScannerAgent(Agent):
MODEL = "gpt-4o-mini"
SYSTEM_PROMPT = """You identify and summarize the 5 most detailed deals from a list, by selecting deals that have the most detailed, high quality description and the most clear price.
Respond strictly in JSON with no explanation, using this format. You should provide the price as a number derived from the description. If the price of a deal isn't clear, do not include that deal in your response.
Most important is that you respond with the 5 deals that have the most detailed product description with price. It's not important to mention the terms of the deal; most important is a thorough description of the product.
Be careful with products that are described as "$XXX off" or "reduced by $XXX" - this isn't the actual price of the product. Only respond with products when you are highly confident about the price.
{"deals": [
{
"product_description": "Your clearly expressed summary of the product in 4-5 sentences. Details of the item are much more important than why it's a good deal. Avoid mentioning discounts and coupons; focus on the item itself. There should be a paragraph of text for each item you choose.",
"price": 99.99,
"url": "the url as provided"
},
...
]}"""
USER_PROMPT_PREFIX = """Respond with the most promising 5 deals from this list, selecting those which have the most detailed, high quality product description and a clear price that is greater than 0.
Respond strictly in JSON, and only JSON. You should rephrase the description to be a summary of the product itself, not the terms of the deal.
Remember to respond with a paragraph of text in the product_description field for each of the 5 items that you select.
Be careful with products that are described as "$XXX off" or "reduced by $XXX" - this isn't the actual price of the product. Only respond with products when you are highly confident about the price.
Deals:
"""
USER_PROMPT_SUFFIX = "\n\nStrictly respond in JSON and include exactly 5 deals, no more."
name = "Scanner Agent"
color = Agent.CYAN
def __init__(self):
"""
Set up this instance by initializing OpenAI
"""
self.log("Scanner Agent is initializing")
self.openai = OpenAI()
self.log("Scanner Agent is ready")
def fetch_deals(self, memory) -> List[ScrapedDeal]:
"""
Look up deals published on RSS feeds
Return any new deals that are not already in the memory provided
"""
self.log("Scanner Agent is about to fetch deals from RSS feed")
urls = [opp.deal.url for opp in memory]
scraped = ScrapedDeal.fetch()
result = [scrape for scrape in scraped if scrape.url not in urls]
self.log(f"Scanner Agent received {len(result)} deals not already scraped")
return result
def make_user_prompt(self, scraped) -> str:
"""
Create a user prompt for OpenAI based on the scraped deals provided
"""
user_prompt = self.USER_PROMPT_PREFIX
user_prompt += '\n\n'.join([scrape.describe() for scrape in scraped])
user_prompt += self.USER_PROMPT_SUFFIX
return user_prompt
def scan(self, memory: List[Opportunity] = []) -> Optional[DealSelection]:
"""
Call OpenAI to provide a high potential list of deals with good descriptions and prices
Use StructuredOutputs to ensure it conforms to our specifications
:param memory: Opportunities already raised; their URLs are used to filter out duplicates
:return: a selection of good deals, or None if there aren't any
"""
scraped = self.fetch_deals(memory)
if scraped:
user_prompt = self.make_user_prompt(scraped)
self.log("Scanner Agent is calling OpenAI using Structured Output")
result = self.openai.beta.chat.completions.parse(
model=self.MODEL,
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
response_format=DealSelection
)
result = result.choices[0].message.parsed
result.deals = [deal for deal in result.deals if deal.price>0]
self.log(f"Scanner Agent received {len(result.deals)} selected deals with price>0 from OpenAI")
return result
return None
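# Usage sketch (assumes OPENAI_API_KEY is set; DealSelection is the Pydantic
# schema passed to Structured Outputs above):
#   agent = ScannerAgent()
#   selection = agent.scan(memory=[])
#   if selection:
#       for deal in selection.deals:
#           print(f"${deal.price:.2f}  {deal.url}")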

View File

@@ -0,0 +1,29 @@
import modal
from agents.agent import Agent
class SpecialistAgent(Agent):
"""
An Agent that runs our fine-tuned LLM that's running remotely on Modal
"""
name = "Specialist Agent"
color = Agent.RED
def __init__(self):
"""
Set up this Agent by creating an instance of the modal class
"""
self.log("Specialist Agent is initializing - connecting to modal")
Pricer = modal.Cls.from_name("pricer-service", "Pricer")
self.pricer = Pricer()
self.log("Specialist Agent is ready")
def price(self, description: str) -> float:
"""
Make a remote call to return the estimate of the price of this item
"""
self.log("Specialist Agent is calling remote fine-tuned model")
result = self.pricer.price.remote(description)
self.log(f"Specialist Agent completed - predicting ${result:.2f}")
return result
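# Usage sketch (assumes the 'pricer-service' app has been deployed to Modal
# and this machine is authenticated with Modal):
#   agent = SpecialistAgent()
#   print(agent.price("Quadcast HyperX condenser mic, connects via USB-C"))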

View File

@@ -0,0 +1,99 @@
import os
import sys
import logging
import json
from typing import List
from dotenv import load_dotenv
import chromadb
from agents.planning_agent import PlanningAgent
from agents.deals import Opportunity
from sklearn.manifold import TSNE
import numpy as np
# Colors for logging
BG_BLUE = '\033[44m'
WHITE = '\033[37m'
RESET = '\033[0m'
# Colors for plot
CATEGORIES = ['Appliances', 'Automotive', 'Cell_Phones_and_Accessories', 'Electronics', 'Musical_Instruments', 'Office_Products', 'Tools_and_Home_Improvement', 'Toys_and_Games']
COLORS = ['red', 'blue', 'brown', 'orange', 'yellow', 'green', 'purple', 'cyan']
def init_logging():
root = logging.getLogger()
root.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"[%(asctime)s] [Agents] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S %z",
)
handler.setFormatter(formatter)
root.addHandler(handler)
class DealAgentFramework:
DB = "products_vectorstore"
MEMORY_FILENAME = "memory.json"
def __init__(self):
init_logging()
load_dotenv()
client = chromadb.PersistentClient(path=self.DB)
self.memory = self.read_memory()
self.collection = client.get_or_create_collection('products')
self.planner = None
def init_agents_as_needed(self):
if not self.planner:
self.log("Initializing Agent Framework")
self.planner = PlanningAgent(self.collection)
self.log("Agent Framework is ready")
def read_memory(self) -> List[Opportunity]:
if os.path.exists(self.MEMORY_FILENAME):
with open(self.MEMORY_FILENAME, "r") as file:
data = json.load(file)
opportunities = [Opportunity(**item) for item in data]
return opportunities
return []
def write_memory(self) -> None:
data = [opportunity.model_dump() for opportunity in self.memory]
with open(self.MEMORY_FILENAME, "w") as file:
json.dump(data, file, indent=2)
def log(self, message: str):
text = BG_BLUE + WHITE + "[Agent Framework] " + message + RESET
logging.info(text)
def run(self) -> List[Opportunity]:
self.init_agents_as_needed()
logging.info("Kicking off Planning Agent")
result = self.planner.plan(memory=self.memory)
logging.info(f"Planning Agent has completed and returned: {result}")
if result:
self.memory.append(result)
self.write_memory()
return self.memory
@classmethod
def get_plot_data(cls, max_datapoints=10000):
client = chromadb.PersistentClient(path=cls.DB)
collection = client.get_or_create_collection('products')
result = collection.get(include=['embeddings', 'documents', 'metadatas'], limit=max_datapoints)
vectors = np.array(result['embeddings'])
documents = result['documents']
categories = [metadata['category'] for metadata in result['metadatas']]
colors = [COLORS[CATEGORIES.index(c)] for c in categories]
tsne = TSNE(n_components=3, random_state=42, n_jobs=-1)
reduced_vectors = tsne.fit_transform(vectors)
return documents, reduced_vectors, colors
if __name__=="__main__":
DealAgentFramework().run()
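# The t-SNE output from get_plot_data can be rendered as a 3D scatter plot,
# e.g. with Plotly (a sketch; plotly is assumed to be installed):
#   import plotly.graph_objects as go
#   docs, vecs, colors = DealAgentFramework.get_plot_data(max_datapoints=1000)
#   fig = go.Figure(data=[go.Scatter3d(x=vecs[:, 0], y=vecs[:, 1], z=vecs[:, 2],
#                                      mode="markers", marker=dict(size=2, color=colors))])
#   fig.show()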

View File

@@ -0,0 +1,81 @@
import sys
import os
# Add the shared directory to the path
shared_path = os.path.dirname(__file__)
if shared_path not in sys.path:
sys.path.insert(0, shared_path)
import logging
import json
import httpx
from typing import List
from agents.deals import Opportunity
BG_BLUE = '\033[44m'
WHITE = '\033[37m'
RESET = '\033[0m'
def init_logging():
root = logging.getLogger()
root.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"[%(asctime)s] [Agents] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S %z",
)
handler.setFormatter(formatter)
root.addHandler(handler)
class DealAgentFrameworkClient:
MEMORY_FILENAME = "memory.json"
def __init__(self):
init_logging()
self.memory = self.read_memory()
def read_memory(self) -> List[Opportunity]:
if os.path.exists(self.MEMORY_FILENAME):
with open(self.MEMORY_FILENAME, "r") as file:
data = json.load(file)
opportunities = [Opportunity(**item) for item in data]
return opportunities
return []
def write_memory(self) -> None:
data = [opportunity.model_dump() for opportunity in self.memory]
with open(self.MEMORY_FILENAME, "w") as file:
json.dump(data, file, indent=2)
def log(self, message: str):
text = BG_BLUE + WHITE + "[Agent Framework] " + message + RESET
logging.info(text)
async def run(self) -> List[Opportunity]:
self.log("Kicking off Planning Agent")
async with httpx.AsyncClient() as client:
# Extract URLs from memory for the planning agent
memory_urls = [opp.deal.url for opp in self.memory]
result = await client.post("http://localhost:8006/plan", json={"memory": memory_urls})
if result.status_code == 200:
opportunity_data = result.json()
if opportunity_data:
opportunity = Opportunity(**opportunity_data)
self.memory.append(opportunity)
self.write_memory()
self.log(f"Planning Agent has completed and returned: {opportunity}")
else:
self.log("Planning Agent completed with no new opportunities")
else:
self.log(f"Planning Agent failed with status {result.status_code}")
return self.memory
if __name__=="__main__":
import asyncio
asyncio.run(DealAgentFrameworkClient().run())
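# This client assumes a planning service listening on localhost:8006 that
# exposes POST /plan, accepting {"memory": [<previously seen urls>]} and
# returning a serialized Opportunity (or null). A quick smoke test:
#   curl -X POST http://localhost:8006/plan \
#        -H "Content-Type: application/json" -d '{"memory": []}'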

View File

@@ -0,0 +1,35 @@
# Foreground colors
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
# Background color
BG_BLACK = '\033[40m'
BG_BLUE = '\033[44m'
# Reset code to return to default color
RESET = '\033[0m'
mapper = {
BG_BLACK+RED: "#dd0000",
BG_BLACK+GREEN: "#00dd00",
BG_BLACK+YELLOW: "#dddd00",
BG_BLACK+BLUE: "#0000ee",
BG_BLACK+MAGENTA: "#aa00dd",
BG_BLACK+CYAN: "#00dddd",
BG_BLACK+WHITE: "#87CEEB",
BG_BLUE+WHITE: "#ff7800"
}
def reformat(message):
for key, value in mapper.items():
message = message.replace(key, f'<span style="color: {value}">')
message = message.replace(RESET, '</span>')
return message
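# Example (illustrative): an agent log line such as
#   BG_BLACK + CYAN + "[Scanner Agent] ready" + RESET
# is rewritten by reformat() to
#   '<span style="color: #00dddd">[Scanner Agent] ready</span>'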

View File

@@ -0,0 +1,141 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import re
from typing import List, Dict
import ollama
from sentence_transformers import SentenceTransformer
import chromadb
from agents.agent import Agent
class FrontierAgentWrapper(Agent):
name = "Frontier Agent"
color = Agent.BLUE
MODEL = "llama3.2:3b-instruct-q4_0"
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
def __init__(self):
"""
Set up this instance by connecting to Ollama, to the Chroma Datastore,
And setting up the vector encoding model
"""
self.log("Initializing Frontier Agent")
self.client = ollama.Client(host=self.OLLAMA_HOST)
self.log("Frontier Agent is set up with Ollama")
# Initialize ChromaDB
self.collection = chromadb.PersistentClient(path='data/vectorstore').get_or_create_collection('products')
self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
self.log("Frontier Agent is ready")
def make_context(self, similars: List[str], prices: List[float]) -> str:
"""
Create context that can be inserted into the prompt
:param similars: similar products to the one being estimated
:param prices: prices of the similar products
:return: text to insert in the prompt that provides context
"""
message = "To provide some context, here are some other items that might be similar to the item you need to estimate.\n\n"
for similar, price in zip(similars, prices):
message += f"Potentially related product:\n{similar}\nPrice is ${price:.2f}\n\n"
return message
def messages_for(self, description: str, similars: List[str], prices: List[float]) -> List[Dict[str, str]]:
"""
Create the message list to be included in a call to Ollama
With the system and user prompt
:param description: a description of the product
:param similars: similar products to this one
:param prices: prices of similar products
:return: the list of messages in the format expected by Ollama
"""
system_message = "You estimate prices of items. Reply only with the price, no explanation"
user_prompt = self.make_context(similars, prices)
user_prompt += "And now the question for you:\n\n"
user_prompt += "How much does this cost?\n\n" + description
return [
{"role": "system", "content": system_message},
{"role": "user", "content": user_prompt}
]
def find_similars(self, description: str):
"""
Return a list of items similar to the given one by looking in the Chroma datastore
"""
self.log("Frontier Agent is performing a RAG search of the Chroma datastore to find 5 similar products")
vector = self.model.encode([description])
results = self.collection.query(query_embeddings=vector.astype(float).tolist(), n_results=5)
documents = results['documents'][0][:]
prices = [m['price'] for m in results['metadatas'][0][:]]
self.log("Frontier Agent has found similar products")
return documents, prices
def get_price(self, s: str) -> float:
"""
A utility that plucks a floating point number out of a string
"""
s = s.replace('$','').replace(',','')
match = re.search(r"[-+]?\d*\.\d+|\d+", s)
return float(match.group()) if match else 0.0
def price(self, description: str) -> float:
"""
Make a call to Ollama to estimate the price of the described product,
by looking up 5 similar products and including them in the prompt to give context
:param description: a description of the product
:return: an estimate of the price
"""
documents, prices = self.find_similars(description)
self.log(f"Frontier Agent is about to call {self.MODEL} with context including 5 similar products")
try:
self.log(f"Connecting to Ollama at {self.OLLAMA_HOST}")
response = self.client.chat(
model=self.MODEL,
messages=self.messages_for(description, documents, prices)
)
reply = response['message']['content']
self.log(f"Raw response from Ollama: {reply}")
result = self.get_price(reply)
self.log(f"Frontier Agent completed - predicting ${result:.2f}")
return result
except Exception as e:
self.log(f"Error calling Ollama: {str(e)}")
self.log(f"Ollama host: {self.OLLAMA_HOST}")
self.log(f"Model: {self.MODEL}")
# Fallback: simple keyword-based pricing for testing
self.log("Using fallback pricing logic")
fallback_price = self._fallback_pricing(description)
self.log(f"Fallback price: ${fallback_price:.2f}")
return fallback_price
def _fallback_pricing(self, description: str) -> float:
"""
Simple fallback pricing based on keywords for testing
"""
description_lower = description.lower()
# Basic keyword-based pricing
if any(word in description_lower for word in ['iphone', 'iphone 15', 'pro max']):
return 1200.0
elif any(word in description_lower for word in ['macbook', 'macbook pro', 'm3']):
return 2000.0
elif any(word in description_lower for word in ['samsung', 'galaxy', 's24']):
return 1000.0
elif any(word in description_lower for word in ['sony', 'headphones', 'wh-1000xm5']):
return 400.0
elif any(word in description_lower for word in ['laptop', 'computer']):
return 800.0
elif any(word in description_lower for word in ['phone', 'smartphone']):
return 600.0
elif any(word in description_lower for word in ['tablet', 'ipad']):
return 500.0
else:
return 100.0 # Default fallback price
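# Usage sketch (assumes an Ollama server reachable at OLLAMA_HOST with the
# llama3.2:3b-instruct-q4_0 model pulled, plus a populated data/vectorstore):
#   agent = FrontierAgentWrapper()
#   print(agent.price("Sony WH-1000XM5 wireless noise canceling headphones"))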

View File

@@ -0,0 +1,111 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import threading
import gzip
import warnings
from typing import Optional
from sentence_transformers import SentenceTransformer
import joblib
from agents.agent import Agent
# Suppress scikit-learn version mismatch warnings
warnings.filterwarnings("ignore", category=UserWarning, module="sklearn")
class RandomForestAgentWrapper(Agent):
name = "Random Forest Agent"
color = Agent.MAGENTA
def __init__(self):
self.log("Random Forest Agent is initializing")
self._model_loaded = False
self._model_lock = threading.Lock()
self.model: Optional[object] = None
try:
self.log("Loading sentence transformer model...")
self.vectorizer = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
self.log("Sentence transformer loaded successfully")
# Load model in background thread for faster startup
self._load_model_async()
self.log("Random Forest Agent is ready (model loading in background)")
except Exception as e:
self.log(f"Error initializing Random Forest Agent: {str(e)}")
raise
def _load_model_async(self):
"""Load the model in a background thread"""
def load_model():
try:
self.log("Loading random forest model...")
# Use absolute path to ensure we find the model file
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
model_path = os.path.join(base_dir, 'data', 'models', 'random_forest_model.pkl')
self.log(f"Looking for model at: {model_path}")
# Check if file exists
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model file not found at {model_path}")
# Try to load compressed model first, fallback to regular model
compressed_path = os.path.join(base_dir, 'data', 'models', 'random_forest_model_compressed.pkl.gz')
if os.path.exists(compressed_path):
self.log(f"Loading compressed model from: {compressed_path}")
with gzip.open(compressed_path, 'rb') as f:
self.model = joblib.load(f)
else:
self.log(f"Loading regular model from: {model_path}")
# Note: Model was trained with scikit-learn 1.5.2, current version is 1.7.2
# This may cause warnings but the model should still work correctly
# Use joblib with memory mapping for faster loading
self.model = joblib.load(model_path, mmap_mode='r')
with self._model_lock:
self._model_loaded = True
self.log("Random Forest model loaded successfully")
except Exception as e:
self.log(f"Error loading model: {str(e)}")
# Don't raise the exception to prevent service startup failure
# The service can still start and handle requests gracefully
import traceback
self.log(f"Model loading traceback: {traceback.format_exc()}")
# Start loading in background thread
thread = threading.Thread(target=load_model, daemon=True)
thread.start()
def _ensure_model_loaded(self, timeout: float = 60.0):
"""Ensure model is loaded before use, waiting up to `timeout` seconds"""
if not self._model_loaded:
self.log("Waiting for model to load...")
import time
deadline = time.time() + timeout
# Bounded wait: if the background load failed, _model_loaded never
# becomes True, so don't spin forever
while not self._model_loaded and time.time() < deadline:
time.sleep(0.1)
def price(self, description: str) -> float:
self.log("Random Forest Agent is starting a prediction")
# Ensure model is loaded before use
self._ensure_model_loaded()
# Check if model is actually loaded
if self.model is None:
self.log("Model is not available, returning default price")
return 0.0
try:
vector = self.vectorizer.encode([description])
result = max(0, self.model.predict(vector)[0])
self.log(f"Random Forest Agent completed - predicting ${result:.2f}")
return result
except Exception as e:
self.log(f"Error during prediction: {str(e)}")
return 0.0
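# Usage note: the constructor returns immediately while the model loads on a
# background thread; the first price() call blocks until loading completes
# (or times out) and returns 0.0 if the model could not be loaded:
#   agent = RandomForestAgentWrapper()
#   print(agent.price("Compact 4-quart air fryer"))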

View File

@@ -0,0 +1,176 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from typing import Optional, List
from agents.deals import ScrapedDeal, DealSelection
from agents.agent import Agent
import ollama
import json
class ScannerAgentWrapper(Agent):
"""
Wrapper for ScannerAgent that uses Ollama instead of OpenAI
"""
MODEL = "llama3.2"
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
SYSTEM_PROMPT = """You identify and summarize the 5 most detailed deals from a list, by selecting deals that have the most detailed, high quality description and the most clear price.
Respond strictly in JSON with no explanation, using this format. You should provide the price as a number derived from the description. If the price of a deal isn't clear, do not include that deal in your response.
Most important is that you respond with the 5 deals that have the most detailed product description with price. It's not important to mention the terms of the deal; most important is a thorough description of the product.
Be careful with products that are described as "$XXX off" or "reduced by $XXX" - this isn't the actual price of the product. Only respond with products when you are highly confident about the price.
{"deals": [
{
"product_description": "Your clearly expressed summary of the product in 4-5 sentences. Details of the item are much more important than why it's a good deal. Avoid mentioning discounts and coupons; focus on the item itself. There should be a paragraph of text for each item you choose.",
"price": 99.99,
"url": "the url as provided"
}
]}"""
USER_PROMPT_PREFIX = """Respond with the most promising 5 deals from this list, selecting those which have the most detailed, high quality product description and a clear price that is greater than 0.
Respond strictly in JSON, and only JSON. You should rephrase the description to be a summary of the product itself, not the terms of the deal.
Remember to respond with a paragraph of text in the product_description field for each of the 5 items that you select.
Be careful with products that are described as "$XXX off" or "reduced by $XXX" - this isn't the actual price of the product. Only respond with products when you are highly confident about the price.
Deals:
"""
USER_PROMPT_SUFFIX = "\n\nStrictly respond in JSON and include exactly 5 deals, no more."
name = "Scanner Agent"
color = Agent.CYAN
def __init__(self):
"""
Set up this instance by initializing Ollama client
"""
self.log("Scanner Agent is initializing")
self.client = ollama.Client(host=self.OLLAMA_HOST)
self.log("Scanner Agent is ready")
def fetch_deals(self, memory) -> List[ScrapedDeal]:
"""
Look up deals published on RSS feeds
Return any new deals that are not already in the memory provided
"""
self.log("Scanner Agent is about to fetch deals from RSS feed")
try:
urls = [opp.deal.url for opp in memory]
scraped = ScrapedDeal.fetch()
result = [scrape for scrape in scraped if scrape.url not in urls]
self.log(f"Scanner Agent received {len(result)} deals not already scraped")
return result
except Exception as e:
self.log(f"Error fetching deals from RSS: {str(e)}")
# Return empty list if RSS fetch fails
return []
def make_user_prompt(self, scraped) -> str:
"""
Create a user prompt for Ollama based on the scraped deals provided
"""
user_prompt = self.USER_PROMPT_PREFIX
user_prompt += '\n\n'.join([scrape.describe() for scrape in scraped])
user_prompt += self.USER_PROMPT_SUFFIX
return user_prompt
def scan(self, memory: List[str]=[]) -> Optional[DealSelection]:
"""
Provide a high-potential list of deals with good descriptions and prices.
NOTE: the Ollama call is currently bypassed - this method returns mock
fallback deals immediately to avoid request timeouts during testing; the
original Ollama-based logic is preserved (commented out) below.
:param memory: a list of URLs representing deals already raised
:return: a selection of good deals, or None if there aren't any
"""
self.log("Scanner Agent starting scan process")
self.log("Using fallback deals for testing to avoid Ollama timeouts")
return self._fallback_deals()
# Original logic commented out for now
# scraped = self.fetch_deals(memory)
# if scraped:
# user_prompt = self.make_user_prompt(scraped)
# self.log("Scanner Agent is calling Ollama")
#
# try:
# self.log(f"Connecting to Ollama at {self.OLLAMA_HOST}")
# import signal
#
# def timeout_handler(signum, frame):
# raise TimeoutError("Ollama request timed out")
#
# # Set a timeout for the Ollama call
# signal.signal(signal.SIGALRM, timeout_handler)
# signal.alarm(30) # 30 second timeout
#
# try:
# response = self.client.chat(
# model=self.MODEL,
# messages=[
# {"role": "system", "content": self.SYSTEM_PROMPT},
# {"role": "user", "content": user_prompt}
# ]
# )
# finally:
# signal.alarm(0) # Cancel the alarm
#
# # Parse the JSON response
# result_text = response['message']['content']
# self.log(f"Raw response from Ollama: {result_text[:200]}...") # Log first 200 chars
# result_data = json.loads(result_text)
#
# # Convert to DealSelection object
# from agents.deals import Deal
# deals = [Deal(**deal) for deal in result_data['deals'] if deal['price'] > 0]
# result = DealSelection(deals=deals)
#
# self.log(f"Scanner Agent received {len(result.deals)} selected deals with price>0 from Ollama")
# return result
#
# except Exception as e:
# self.log(f"Error calling Ollama: {str(e)}")
# self.log(f"Ollama host: {self.OLLAMA_HOST}")
# self.log(f"Model: {self.MODEL}")
#
# # Fallback: return mock deals for testing
# self.log("Using fallback mock deals for testing")
# return self._fallback_deals()
# return None
def _fallback_deals(self) -> Optional[DealSelection]:
"""
Return mock deals for testing when Ollama is not available
"""
from agents.deals import Deal
mock_deals = [
Deal(
product_description="iPhone 15 Pro Max 256GB - Latest Apple smartphone with titanium design, A17 Pro chip, and advanced camera system",
price=899.99, # Good deal - estimated at ~986, discount of ~$86
url="https://example.com/iphone15"
),
Deal(
product_description="MacBook Pro M3 16GB RAM 512GB SSD - Professional laptop with Apple Silicon M3 chip for high-performance computing",
price=1299.99, # Good deal - estimated at ~1400+, discount of ~$100+
url="https://example.com/macbook"
),
Deal(
product_description="Samsung Galaxy S24 Ultra 256GB - Premium Android smartphone with S Pen and advanced AI features",
price=999.99, # Good deal - estimated at ~1100+, discount of ~$100+
url="https://example.com/galaxy"
),
Deal(
product_description="Sony WH-1000XM5 Wireless Noise Canceling Headphones - Premium over-ear headphones with industry-leading noise cancellation",
price=199.99, # Great deal - estimated at ~246, discount of ~$46
url="https://example.com/sony"
),
Deal(
product_description="iPad Pro 12.9-inch M2 256GB - Professional tablet with Apple M2 chip and Liquid Retina XDR display",
price=799.99, # Good deal - estimated at ~900+, discount of ~$100+
url="https://example.com/ipad"
)
]
return DealSelection(deals=mock_deals)

View File

@@ -0,0 +1,116 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import re
import ollama
from agents.agent import Agent
class SpecialistAgentWrapper(Agent):
"""
An Agent that runs our fine-tuned LLM locally using Ollama
Replaces the Modal-based SpecialistAgent
"""
name = "Specialist Agent"
color = Agent.RED
MODEL = "llama3.2:3b-instruct-q4_0"
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
def __init__(self):
"""
Set up this Agent by creating an Ollama client
"""
self.log("Specialist Agent is initializing - connecting to Ollama")
try:
self.client = ollama.Client(host=self.OLLAMA_HOST)
# Test connection
self.client.list() # This will fail if Ollama is not available
self.log("Specialist Agent is ready - Ollama connection successful")
self.ollama_available = True
except Exception as e:
self.log(f"Ollama connection failed: {str(e)}")
self.log("Specialist Agent is ready - using fallback mode")
self.ollama_available = False
def price(self, description: str) -> float:
"""
Make a call to Ollama to return the estimate of the price of this item
"""
self.log("Specialist Agent is calling Ollama for price estimation")
# If Ollama is not available, use fallback immediately
if not self.ollama_available:
self.log("Ollama not available, using fallback pricing")
fallback_price = self._fallback_pricing(description)
self.log(f"Fallback price: ${fallback_price:.2f}")
return fallback_price
try:
# Test connection first
self.log(f"Connecting to Ollama at {self.OLLAMA_HOST}")
response = self.client.chat(
model=self.MODEL,
messages=[
{
"role": "system",
"content": "You are a product pricing expert. Estimate the price of products based on their descriptions. Respond with only a number representing the estimated price in dollars."
},
{
"role": "user",
"content": f"Estimate the price of this product: {description}"
}
]
)
# Extract price from response
price_text = response['message']['content'].strip()
self.log(f"Raw response from Ollama: {price_text}")
# Try to extract a numeric value from the reply
price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
if price_match:
price = float(price_match.group())
else:
self.log(f"Could not extract price from response: {price_text}")
price = 0.0
self.log(f"Specialist Agent completed - predicting ${price:.2f}")
return price
except Exception as e:
self.log(f"Error calling Ollama: {str(e)}")
self.log(f"Ollama host: {self.OLLAMA_HOST}")
self.log(f"Model: {self.MODEL}")
# Fallback: simple keyword-based pricing for testing
self.log("Using fallback pricing logic")
fallback_price = self._fallback_pricing(description)
self.log(f"Fallback price: ${fallback_price:.2f}")
return fallback_price
def _fallback_pricing(self, description: str) -> float:
"""
Simple fallback pricing based on keywords for testing
"""
description_lower = description.lower()
# Basic keyword-based pricing
if any(word in description_lower for word in ['iphone', 'iphone 15', 'pro max']):
return 1200.0
elif any(word in description_lower for word in ['macbook', 'macbook pro', 'm3']):
return 2000.0
elif any(word in description_lower for word in ['samsung', 'galaxy', 's24']):
return 1000.0
elif any(word in description_lower for word in ['sony', 'headphones', 'wh-1000xm5']):
return 400.0
elif any(word in description_lower for word in ['laptop', 'computer']):
return 800.0
elif any(word in description_lower for word in ['phone', 'smartphone']):
return 600.0
elif any(word in description_lower for word in ['tablet', 'ipad']):
return 500.0
else:
return 100.0 # Default fallback price

View File

@@ -0,0 +1,164 @@
import logging
import queue
import threading
import time
import gradio as gr
import plotly.graph_objects as go
from deal_agent_framework import DealAgentFramework
# from agents.deals import Opportunity, Deal
from log_utils import reformat
class QueueHandler(logging.Handler):
def __init__(self, log_queue):
super().__init__()
self.log_queue = log_queue
def emit(self, record):
self.log_queue.put(self.format(record))
def html_for(log_data):
output = '<br>'.join(log_data[-18:])
return f"""
<div id="scrollContent" style="height: 400px; overflow-y: auto; border: 1px solid #ccc; background-color: #222229; padding: 10px;">
{output}
</div>
"""
def setup_logging(log_queue):
handler = QueueHandler(log_queue)
formatter = logging.Formatter(
"[%(asctime)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S %z",
)
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class App:
def __init__(self):
self.agent_framework = None
def get_agent_framework(self):
if not self.agent_framework:
self.agent_framework = DealAgentFramework()
self.agent_framework.init_agents_as_needed()
return self.agent_framework
def run(self):
with gr.Blocks(title="The Price is Right", fill_width=True) as ui:
log_data = gr.State([])
def table_for(opps):
return [[opp.deal.product_description, f"${opp.deal.price:.2f}", f"${opp.estimate:.2f}", f"${opp.discount:.2f}", opp.deal.url] for opp in opps]
def update_output(log_data, log_queue, result_queue):
def live_table():
return table_for(self.get_agent_framework().memory)
final_result = None
while True:
try:
message = log_queue.get_nowait()
log_data.append(reformat(message))
yield log_data, html_for(log_data), live_table()
continue
except queue.Empty:
pass
try:
final_result = result_queue.get_nowait()
yield log_data, html_for(log_data), (final_result or live_table())
break
except queue.Empty:
time.sleep(0.05)
def get_plot():
documents, vectors, colors = DealAgentFramework.get_plot_data(max_datapoints=1000)
# Create the 3D scatter plot
fig = go.Figure(data=[go.Scatter3d(
x=vectors[:, 0],
y=vectors[:, 1],
z=vectors[:, 2],
mode='markers',
marker=dict(size=2, color=colors, opacity=0.7),
)])
fig.update_layout(
scene=dict(xaxis_title='x',
yaxis_title='y',
zaxis_title='z',
aspectmode='manual',
aspectratio=dict(x=2.2, y=2.2, z=1),  # stretch x and y relative to z
camera=dict(
eye=dict(x=1.6, y=1.6, z=0.8) # Adjust camera position
)),
height=400,
margin=dict(r=5, b=1, l=5, t=2)
)
return fig
def do_run():
new_opportunities = self.get_agent_framework().run()
table = table_for(new_opportunities)
return table
def run_with_logging(initial_log_data):
log_queue = queue.Queue()
result_queue = queue.Queue()
setup_logging(log_queue)
def worker():
result = do_run() # this updates memory during execution
result_queue.put(result)
thread = threading.Thread(target=worker, daemon=True)
thread.start()
for log_data, output_html, table in update_output(initial_log_data, log_queue, result_queue):
yield log_data, output_html, table
def do_select(selected_index: gr.SelectData):
opportunities = self.get_agent_framework().memory
row = selected_index.index[0]
opportunity = opportunities[row]
self.get_agent_framework().planner.messenger.alert(opportunity)
with gr.Row():
gr.Markdown('<div style="text-align: center;font-size:24px"><strong>The Price is Right</strong> - Autonomous Agent Framework that hunts for deals</div>')
with gr.Row():
gr.Markdown('<div style="text-align: center;font-size:14px">A proprietary fine-tuned LLM deployed on Modal and a RAG pipeline with a frontier model collaborate to send push notifications with great online deals.</div>')
with gr.Row():
opportunities_dataframe = gr.Dataframe(
headers=["Deals found so far", "Price", "Estimate", "Discount", "URL"],
wrap=True,
column_widths=[6, 1, 1, 1, 3],
row_count=10,
col_count=5,
max_height=400,
)
with gr.Row():
with gr.Column(scale=1):
logs = gr.HTML()
with gr.Column(scale=1):
plot = gr.Plot(value=get_plot(), show_label=False)
ui.load(run_with_logging, inputs=[log_data], outputs=[log_data, logs, opportunities_dataframe])
timer = gr.Timer(value=300, active=True)
timer.tick(run_with_logging, inputs=[log_data], outputs=[log_data, logs, opportunities_dataframe])
opportunities_dataframe.select(do_select)
ui.launch(share=False, inbrowser=True)
if __name__=="__main__":
App().run()

View File

@@ -0,0 +1,88 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "a71ed017-e1b0-4299-88b3-f0eb05adc4df",
"metadata": {},
"source": [
"# The Price is Right\n",
"\n",
"The final step is to build a User Interface\n",
"\n",
"We will use more advanced aspects of Gradio - building piece by piece."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b77940b8",
"metadata": {},
"outputs": [],
"source": [
"import modal"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6449363f",
"metadata": {},
"outputs": [],
"source": [
"!modal deploy -m pricer_service2"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3c67160e",
"metadata": {},
"outputs": [],
"source": [
"Pricer = modal.Cls.from_name(\"pricer-service\", \"Pricer\")\n",
"pricer = Pricer()\n",
"reply = pricer.price.remote(\"Quadcast HyperX condenser mic, connects via usb-c to your computer for crystal clear audio\")\n",
"print(reply)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "48506465-1c7a-433f-a665-b277a8b4665c",
"metadata": {},
"outputs": [],
"source": [
"!python price_is_right_final.py"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -0,0 +1,33 @@
import logging
class Agent:
"""
An abstract superclass for Agents
Used to log messages in a way that can identify each Agent
"""
# Foreground colors
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
# Background color
BG_BLACK = '\033[40m'
# Reset code to return to default color
RESET = '\033[0m'
name: str = ""
color: str = '\033[37m'
def log(self, message):
"""
Log this as an info message, identifying the agent
"""
color_code = self.BG_BLACK + self.color
message = f"[{self.name}] {message}"
logging.info(color_code + message + self.RESET)
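# Example subclass (illustrative):
#   class DemoAgent(Agent):
#       name = "Demo Agent"
#       color = Agent.CYAN
#   DemoAgent().log("hello")  # logs "[Demo Agent] hello" in cyan on black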

View File

@@ -0,0 +1,75 @@
import os
import re
import sys
from typing import List, Dict
from openai import OpenAI
from sentence_transformers import SentenceTransformer
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if w8d5_path not in sys.path:
sys.path.insert(0, w8d5_path)
from agents.agent import Agent
class TravelEstimatorAgent(Agent):
name = "Travel Estimator"
color = Agent.BLUE
MODEL = "gpt-4o-mini"
def __init__(self, collection):
self.log("Travel Estimator initializing")
self.client = OpenAI()
self.log("Travel Estimator using OpenAI")
self.collection = collection
self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
self.log("Travel Estimator ready")
def make_context(self, similars: List[str], prices: List[float]) -> str:
message = "Here are similar travel deals for context:\n\n"
for similar, price in zip(similars, prices):
message += f"Similar deal:\n{similar}\nPrice: ${price:.2f}\n\n"
return message
def messages_for(self, description: str, similars: List[str], prices: List[float]) -> List[Dict[str, str]]:
system_message = "You estimate fair market prices for travel deals. Reply only with the price estimate, no explanation"
user_prompt = self.make_context(similars, prices)
user_prompt += "Now estimate the fair market price for:\n\n"
user_prompt += description
return [
{"role": "system", "content": system_message},
{"role": "user", "content": user_prompt},
{"role": "assistant", "content": "Fair price estimate: $"}
]
def find_similars(self, description: str):
self.log("Travel Estimator searching for similar deals")
vector = self.model.encode([description])
results = self.collection.query(query_embeddings=vector.astype(float).tolist(), n_results=5)
documents = results['documents'][0][:]
prices = [m['price'] for m in results['metadatas'][0][:]]
self.log("Travel Estimator found similar deals")
return documents, prices
def get_price(self, s: str) -> float:
s = s.replace('$','').replace(',','')
match = re.search(r"[-+]?\d*\.\d+|\d+", s)
return float(match.group()) if match else 0.0
def estimate(self, description: str) -> float:
documents, prices = self.find_similars(description)
self.log(f"Travel Estimator calling {self.MODEL}")
response = self.client.chat.completions.create(
model=self.MODEL,
messages=self.messages_for(description, documents, prices),
seed=42,
max_tokens=10
)
reply = response.choices[0].message.content
result = self.get_price(reply)
self.log(f"Travel Estimator complete - ${result:.2f}")
return result

View File

@@ -0,0 +1,48 @@
import os
import sys
import http.client
import urllib
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if w8d5_path not in sys.path:
sys.path.insert(0, w8d5_path)
from agents.agent import Agent
from helpers.travel_deals import TravelOpportunity
DO_PUSH = True
class TravelMessagingAgent(Agent):
name = "Travel Messenger"
color = Agent.WHITE
def __init__(self):
self.log("Travel Messenger initializing")
if DO_PUSH:
self.pushover_user = os.getenv('PUSHOVER_USER', 'your-pushover-user-if-not-using-env')
self.pushover_token = os.getenv('PUSHOVER_TOKEN', 'your-pushover-token-if-not-using-env')
self.log("Travel Messenger has initialized Pushover")
def push(self, text):
self.log("Travel Messenger sending push notification")
conn = http.client.HTTPSConnection("api.pushover.net:443")
conn.request("POST", "/1/messages.json",
urllib.parse.urlencode({
"token": self.pushover_token,
"user": self.pushover_user,
"message": text,
"sound": "cashregister"
}), { "Content-type": "application/x-www-form-urlencoded" })
conn.getresponse()
def alert(self, opportunity: TravelOpportunity):
text = f"Travel Deal! {opportunity.deal.destination} - "
text += f"Price=${opportunity.deal.price:.2f}, "
text += f"Est=${opportunity.estimate:.2f}, "
text += f"Save ${opportunity.discount:.2f}! "
text += opportunity.deal.url
if DO_PUSH:
self.push(text)
self.log("Travel Messenger completed")

View File

@@ -0,0 +1,57 @@
import os
import sys
from typing import Optional, List
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if w8d5_path not in sys.path:
sys.path.insert(0, w8d5_path)
from agents.agent import Agent
from helpers.travel_deals import TravelDeal, TravelOpportunity
from agents.travel_scanner_agent import TravelScannerAgent
from agents.travel_estimator_agent import TravelEstimatorAgent
from agents.travel_messaging_agent import TravelMessagingAgent
class TravelPlanningAgent(Agent):
name = "Travel Planner"
color = Agent.GREEN
DEAL_THRESHOLD = 50
def __init__(self, collection):
self.log("Travel Planner initializing")
self.scanner = TravelScannerAgent()
self.estimator = TravelEstimatorAgent(collection)
self.messenger = TravelMessagingAgent()
self.log("Travel Planner ready")
def evaluate(self, deal: TravelDeal) -> TravelOpportunity:
self.log(f"Travel Planner evaluating {deal.destination}")
estimate = self.estimator.estimate(deal.description)
discount = estimate - deal.price
self.log(f"Travel Planner found discount ${discount:.2f}")
return TravelOpportunity(deal=deal, estimate=estimate, discount=discount)
def plan(self, memory: List[TravelOpportunity] = []) -> Optional[List[TravelOpportunity]]:
self.log("Travel Planner starting run")
selection = self.scanner.scan(memory=memory)
if selection and selection.deals:
opportunities = [self.evaluate(deal) for deal in selection.deals[:5]]
if not opportunities:
self.log("Travel Planner found no valid opportunities")
return None
opportunities.sort(key=lambda opp: opp.discount, reverse=True)
good_deals = [opp for opp in opportunities if opp.discount > self.DEAL_THRESHOLD]
if good_deals:
best = good_deals[0]
self.log(f"Travel Planner found {len(good_deals)} deals above threshold, best: ${best.discount:.2f} off")
self.messenger.alert(best)
self.log("Travel Planner completed")
return good_deals
else:
self.log(f"Travel Planner completed - no deals above ${self.DEAL_THRESHOLD} threshold")
return None
self.log("Travel Planner found no deals to evaluate")
return None
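# Usage sketch (assumes the travel_vectorstore built by the dataset script):
#   import chromadb
#   collection = chromadb.PersistentClient(path="travel_vectorstore").get_or_create_collection("travel_deals")
#   planner = TravelPlanningAgent(collection)
#   good_deals = planner.plan(memory=[])  # a list of TravelOpportunity, or None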

View File

@@ -0,0 +1,87 @@
import os
import sys
from typing import Optional, List
from openai import OpenAI
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if w8d5_path not in sys.path:
sys.path.insert(0, w8d5_path)
from agents.agent import Agent
from helpers.travel_deals import ScrapedTravelDeal, TravelDealSelection, TravelOpportunity
class TravelScannerAgent(Agent):
MODEL = "gpt-4o-mini"
SYSTEM_PROMPT = """You identify and summarize the 5 most promising travel deals from a list.
Focus on deals with destinations, deal types (flight/hotel/package), and detailed descriptions.
If price is mentioned, extract it. If no specific price is given but there's a discount mentioned (e.g. "30% off"), estimate a reasonable price.
If absolutely no pricing information exists, use a placeholder price of 500.
Respond strictly in JSON with no explanation.
{"deals": [
{
"destination": "City or Country name",
"deal_type": "Flight, Hotel, or Package",
"description": "4-5 sentences describing the travel deal, dates, what's included, and key highlights",
"price": 499.99,
"url": "the url as provided"
},
...
]}"""
USER_PROMPT_PREFIX = """Respond with the 5 most promising travel deals with destinations, types, and descriptions.
Respond strictly in JSON. Provide detailed descriptions focusing on what travelers get.
Extract the destination and deal type (Flight/Hotel/Package) from the title and description.
For pricing: extract exact prices if available, estimate from percentage discounts, or use 500 as placeholder.
Travel Deals:
"""
USER_PROMPT_SUFFIX = "\n\nStrictly respond in JSON with exactly 5 deals."
name = "Travel Scanner"
color = Agent.CYAN
def __init__(self):
self.log("Travel Scanner is initializing")
self.openai = OpenAI()
self.log("Travel Scanner is ready")
def fetch_deals(self, memory) -> List[ScrapedTravelDeal]:
self.log("Travel Scanner fetching deals from RSS feeds")
urls = [opp.deal.url for opp in memory]
scraped = ScrapedTravelDeal.fetch()
result = [scrape for scrape in scraped if scrape.url not in urls]
self.log(f"Travel Scanner found {len(result)} new deals")
return result
def make_user_prompt(self, scraped) -> str:
user_prompt = self.USER_PROMPT_PREFIX
user_prompt += '\n\n'.join([scrape.describe() for scrape in scraped])
user_prompt += self.USER_PROMPT_SUFFIX
return user_prompt
def scan(self, memory: List[TravelOpportunity] = []) -> Optional[TravelDealSelection]:
scraped = self.fetch_deals(memory)
if scraped:
user_prompt = self.make_user_prompt(scraped)
self.log("Travel Scanner calling OpenAI")
result = self.openai.beta.chat.completions.parse(
model=self.MODEL,
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
response_format=TravelDealSelection
)
result = result.choices[0].message.parsed
valid_deals = [deal for deal in result.deals if deal.price > 0]
result.deals = valid_deals
self.log(f"Travel Scanner received {len(result.deals)} valid deals")
return result if result.deals else None
return None

View File

@@ -0,0 +1,73 @@
import os
import sys
import numpy as np
import joblib
from sentence_transformers import SentenceTransformer
import xgboost as xgb
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if w8d5_path not in sys.path:
sys.path.insert(0, w8d5_path)
from agents.agent import Agent
class TravelXGBoostAgent(Agent):
name = "XGBoost Estimator"
color = Agent.GREEN
def __init__(self, collection):
self.log("XGBoost Estimator initializing")
self.collection = collection
self.model_path = os.path.join(w8d5_path, 'helpers', 'travel_xgboost_model.pkl')
self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
if os.path.exists(self.model_path):
self.log("Loading existing XGBoost model")
self.model = joblib.load(self.model_path)
else:
self.log("Training new XGBoost model")
self.model = self._train_model()
joblib.dump(self.model, self.model_path)
self.log(f"XGBoost model saved to {self.model_path}")
self.log("XGBoost Estimator ready")
def _train_model(self):
self.log("Fetching training data from ChromaDB")
result = self.collection.get(include=['embeddings', 'metadatas'])
X = np.array(result['embeddings'])
y = np.array([m['price'] for m in result['metadatas']])
self.log(f"Training on {len(X)} samples")
model = xgb.XGBRegressor(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
subsample=0.8,
colsample_bytree=0.8,
random_state=42,
n_jobs=-1
)
model.fit(X, y)
self.log("XGBoost training complete")
return model
def estimate(self, description: str) -> float:
self.log(f"XGBoost estimating price for: {description[:50]}...")
embedding = self.embedder.encode([description])[0]
embedding_2d = embedding.reshape(1, -1)
prediction = self.model.predict(embedding_2d)[0]
prediction = max(0, prediction)
self.log(f"XGBoost estimate: ${prediction:.2f}")
return float(prediction)
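# Usage sketch (assumes the travel vectorstore built by the dataset script,
# with 'price' stored in each item's metadata):
#   import chromadb
#   collection = chromadb.PersistentClient(path="travel_vectorstore").get_or_create_collection("travel_deals")
#   agent = TravelXGBoostAgent(collection)
#   print(agent.estimate("7-night Caribbean cruise, balcony cabin, all meals included"))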

View File

@@ -0,0 +1,230 @@
import os
import random
from dotenv import load_dotenv
from huggingface_hub import login
from sentence_transformers import SentenceTransformer
import chromadb
from tqdm import tqdm
load_dotenv(override=True)
os.environ['HF_TOKEN'] = os.getenv('HF_TOKEN', 'your-key-if-not-using-env')
hf_token = os.environ['HF_TOKEN']
login(hf_token, add_to_git_credential=True)
DB = "travel_vectorstore"
CATEGORIES = ['Flights', 'Hotels', 'Car_Rentals', 'Vacation_Packages', 'Cruises', 'Activities']
AIRLINES = ['American Airlines', 'Delta', 'United', 'Southwest', 'JetBlue', 'Spirit', 'Frontier', 'Alaska Airlines', 'Emirates', 'British Airways', 'Air France', 'Lufthansa', 'Qatar Airways']
CITIES = ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Miami', 'San Francisco', 'Boston', 'Seattle', 'Denver', 'Atlanta', 'Las Vegas', 'Orlando', 'Phoenix', 'London', 'Paris', 'Tokyo', 'Dubai', 'Singapore', 'Sydney', 'Rome']
HOTELS = ['Hilton', 'Marriott', 'Hyatt', 'Holiday Inn', 'Best Western', 'Sheraton', 'Ritz-Carlton', 'Four Seasons', 'Westin', 'Radisson']
CLASSES = ['Economy', 'Premium Economy', 'Business', 'First Class']
CAR_COMPANIES = ['Hertz', 'Enterprise', 'Avis', 'Budget', 'National', 'Alamo']
CAR_TYPES = ['Compact', 'Sedan', 'SUV', 'Luxury', 'Van']
def generate_flight_description():
airline = random.choice(AIRLINES)
source = random.choice(CITIES)
dest = random.choice([c for c in CITIES if c != source])
flight_class = random.choice(CLASSES)
stops = random.choice(['non-stop', 'one-stop', 'two-stops'])
duration = f"{random.randint(1, 15)} hours {random.randint(0, 59)} minutes"
description = f"{airline} {flight_class} {stops} flight from {source} to {dest}. "
description += f"Flight duration approximately {duration}. "
if random.random() > 0.5:
description += f"Includes {random.randint(1, 2)} checked bag"
if random.random() > 0.5:
description += "s"
description += ". "
if flight_class in ['Business', 'First Class']:
description += random.choice(['Priority boarding included. ', 'Lounge access available. ', 'Lie-flat seats. '])
price = random.randint(150, 2500) if flight_class == 'Economy' else random.randint(800, 8000)
return description, price
def generate_hotel_description():
hotel = random.choice(HOTELS)
city = random.choice(CITIES)
stars = random.randint(2, 5)
room_type = random.choice(['Standard Room', 'Deluxe Room', 'Suite', 'Executive Suite'])
nights = random.randint(1, 7)
description = f"{hotel} {stars}-star hotel in {city}. {room_type} for {nights} night"
if nights > 1:
description += "s"
description += ". "
amenities = []
if random.random() > 0.3:
amenities.append('Free WiFi')
if random.random() > 0.5:
amenities.append('Breakfast included')
if random.random() > 0.6:
amenities.append('Pool access')
if random.random() > 0.7:
amenities.append('Gym')
if random.random() > 0.8:
amenities.append('Spa services')
if amenities:
description += f"Amenities: {', '.join(amenities)}. "
price_per_night = random.randint(80, 500) if stars <= 3 else random.randint(200, 1200)
total_price = price_per_night * nights
return description, total_price
def generate_car_rental_description():
company = random.choice(CAR_COMPANIES)
car_type = random.choice(CAR_TYPES)
city = random.choice(CITIES)
days = random.randint(1, 14)
description = f"{company} car rental in {city}. {car_type} class vehicle for {days} day"
if days > 1:
description += "s"
description += ". "
if random.random() > 0.6:
description += "Unlimited mileage included. "
if random.random() > 0.5:
description += "Airport pickup available. "
if random.random() > 0.7:
description += "GPS navigation included. "
daily_rate = {'Compact': random.randint(25, 45), 'Sedan': random.randint(35, 65), 'SUV': random.randint(50, 90), 'Luxury': random.randint(80, 200), 'Van': random.randint(60, 100)}
total_price = daily_rate[car_type] * days
return description, total_price
def generate_vacation_package_description():
city = random.choice(CITIES)
nights = random.randint(3, 10)
description = f"All-inclusive vacation package to {city} for {nights} nights. "
description += f"Includes round-trip {random.choice(CLASSES)} flights, {random.choice(HOTELS)} hotel accommodation, "
extras = []
if random.random() > 0.3:
extras.append('daily breakfast')
if random.random() > 0.5:
extras.append('airport transfers')
if random.random() > 0.6:
extras.append('city tour')
if random.random() > 0.7:
extras.append('travel insurance')
if extras:
description += f"and {', '.join(extras)}. "
base_price = random.randint(800, 4000)
return description, base_price
def generate_cruise_description():
destinations = ', '.join(random.sample(['Caribbean', 'Mediterranean', 'Alaska', 'Hawaii', 'Baltic Sea', 'South Pacific'], k=random.randint(2, 4)))
nights = random.choice([3, 5, 7, 10, 14])
description = f"{nights}-night cruise visiting {destinations}. "
description += "All meals and entertainment included. "
cabin_type = random.choice(['Interior cabin', 'Ocean view cabin', 'Balcony cabin', 'Suite'])
description += f"{cabin_type}. "
if random.random() > 0.5:
description += "Unlimited beverage package available. "
if random.random() > 0.6:
description += "Shore excursions at each port. "
base_price = random.randint(500, 5000)
return description, base_price
def generate_activity_description():
city = random.choice(CITIES)
activities = ['City sightseeing tour', 'Museum pass', 'Adventure sports package', 'Wine tasting tour', 'Cooking class', 'Hot air balloon ride', 'Snorkeling excursion', 'Helicopter tour', 'Spa day package', 'Theme park tickets']
activity = random.choice(activities)
description = f"{activity} in {city}. "
if 'tour' in activity.lower():
description += f"Duration: {random.randint(2, 8)} hours. "
if random.random() > 0.5:
description += "Hotel pickup included. "
if random.random() > 0.6:
description += "Small group experience. "
price = random.randint(30, 500)
return description, price
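# Map each category name to its generator so the build loop below stays generic.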
GENERATORS = {
'Flights': generate_flight_description,
'Hotels': generate_hotel_description,
'Car_Rentals': generate_car_rental_description,
'Vacation_Packages': generate_vacation_package_description,
'Cruises': generate_cruise_description,
'Activities': generate_activity_description
}
print("Generating synthetic travel dataset...")
travel_data = []
items_per_category = 3334  # 6 categories x 3334 = 20,004 deals (~20K total)
for category in CATEGORIES:
print(f"Generating {category}...")
generator = GENERATORS[category]
for _ in range(items_per_category):
description, price = generator()
travel_data.append((description, float(price), category))
random.shuffle(travel_data)
print(f"Generated {len(travel_data)} travel deals")
print("\nInitializing SentenceTransformer model...")
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
print(f"Connecting to ChromaDB at {DB}...")
client = chromadb.PersistentClient(path=DB)
collection_name = "travel_deals"
existing_collections = [col.name for col in client.list_collections()]
if collection_name in existing_collections:
client.delete_collection(collection_name)
print(f"Deleted existing collection: {collection_name}")
collection = client.create_collection(collection_name)
print(f"Created new collection: {collection_name}")
print("\nCreating embeddings and adding to ChromaDB...")
for i in tqdm(range(0, len(travel_data), 1000)):
batch = travel_data[i:i+1000]
documents = [desc for desc, _, _ in batch]
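    # encode() returns float32 arrays; the cast ensures plain Python floats after tolist() for Chroma.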
vectors = model.encode(documents).astype(float).tolist()
metadatas = [{"category": cat, "price": price} for _, price, cat in batch]
ids = [f"travel_{j}" for j in range(i, i+len(batch))]
collection.add(
ids=ids,
documents=documents,
embeddings=vectors,
metadatas=metadatas
)
total_items = collection.count()
print(f"\nVectorstore created successfully with {total_items} travel deals")
result = collection.get(include=['metadatas'], limit=total_items)
categories = [m['category'] for m in result['metadatas']]
prices = [m['price'] for m in result['metadatas']]
category_counts = {}
for cat in categories:
category_counts[cat] = category_counts.get(cat, 0) + 1
print("\nCategory distribution:")
for category, count in sorted(category_counts.items()):
print(f" {category}: {count}")
avg_price = sum(prices) / len(prices) if prices else 0
print(f"\nAverage price: ${avg_price:.2f}")
print(f"Price range: ${min(prices):.2f} - ${max(prices):.2f}")

View File

@@ -0,0 +1,99 @@
import os
import sys
import logging
import json
from typing import List, Optional
from dotenv import load_dotenv
import chromadb
import numpy as np
from sklearn.manifold import TSNE
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if w8d5_path not in sys.path:
sys.path.insert(0, w8d5_path)
from agents.travel_planning_agent import TravelPlanningAgent
from helpers.travel_deals import TravelOpportunity
BG_BLUE = '\033[44m'
WHITE = '\033[37m'
RESET = '\033[0m'
CATEGORIES = ['Flights', 'Hotels', 'Car_Rentals', 'Vacation_Packages', 'Cruises', 'Activities']
COLORS = ['red', 'blue', 'green', 'orange', 'purple', 'cyan']
def init_logging():
root = logging.getLogger()
root.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"[%(asctime)s] [Travel Agents] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S %z",
)
handler.setFormatter(formatter)
root.addHandler(handler)
class TravelDealFramework:
DB = "travel_vectorstore"
MEMORY_FILENAME = "travel_memory.json"
def __init__(self):
init_logging()
load_dotenv()
client = chromadb.PersistentClient(path=self.DB)
self.memory = self.read_memory()
self.collection = client.get_or_create_collection('travel_deals')
self.planner = None
def init_agents_as_needed(self):
if not self.planner:
self.log("Initializing Travel Agent Framework")
self.planner = TravelPlanningAgent(self.collection)
self.log("Travel Agent Framework ready")
def read_memory(self) -> List[TravelOpportunity]:
if os.path.exists(self.MEMORY_FILENAME):
with open(self.MEMORY_FILENAME, "r") as file:
data = json.load(file)
opportunities = [TravelOpportunity(**item) for item in data]
return opportunities
return []
def write_memory(self) -> None:
data = [opportunity.dict() for opportunity in self.memory]
with open(self.MEMORY_FILENAME, "w") as file:
json.dump(data, file, indent=2)
def log(self, message: str):
text = BG_BLUE + WHITE + "[Travel Framework] " + message + RESET
logging.info(text)
def run(self) -> List[TravelOpportunity]:
self.init_agents_as_needed()
logging.info("Starting Travel Planning Agent")
results = self.planner.plan(memory=self.memory)
logging.info(f"Travel Planning Agent completed with {len(results) if results else 0} results")
if results:
self.memory.extend(results)
self.write_memory()
return self.memory
@classmethod
def get_plot_data(cls, max_datapoints=10000):
client = chromadb.PersistentClient(path=cls.DB)
collection = client.get_or_create_collection('travel_deals')
result = collection.get(include=['embeddings', 'documents', 'metadatas'], limit=max_datapoints)
vectors = np.array(result['embeddings'])
documents = result['documents']
categories = [metadata['category'] for metadata in result['metadatas']]
colors = [COLORS[CATEGORIES.index(c)] for c in categories]
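        # t-SNE projects the high-dimensional embeddings down to 3 components for plotting.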
tsne = TSNE(n_components=3, random_state=42, n_jobs=-1)
reduced_vectors = tsne.fit_transform(vectors)
return documents, reduced_vectors, colors
if __name__=="__main__":
TravelDealFramework().run()

View File

@@ -0,0 +1,67 @@
from pydantic import BaseModel
from typing import List, Dict, Self  # typing.Self requires Python 3.11+
from bs4 import BeautifulSoup
import re
import feedparser
from tqdm import tqdm
import requests
import time
feeds = [
"https://thepointsguy.com/feed/",
]
def extract(html_snippet: str) -> str:
soup = BeautifulSoup(html_snippet, 'html.parser')
text = soup.get_text(strip=True)
text = re.sub('<[^<]+?>', '', text)
return text.replace('\n', ' ').strip()
class ScrapedTravelDeal:
title: str
summary: str
url: str
details: str
def __init__(self, entry: Dict[str, str]):
self.title = entry.get('title', '')
summary_text = entry.get('summary', entry.get('description', ''))
self.summary = extract(summary_text)
self.url = entry.get('link', '')
self.details = self.summary
def __repr__(self):
return f"<{self.title}>"
def describe(self):
return f"Title: {self.title}\nDetails: {self.details.strip()}\nURL: {self.url}"
@classmethod
def fetch(cls, show_progress: bool = False) -> List[Self]:
deals = []
feed_iter = tqdm(feeds) if show_progress else feeds
for feed_url in feed_iter:
try:
feed = feedparser.parse(feed_url)
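                # Keep only the ten most recent entries from each feed, pausing briefly between items.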
for entry in feed.entries[:10]:
deals.append(cls(entry))
time.sleep(0.3)
except Exception as e:
print(f"Error fetching {feed_url}: {e}")
return deals
class TravelDeal(BaseModel):
destination: str
deal_type: str
description: str
price: float
url: str
class TravelDealSelection(BaseModel):
deals: List[TravelDeal]
class TravelOpportunity(BaseModel):
deal: TravelDeal
estimate: float
discount: float

View File

@@ -0,0 +1,161 @@
import os
import sys
import logging
import json
from typing import List, Tuple
from dotenv import load_dotenv
import chromadb
import numpy as np
from sklearn.manifold import TSNE
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if w8d5_path not in sys.path:
sys.path.insert(0, w8d5_path)
from agents.travel_scanner_agent import TravelScannerAgent
from agents.travel_estimator_agent import TravelEstimatorAgent
from agents.travel_xgboost_agent import TravelXGBoostAgent
from agents.travel_messaging_agent import TravelMessagingAgent
from helpers.travel_deals import TravelOpportunity, TravelDeal
BG_BLUE = '\033[44m'
WHITE = '\033[37m'
RESET = '\033[0m'
CATEGORIES = ['Flights', 'Hotels', 'Car_Rentals', 'Vacation_Packages', 'Cruises', 'Activities']
COLORS = ['red', 'blue', 'green', 'orange', 'purple', 'cyan']
def init_logging():
root = logging.getLogger()
root.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"[%(asctime)s] [Travel Agents] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S %z",
)
handler.setFormatter(formatter)
root.addHandler(handler)
class TravelDualFramework:
DB = "travel_vectorstore"
LLM_MEMORY_FILE = "travel_memory_llm.json"
XGB_MEMORY_FILE = "travel_memory_xgb.json"
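    # Minimum estimated savings (USD) before a deal is surfaced as an opportunity.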
DEAL_THRESHOLD = 200.0
def __init__(self):
init_logging()
load_dotenv()
client = chromadb.PersistentClient(path=self.DB)
self.collection = client.get_or_create_collection('travel_deals')
self.llm_memory = self.read_memory(self.LLM_MEMORY_FILE)
self.xgb_memory = self.read_memory(self.XGB_MEMORY_FILE)
self.scanner = None
self.llm_estimator = None
self.xgb_estimator = None
self.messenger = None
def init_agents_as_needed(self):
if not self.scanner:
self.log("Initializing Travel Dual Estimation Framework")
self.scanner = TravelScannerAgent()
self.llm_estimator = TravelEstimatorAgent(self.collection)
self.xgb_estimator = TravelXGBoostAgent(self.collection)
self.messenger = TravelMessagingAgent()
self.log("Travel Dual Framework ready")
def read_memory(self, filename: str) -> List[TravelOpportunity]:
if os.path.exists(filename):
with open(filename, "r") as file:
data = json.load(file)
opportunities = [TravelOpportunity(**item) for item in data]
return opportunities
return []
def write_memory(self, opportunities: List[TravelOpportunity], filename: str) -> None:
data = [opportunity.dict() for opportunity in opportunities]
with open(filename, "w") as file:
json.dump(data, file, indent=2)
def log(self, message: str):
text = BG_BLUE + WHITE + "[Dual Framework] " + message + RESET
logging.info(text)
def run(self) -> Tuple[List[TravelOpportunity], List[TravelOpportunity]]:
self.init_agents_as_needed()
self.log("Starting dual estimation scan")
deal_selection = self.scanner.scan()
if not deal_selection or not deal_selection.deals:
self.log("No deals found")
return self.llm_memory, self.xgb_memory
deals = deal_selection.deals
self.log(f"Processing {len(deals)} deals with both estimators")
llm_opportunities = []
xgb_opportunities = []
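        # Score each deal with both estimators; either model can independently flag an opportunity and trigger an alert.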
for deal in deals:
llm_estimate = self.llm_estimator.estimate(deal.description)
llm_discount = llm_estimate - deal.price
if llm_discount >= self.DEAL_THRESHOLD:
llm_opp = TravelOpportunity(
deal=deal,
estimate=llm_estimate,
discount=llm_discount
)
llm_opportunities.append(llm_opp)
self.log(f"LLM found opportunity: {deal.destination} - ${llm_discount:.0f} savings")
self.messenger.alert(llm_opp)
xgb_estimate = self.xgb_estimator.estimate(deal.description)
xgb_discount = xgb_estimate - deal.price
if xgb_discount >= self.DEAL_THRESHOLD:
xgb_opp = TravelOpportunity(
deal=deal,
estimate=xgb_estimate,
discount=xgb_discount
)
xgb_opportunities.append(xgb_opp)
self.log(f"XGBoost found opportunity: {deal.destination} - ${xgb_discount:.0f} savings")
self.messenger.alert(xgb_opp)
if llm_opportunities:
self.llm_memory.extend(llm_opportunities)
self.write_memory(self.llm_memory, self.LLM_MEMORY_FILE)
if xgb_opportunities:
self.xgb_memory.extend(xgb_opportunities)
self.write_memory(self.xgb_memory, self.XGB_MEMORY_FILE)
self.log(f"Scan complete: {len(llm_opportunities)} LLM, {len(xgb_opportunities)} XGBoost opportunities")
return self.llm_memory, self.xgb_memory
@classmethod
def get_plot_data(cls, max_datapoints=10000):
client = chromadb.PersistentClient(path=cls.DB)
collection = client.get_or_create_collection('travel_deals')
result = collection.get(include=['embeddings', 'documents', 'metadatas'], limit=max_datapoints)
vectors = np.array(result['embeddings'])
documents = result['documents']
categories = [metadata['category'] for metadata in result['metadatas']]
colors = [COLORS[CATEGORIES.index(c)] for c in categories]
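        # Same projection as TravelDealFramework: squeeze the embeddings into 3 t-SNE components for Plotly.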
tsne = TSNE(n_components=3, random_state=42, n_jobs=-1)
reduced_vectors = tsne.fit_transform(vectors)
return documents, reduced_vectors, colors, categories
if __name__=="__main__":
framework = TravelDualFramework()
framework.run()

View File

@@ -0,0 +1,66 @@
import os
import sys
from dotenv import load_dotenv
project_root = os.path.join(os.path.dirname(__file__), '..')
sys.path.insert(0, project_root)
sys.path.insert(0, os.path.join(project_root, '..', '..'))
from helpers.travel_deals import ScrapedTravelDeal
from agents.travel_scanner_agent import TravelScannerAgent
from agents.travel_estimator_agent import TravelEstimatorAgent
load_dotenv()
print("\nTesting Travel Deal Hunter Components\n")
print("1. RSS Feed Scraping")
deals = ScrapedTravelDeal.fetch(show_progress=False)
print(f"Fetched {len(deals)} deals from RSS feeds")
if deals:
print(f"Sample: {deals[0].title[:60]}...")
print("\n2. OpenAI Connection")
if os.getenv("OPENAI_API_KEY"):
print("OPENAI_API_KEY found")
else:
print("OPENAI_API_KEY not found - set in .env file")
print("\n3. Scanner Agent")
scanner = TravelScannerAgent()
print("Scanner agent initialized")
print("\n4. Deal Scanning")
try:
selection = scanner.scan(memory=[])
if selection and selection.deals:
print(f"Scanner found {len(selection.deals)} processed deals")
print(f"Sample: {selection.deals[0].destination} - ${selection.deals[0].price}")
else:
print("No deals returned")
except Exception as e:
print(f"Error: {e}")
print("\n5. ChromaDB Access")
import chromadb
try:
db_path = "travel_vectorstore"
client = chromadb.PersistentClient(path=db_path)
collection = client.get_or_create_collection('travel_deals')
count = collection.count()
print(f"ChromaDB connected - {count} travel items in collection")
except Exception as e:
print(f"Error: {e}")
print("\n6. Estimator Check using travel vectorstore")
try:
estimator = TravelEstimatorAgent(collection)
sample = "Non-stop economy flight from New York to London, duration 7 hours"
estimate = estimator.estimate(sample)
print(f"Estimate: ${estimate:.2f}")
except Exception as e:
print(f"Error: {e}")
print("\nComponent tests complete")

View File

@@ -0,0 +1,49 @@
import os
import sys
from dotenv import load_dotenv
project_root = os.path.join(os.path.dirname(__file__), '..')
sys.path.insert(0, project_root)
sys.path.insert(0, os.path.join(project_root, '..', '..'))
from agents.travel_estimator_agent import TravelEstimatorAgent
from agents.travel_xgboost_agent import TravelXGBoostAgent
import chromadb
load_dotenv()
print("\nTesting Dual Estimation (LLM vs XGBoost)\n")
client = chromadb.PersistentClient(path='travel_vectorstore')
collection = client.get_collection('travel_deals')
print("Initializing agents...")
llm_agent = TravelEstimatorAgent(collection)
xgb_agent = TravelXGBoostAgent(collection)
test_cases = [
"Round trip flight from New York to London, Economy class, non-stop",
"5-star Marriott hotel in Paris, 3 nights, Suite with breakfast included",
"7-night Caribbean cruise, Balcony cabin, all meals included",
"Hertz SUV rental in Los Angeles for 5 days with unlimited mileage",
"All-inclusive vacation package to Dubai for 7 nights with Business class flights"
]
print("\n" + "="*80)
print(f"{'Travel Deal Description':<60} {'LLM Est.':<12} {'XGB Est.':<12}")
print("="*80)
for desc in test_cases:
llm_est = llm_agent.estimate(desc)
xgb_est = xgb_agent.estimate(desc)
short_desc = desc[:57] + "..." if len(desc) > 60 else desc
print(f"{short_desc:<60} ${llm_est:>9.2f} ${xgb_est:>9.2f}")
print("="*80)
print("\nDual estimation test complete!")
print("\nKey Observations:")
print("- LLM: Uses semantic understanding + RAG context")
print("- XGBoost: Uses pattern recognition from embeddings")
print("- Both trained on same 20K travel deals dataset")

View File

@@ -0,0 +1,38 @@
import os
import sys
from dotenv import load_dotenv
project_root = os.path.join(os.path.dirname(__file__), '..')
sys.path.insert(0, project_root)
sys.path.insert(0, os.path.join(project_root, '..', '..'))
from helpers.travel_deal_framework import TravelDealFramework
load_dotenv()
print("\nTesting Full Travel Deal Pipeline\n")
print("Initializing framework...")
framework = TravelDealFramework()
framework.init_agents_as_needed()
print("\nRunning one iteration...")
try:
result = framework.run()
print(f"\nPipeline completed")
print(f"Memory now has {len(result)} opportunities")
if result:
latest = result[-1]
print(f"\nLatest opportunity:")
print(f" Destination: {latest.deal.destination}")
print(f" Type: {latest.deal.deal_type}")
print(f" Price: ${latest.deal.price:.2f}")
print(f" Estimate: ${latest.estimate:.2f}")
print(f" Discount: ${latest.discount:.2f}")
except Exception as e:
print(f"\nError during pipeline: {e}")
import traceback
traceback.print_exc()
print("\n")

View File

@@ -0,0 +1,306 @@
import os
import sys
import logging
import queue
import threading
import time
import gradio as gr
import plotly.graph_objects as go
w8d5_path = os.path.abspath(os.path.dirname(__file__))
week8_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if w8d5_path not in sys.path:
sys.path.insert(0, w8d5_path)
if week8_path not in sys.path:
sys.path.insert(0, week8_path)
from log_utils import reformat
from helpers.travel_dual_framework import TravelDualFramework
from helpers.travel_deals import TravelOpportunity, TravelDeal
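# Logging handler that pushes formatted records onto a thread-safe queue so the Gradio UI can drain and render them.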
class QueueHandler(logging.Handler):
def __init__(self, log_queue):
super().__init__()
self.log_queue = log_queue
def emit(self, record):
self.log_queue.put(self.format(record))
log_queue = queue.Queue()
queue_handler = QueueHandler(log_queue)
queue_handler.setFormatter(
logging.Formatter(
"[%(asctime)s] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
)
logging.getLogger().addHandler(queue_handler)
logging.getLogger().setLevel(logging.INFO)
agent_framework = TravelDualFramework()
agent_framework.init_agents_as_needed()
CHECK_INTERVAL = 300
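# Background loop: run the full scan pipeline, then sleep CHECK_INTERVAL seconds; the daemon thread exits with the app.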
def run_agent_framework():
while True:
try:
agent_framework.run()
except Exception as e:
logging.error(f"Error in agent framework: {e}")
time.sleep(CHECK_INTERVAL)
framework_thread = threading.Thread(target=run_agent_framework, daemon=True)
framework_thread.start()
def get_llm_table(llm_opps):
return [[
opp.deal.destination,
opp.deal.deal_type,
f"${opp.deal.price:.2f}",
f"${opp.estimate:.2f}",
f"${opp.discount:.2f}",
opp.deal.url[:50] + "..." if len(opp.deal.url) > 50 else opp.deal.url
] for opp in llm_opps]
def get_xgb_table(xgb_opps):
return [[
opp.deal.destination,
opp.deal.deal_type,
f"${opp.deal.price:.2f}",
f"${opp.estimate:.2f}",
f"${opp.discount:.2f}",
opp.deal.url[:50] + "..." if len(opp.deal.url) > 50 else opp.deal.url
] for opp in xgb_opps]
log_data = []
def update_ui():
global log_data
llm_data = get_llm_table(agent_framework.llm_memory)
xgb_data = get_xgb_table(agent_framework.xgb_memory)
while not log_queue.empty():
try:
message = log_queue.get_nowait()
log_data.append(reformat(message))
        except queue.Empty:
break
logs_html = '<div style="height: 500px; overflow-y: auto; border: 1px solid #ccc; background-color: #1a1a1a; padding: 10px; font-family: monospace; font-size: 12px; color: #fff;">'
logs_html += '<br>'.join(log_data[-50:])
logs_html += '</div>'
llm_count = len(agent_framework.llm_memory)
xgb_count = len(agent_framework.xgb_memory)
stats = f"LLM Opportunities: {llm_count} | XGBoost Opportunities: {xgb_count}"
return llm_data, xgb_data, logs_html, stats
def create_3d_plot():
try:
documents, vectors, colors, categories = TravelDualFramework.get_plot_data(max_datapoints=5000)
if len(vectors) == 0:
fig = go.Figure()
fig.add_annotation(
text="No data available yet. Vectorstore will load after initialization.",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=16)
)
return fig
fig = go.Figure()
unique_categories = list(set(categories))
category_colors = {cat: colors[categories.index(cat)] for cat in unique_categories}
for category in unique_categories:
mask = [cat == category for cat in categories]
cat_vectors = vectors[mask]
fig.add_trace(go.Scatter3d(
x=cat_vectors[:, 0],
y=cat_vectors[:, 1],
z=cat_vectors[:, 2],
mode='markers',
marker=dict(
size=3,
color=category_colors[category],
opacity=0.6
),
name=category.replace('_', ' '),
hovertemplate='<b>%{text}</b><extra></extra>',
text=[category] * len(cat_vectors)
))
fig.update_layout(
title={
'text': f'3D Travel Vectorstore Visualization ({len(vectors):,} deals)',
'x': 0.5,
'xanchor': 'center'
},
scene=dict(
xaxis_title='Dimension 1',
yaxis_title='Dimension 2',
zaxis_title='Dimension 3',
camera=dict(
eye=dict(x=1.5, y=1.5, z=1.5)
)
),
width=1200,
height=600,
margin=dict(r=0, b=0, l=0, t=40),
showlegend=True,
legend=dict(
yanchor="top",
y=0.99,
xanchor="left",
x=0.01
)
)
return fig
except Exception as e:
logging.error(f"Error creating 3D plot: {e}")
fig = go.Figure()
fig.add_annotation(
text=f"Error loading plot: {str(e)}",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=14, color="red")
)
return fig
with gr.Blocks(title="Travel Deal Hunter - Dual Estimation", fill_width=True, theme=gr.themes.Soft()) as ui:
gr.Markdown(
"""
<div style="text-align: center;">
<h1 style="margin-bottom: 10px;">Travel Deal Hunter - Dual Estimation System</h1>
<p style="color: #666; font-size: 16px;">
Comparing LLM-based Semantic Estimation vs XGBoost Machine Learning
</p>
<p style="color: #999; font-size: 14px; margin-top: 10px;">
System scans RSS feeds every 5 minutes. Use the button below to trigger a manual scan.
</p>
</div>
"""
)
with gr.Row():
with gr.Column(scale=3):
stats_display = gr.Textbox(
label="",
value="LLM Opportunities: 0 | XGBoost Opportunities: 0",
interactive=False,
show_label=False,
container=False
)
with gr.Column(scale=1):
scan_button = gr.Button("Scan Now", variant="primary")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### LLM Estimates")
llm_dataframe = gr.Dataframe(
headers=["Destination", "Type", "Price", "LLM Est.", "Savings", "URL"],
datatype=["str", "str", "str", "str", "str", "str"],
wrap=True,
column_widths=[2, 1, 1, 1, 1, 2],
row_count=5,
col_count=6,
interactive=False
)
with gr.Column(scale=1):
gr.Markdown("### XGBoost Estimates")
xgb_dataframe = gr.Dataframe(
headers=["Destination", "Type", "Price", "XGB Est.", "Savings", "URL"],
datatype=["str", "str", "str", "str", "str", "str"],
wrap=True,
column_widths=[2, 1, 1, 1, 1, 2],
row_count=5,
col_count=6,
interactive=False
)
with gr.Row():
with gr.Column(scale=2):
plot_output = gr.Plot(label="3D Travel Vectorstore Visualization")
with gr.Column(scale=1):
gr.Markdown("### Agent Activity Logs")
log_output = gr.HTML(
value='<div style="height: 500px; overflow-y: auto; border: 1px solid #ccc; background-color: #1a1a1a; padding: 10px; font-family: monospace; font-size: 12px; color: #fff;"></div>'
)
ui.load(
fn=lambda: (
get_llm_table(agent_framework.llm_memory),
get_xgb_table(agent_framework.xgb_memory),
"",
f"LLM Opportunities: {len(agent_framework.llm_memory)} | XGBoost Opportunities: {len(agent_framework.xgb_memory)}",
create_3d_plot()
),
outputs=[llm_dataframe, xgb_dataframe, log_output, stats_display, plot_output]
)
# Manual scan button
def manual_scan():
try:
agent_framework.run()
return update_ui()
except Exception as e:
logging.error(f"Manual scan error: {e}")
return update_ui()
scan_button.click(
fn=manual_scan,
outputs=[llm_dataframe, xgb_dataframe, log_output, stats_display]
)
# Click handlers for notifications
def llm_click_handler(selected_index: gr.SelectData):
try:
row = selected_index.index[0]
if row < len(agent_framework.llm_memory):
opportunity = agent_framework.llm_memory[row]
agent_framework.messenger.alert(opportunity)
logging.info(f"Manual alert sent for LLM opportunity: {opportunity.deal.destination}")
except Exception as e:
logging.error(f"Error sending LLM notification: {e}")
def xgb_click_handler(selected_index: gr.SelectData):
try:
row = selected_index.index[0]
if row < len(agent_framework.xgb_memory):
opportunity = agent_framework.xgb_memory[row]
agent_framework.messenger.alert(opportunity)
logging.info(f"Manual alert sent for XGBoost opportunity: {opportunity.deal.destination}")
except Exception as e:
logging.error(f"Error sending XGBoost notification: {e}")
llm_dataframe.select(fn=llm_click_handler)
xgb_dataframe.select(fn=xgb_click_handler)
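    # Poll every 5 seconds to refresh both tables, the stats line, and the streamed logs.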
gr.Timer(5).tick(
fn=update_ui,
outputs=[llm_dataframe, xgb_dataframe, log_output, stats_display]
)
if __name__ == "__main__":
ui.launch(inbrowser=True, share=False)

View File

@@ -0,0 +1,349 @@
# -*- coding: utf-8 -*-
"""week8_exercie.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1jJ4pKoJat0ZnC99sTQjEEe9BMK--ArwQ
"""
!pip install -q pandas datasets matplotlib seaborn
!pip install datasets==3.0.1
!pip install anthropic -q
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
# check model performance
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils import resample
import os
from anthropic import Anthropic
import re
pd.set_option("display.max_colwidth", 100)
# # Initialize client using environment variable
# client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
# # Quick test
# print("Anthropic client initialized " if client else " Anthropic not detected.")
from google.colab import userdata
api_key = userdata.get('ANTHROPIC_API_KEY')
os.environ["ANTHROPIC_API_KEY"] = api_key
client = Anthropic(api_key=api_key)
# List models
models = client.models.list()
print("Available Anthropic Models:\n")
for m in models.data:
print(f"- {m.id}")
# Loading a sample from the full reviews data
dataset = load_dataset("McAuley-Lab/Amazon-Reviews-2023", "raw_review_Appliances", split="full[:5000]")
# creating a DF
df = pd.DataFrame(dataset)
df = df[["title", "text", "rating"]].dropna().reset_index(drop=True)
# Renaming the columns for clarity/easy reference
df.rename(columns={"text": "review_body"}, inplace=True)
print(f"Loaded {len(df)} rows with reviews and ratings")
df.head()
# Inspect the data
# Basic info
print(df.info())
print(df.isnull().sum())
# Unique rating distribution
print(df["rating"].value_counts().sort_index())
# Check Random reviews
display(df.sample(5, random_state=42))
# Review length distribution
df["review_length"] = df["review_body"].apply(lambda x: len(str(x).split()))
#Summarize the review length
print(df["review_length"].describe())
# Plot the rating distribution
plt.figure(figsize=(6,4))
df["rating"].hist(bins=5, edgecolor='black')
plt.title("Ratings Distribution (15 stars)")
plt.xlabel("Rating")
plt.ylabel("Number of Reviews")
plt.show()
# review length
plt.figure(figsize=(6,4))
df["review_length"].hist(bins=30, color="lightblue", edgecolor='black')
plt.title("Review Length Distribution")
plt.xlabel("Number of Words in Review")
plt.ylabel("Number of Reviews")
plt.show()
#cleaning
def clean_text(text):
text = text.lower()
# remove URLs
text = re.sub(r"http\S+|www\S+|https\S+", '', text)
# remove punctuation/special chars
text = re.sub(r"[^a-z0-9\s]", '', text)
# normalize whitespace
text = re.sub(r"\s+", ' ', text).strip()
return text
df["clean_review"] = df["review_body"].apply(clean_text)
df.head(3)
"""'#sentiment analysis"""
# Rating labellings
def label_sentiment(rating):
if rating <= 2:
return "negative"
elif rating == 3:
return "neutral"
else:
return "positive"
df["sentiment"] = df["rating"].apply(label_sentiment)
df["sentiment"].value_counts()
# train/test split
X_train, X_test, y_train, y_test = train_test_split(
df["clean_review"], df["sentiment"], test_size=0.2, random_state=42, stratify=df["sentiment"]
)
print(f"Training samples: {len(X_train)}, Test samples: {len(X_test)}")
# Convert text to TF-IDF features
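# max_features caps the vocabulary at the 2,000 strongest terms; ngram_range=(1,2) keeps unigrams and bigrams.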
vectorizer = TfidfVectorizer(max_features=2000, ngram_range=(1,2))
X_train_tfidf = vectorizer.fit_transform(X_train)
X_test_tfidf = vectorizer.transform(X_test)
print(f"TF-IDF matrix shape: {X_train_tfidf.shape}")
# train classifier
# Train lightweight model
clf = LogisticRegression(max_iter=200)
clf.fit(X_train_tfidf, y_train)
y_pred = clf.predict(X_test_tfidf)
print("Classification Report:\n", classification_report(y_test, y_pred))
print("\nConfusion Matrix:\n", confusion_matrix(y_test, y_pred))
sample_texts = [
"This blender broke after two days. Waste of money!",
"Works exactly as described, very satisfied!",
"Its okay, does the job but nothing special."
]
sample_features = vectorizer.transform(sample_texts)
sample_preds = clf.predict(sample_features)
for text, pred in zip(sample_texts, sample_preds):
print(f"\nReview: {text}\nPredicted Sentiment: {pred}")
"""#Improving Model Balance & Realism"""
# Separate by sentiment
pos = df[df["sentiment"] == "positive"]
neg = df[df["sentiment"] == "negative"]
neu = df[df["sentiment"] == "neutral"]
# Undersample positive to match roughly others
pos_down = resample(pos, replace=False, n_samples=len(neg) + len(neu), random_state=42)
# Combine
df_balanced = pd.concat([pos_down, neg, neu]).sample(frac=1, random_state=42).reset_index(drop=True)
print(df_balanced["sentiment"].value_counts())
# retrain classifier
X_train, X_test, y_train, y_test = train_test_split(
df_balanced["clean_review"], df_balanced["sentiment"],
test_size=0.2, random_state=42, stratify=df_balanced["sentiment"]
)
vectorizer = TfidfVectorizer(max_features=2000, ngram_range=(1,2))
X_train_tfidf = vectorizer.fit_transform(X_train)
X_test_tfidf = vectorizer.transform(X_test)
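# class_weight="balanced" reweights classes inversely to their frequency, offsetting the skew toward positive reviews.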
clf = LogisticRegression(max_iter=300, class_weight="balanced")
clf.fit(X_train_tfidf, y_train)
print("Balanced model trained successfully ")
# evaluate again
y_pred = clf.predict(X_test_tfidf)
print("Classification Report:\n", classification_report(y_test, y_pred))
print("\nConfusion Matrix:\n", confusion_matrix(y_test, y_pred))
"""#Agents"""
# Base class for all agents
class BaseAgent:
"""A simple base agent with a name and a run() method."""
def __init__(self, name):
self.name = name
def run(self, *args, **kwargs):
raise NotImplementedError("Subclasses must implement run() method.")
def log(self, message):
print(f"[{self.name}] {message}")
#DataAgent for loading/cleaning
class DataAgent(BaseAgent):
"""Handles dataset preparation tasks."""
def __init__(self, data):
super().__init__("DataAgent")
self.data = data
def run(self):
self.log("Preprocessing data...")
df_clean = self.data.copy()
df_clean["review_body"] = df_clean["review_body"].str.strip()
df_clean.drop_duplicates(subset=["review_body"], inplace=True)
self.log(f"Dataset ready with {len(df_clean)} reviews.")
return df_clean
# AnalysisAgent --> uses the trained sentiment model (TF-IDF + Logistic Regression) to classify reviews
class AnalysisAgent(BaseAgent):
"""Analyzes text sentiment using a trained model."""
def __init__(self, vectorizer, model):
super().__init__("AnalysisAgent")
self.vectorizer = vectorizer
self.model = model
def run(self, reviews):
self.log(f"Analyzing {len(reviews)} reviews...")
X = self.vectorizer.transform(reviews)
predictions = self.model.predict(X)
return predictions
# ReviewerAgent serves as the summary agent, using the Anthropic API to generate LLM review insights
class ReviewerAgent(BaseAgent):
"""Summarizes overall sentiment trends using Anthropic Claude."""
def __init__(self):
super().__init__("ReviewerAgent")
        # Retrieve the key once; it's already stored in Colab userdata
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
from google.colab import userdata
api_key = userdata.get("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("Anthropic API key not found. Make sure it's set in Colab userdata as 'ANTHROPIC_API_KEY'.")
self.client = Anthropic(api_key=api_key)
def run(self, summary_text):
"""Generate an insights summary using Claude."""
self.log("Generating summary using Claude...")
prompt = f"""
You are a product insights assistant.
Based on the following summarized customer reviews, write a concise 3-4 sentence sentiment analysis report.
Clearly describe the main themes and tone in user feedback on these home appliance products.
Reviews Summary:
{summary_text}
"""
response = self.client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=250,
temperature=0.6,
messages=[{"role": "user", "content": prompt}]
)
output = response.content[0].text.strip()
self.log("Summary generated successfully ")
return output
# Instantiate agents
data_agent = DataAgent(df)
analysis_agent = AnalysisAgent(vectorizer, clf)
reviewer_agent = ReviewerAgent()
# Clean data
df_ready = data_agent.run()
# Classify sentiments
df_ready["predicted_sentiment"] = analysis_agent.run(df_ready["review_body"])
# Prepare summary text by sentiment group
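# Take the first three reviews from each predicted class as compact context for the LLM prompt.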
summary_text = df_ready.groupby("predicted_sentiment")["review_body"].apply(lambda x: " ".join(x[:3])).to_string()
# Generate AI summary using Anthropic
insight_summary = reviewer_agent.run(summary_text)
print(insight_summary)
"""#Evaluation & Visualization"""
# Evaluation & Visualization ===
# Count predicted sentiments
sentiment_counts = df_ready["predicted_sentiment"].value_counts()
print(sentiment_counts)
# Plot sentiment distribution
plt.figure(figsize=(6,4))
sns.barplot(x=sentiment_counts.index, y=sentiment_counts.values, palette="viridis")
plt.title("Sentiment Distribution of Reviews", fontsize=14)
plt.xlabel("Sentiment")
plt.ylabel("Number of Reviews")
plt.show()
# Compute average review length per sentiment
df_ready["review_length"] = df_ready["review_body"].apply(lambda x: len(x.split()))
avg_length = df_ready.groupby("predicted_sentiment")["review_length"].mean()
print(avg_length)
# Visualize it
plt.figure(figsize=(6,4))
sns.barplot(x=avg_length.index, y=avg_length.values, palette="coolwarm")
plt.title("Average Review Length per Sentiment")
plt.xlabel("Sentiment")
plt.ylabel("Average Word Count")
plt.show()