Umar - Bootcamp
This commit is contained in:
33
week8/community_contributions/w8d5/agents/agent.py
Normal file
33
week8/community_contributions/w8d5/agents/agent.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import logging
|
||||
|
||||
class Agent:
|
||||
"""
|
||||
An abstract superclass for Agents
|
||||
Used to log messages in a way that can identify each Agent
|
||||
"""
|
||||
|
||||
# Foreground colors
|
||||
RED = '\033[31m'
|
||||
GREEN = '\033[32m'
|
||||
YELLOW = '\033[33m'
|
||||
BLUE = '\033[34m'
|
||||
MAGENTA = '\033[35m'
|
||||
CYAN = '\033[36m'
|
||||
WHITE = '\033[37m'
|
||||
|
||||
# Background color
|
||||
BG_BLACK = '\033[40m'
|
||||
|
||||
# Reset code to return to default color
|
||||
RESET = '\033[0m'
|
||||
|
||||
name: str = ""
|
||||
color: str = '\033[37m'
|
||||
|
||||
def log(self, message):
|
||||
"""
|
||||
Log this as an info message, identifying the agent
|
||||
"""
|
||||
color_code = self.BG_BLACK + self.color
|
||||
message = f"[{self.name}] {message}"
|
||||
logging.info(color_code + message + self.RESET)
|
||||
@@ -0,0 +1,75 @@
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from typing import List, Dict
|
||||
from openai import OpenAI
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||
if w8d5_path not in sys.path:
|
||||
sys.path.insert(0, w8d5_path)
|
||||
|
||||
from agents.agent import Agent
|
||||
|
||||
|
||||
class TravelEstimatorAgent(Agent):
|
||||
|
||||
name = "Travel Estimator"
|
||||
color = Agent.BLUE
|
||||
|
||||
MODEL = "gpt-4o-mini"
|
||||
|
||||
def __init__(self, collection):
|
||||
self.log("Travel Estimator initializing")
|
||||
self.client = OpenAI()
|
||||
self.MODEL = "gpt-4o-mini"
|
||||
self.log("Travel Estimator using OpenAI")
|
||||
self.collection = collection
|
||||
self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
|
||||
self.log("Travel Estimator ready")
|
||||
|
||||
def make_context(self, similars: List[str], prices: List[float]) -> str:
|
||||
message = "Here are similar travel deals for context:\n\n"
|
||||
for similar, price in zip(similars, prices):
|
||||
message += f"Similar deal:\n{similar}\nPrice: ${price:.2f}\n\n"
|
||||
return message
|
||||
|
||||
def messages_for(self, description: str, similars: List[str], prices: List[float]) -> List[Dict[str, str]]:
|
||||
system_message = "You estimate fair market prices for travel deals. Reply only with the price estimate, no explanation"
|
||||
user_prompt = self.make_context(similars, prices)
|
||||
user_prompt += "Now estimate the fair market price for:\n\n"
|
||||
user_prompt += description
|
||||
return [
|
||||
{"role": "system", "content": system_message},
|
||||
{"role": "user", "content": user_prompt},
|
||||
{"role": "assistant", "content": "Fair price estimate: $"}
|
||||
]
|
||||
|
||||
def find_similars(self, description: str):
|
||||
self.log("Travel Estimator searching for similar deals")
|
||||
vector = self.model.encode([description])
|
||||
results = self.collection.query(query_embeddings=vector.astype(float).tolist(), n_results=5)
|
||||
documents = results['documents'][0][:]
|
||||
prices = [m['price'] for m in results['metadatas'][0][:]]
|
||||
self.log("Travel Estimator found similar deals")
|
||||
return documents, prices
|
||||
|
||||
def get_price(self, s) -> float:
|
||||
s = s.replace('$','').replace(',','')
|
||||
match = re.search(r"[-+]?\d*\.\d+|\d+", s)
|
||||
return float(match.group()) if match else 0.0
|
||||
|
||||
def estimate(self, description: str) -> float:
|
||||
documents, prices = self.find_similars(description)
|
||||
self.log(f"Travel Estimator calling {self.MODEL}")
|
||||
response = self.client.chat.completions.create(
|
||||
model=self.MODEL,
|
||||
messages=self.messages_for(description, documents, prices),
|
||||
seed=42,
|
||||
max_tokens=10
|
||||
)
|
||||
reply = response.choices[0].message.content
|
||||
result = self.get_price(reply)
|
||||
self.log(f"Travel Estimator complete - ${result:.2f}")
|
||||
return result
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
import os
|
||||
import sys
|
||||
import http.client
|
||||
import urllib
|
||||
|
||||
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||
if w8d5_path not in sys.path:
|
||||
sys.path.insert(0, w8d5_path)
|
||||
|
||||
from agents.agent import Agent
|
||||
from helpers.travel_deals import TravelOpportunity
|
||||
|
||||
DO_PUSH = True
|
||||
|
||||
class TravelMessagingAgent(Agent):
|
||||
|
||||
name = "Travel Messenger"
|
||||
color = Agent.WHITE
|
||||
|
||||
def __init__(self):
|
||||
self.log("Travel Messenger initializing")
|
||||
if DO_PUSH:
|
||||
self.pushover_user = os.getenv('PUSHOVER_USER', 'your-pushover-user-if-not-using-env')
|
||||
self.pushover_token = os.getenv('PUSHOVER_TOKEN', 'your-pushover-token-if-not-using-env')
|
||||
self.log("Travel Messenger has initialized Pushover")
|
||||
|
||||
def push(self, text):
|
||||
self.log("Travel Messenger sending push notification")
|
||||
conn = http.client.HTTPSConnection("api.pushover.net:443")
|
||||
conn.request("POST", "/1/messages.json",
|
||||
urllib.parse.urlencode({
|
||||
"token": self.pushover_token,
|
||||
"user": self.pushover_user,
|
||||
"message": text,
|
||||
"sound": "cashregister"
|
||||
}), { "Content-type": "application/x-www-form-urlencoded" })
|
||||
conn.getresponse()
|
||||
|
||||
def alert(self, opportunity: TravelOpportunity):
|
||||
text = f"Travel Deal! {opportunity.deal.destination} - "
|
||||
text += f"Price=${opportunity.deal.price:.2f}, "
|
||||
text += f"Est=${opportunity.estimate:.2f}, "
|
||||
text += f"Save ${opportunity.discount:.2f}! "
|
||||
text += opportunity.deal.url
|
||||
if DO_PUSH:
|
||||
self.push(text)
|
||||
self.log("Travel Messenger completed")
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional, List
|
||||
|
||||
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||
if w8d5_path not in sys.path:
|
||||
sys.path.insert(0, w8d5_path)
|
||||
|
||||
from agents.agent import Agent
|
||||
from helpers.travel_deals import TravelDeal, TravelOpportunity
|
||||
from agents.travel_scanner_agent import TravelScannerAgent
|
||||
from agents.travel_estimator_agent import TravelEstimatorAgent
|
||||
from agents.travel_messaging_agent import TravelMessagingAgent
|
||||
|
||||
|
||||
class TravelPlanningAgent(Agent):
|
||||
|
||||
name = "Travel Planner"
|
||||
color = Agent.GREEN
|
||||
DEAL_THRESHOLD = 50
|
||||
|
||||
def __init__(self, collection):
|
||||
self.log("Travel Planner initializing")
|
||||
self.scanner = TravelScannerAgent()
|
||||
self.estimator = TravelEstimatorAgent(collection)
|
||||
self.messenger = TravelMessagingAgent()
|
||||
self.log("Travel Planner ready")
|
||||
|
||||
def evaluate(self, deal: TravelDeal) -> TravelOpportunity:
|
||||
self.log(f"Travel Planner evaluating {deal.destination}")
|
||||
estimate = self.estimator.estimate(deal.description)
|
||||
discount = estimate - deal.price
|
||||
self.log(f"Travel Planner found discount ${discount:.2f}")
|
||||
return TravelOpportunity(deal=deal, estimate=estimate, discount=discount)
|
||||
|
||||
def plan(self, memory: List[str] = []) -> Optional[List[TravelOpportunity]]:
|
||||
self.log("Travel Planner starting run")
|
||||
selection = self.scanner.scan(memory=memory)
|
||||
if selection and selection.deals:
|
||||
opportunities = [self.evaluate(deal) for deal in selection.deals[:5]]
|
||||
if not opportunities:
|
||||
self.log("Travel Planner found no valid opportunities")
|
||||
return None
|
||||
opportunities.sort(key=lambda opp: opp.discount, reverse=True)
|
||||
good_deals = [opp for opp in opportunities if opp.discount > self.DEAL_THRESHOLD]
|
||||
if good_deals:
|
||||
best = good_deals[0]
|
||||
self.log(f"Travel Planner found {len(good_deals)} deals above threshold, best: ${best.discount:.2f} off")
|
||||
self.messenger.alert(best)
|
||||
self.log("Travel Planner completed")
|
||||
return good_deals
|
||||
else:
|
||||
self.log(f"Travel Planner completed - no deals above ${self.DEAL_THRESHOLD} threshold")
|
||||
return None
|
||||
self.log("Travel Planner found no deals to evaluate")
|
||||
return None
|
||||
|
||||
@@ -0,0 +1,87 @@
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional, List
|
||||
from openai import OpenAI
|
||||
|
||||
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||
if w8d5_path not in sys.path:
|
||||
sys.path.insert(0, w8d5_path)
|
||||
|
||||
from agents.agent import Agent
|
||||
from helpers.travel_deals import ScrapedTravelDeal, TravelDealSelection
|
||||
|
||||
|
||||
class TravelScannerAgent(Agent):
|
||||
|
||||
MODEL = "gpt-4o-mini"
|
||||
|
||||
SYSTEM_PROMPT = """You identify and summarize the 5 most promising travel deals from a list.
|
||||
Focus on deals with destinations, deal types (flight/hotel/package), and detailed descriptions.
|
||||
If price is mentioned, extract it. If no specific price is given but there's a discount mentioned (e.g. "30% off"), estimate a reasonable price.
|
||||
If absolutely no pricing information exists, use a placeholder price of 500.
|
||||
Respond strictly in JSON with no explanation.
|
||||
|
||||
{"deals": [
|
||||
{
|
||||
"destination": "City or Country name",
|
||||
"deal_type": "Flight, Hotel, or Package",
|
||||
"description": "4-5 sentences describing the travel deal, dates, what's included, and key highlights",
|
||||
"price": 499.99,
|
||||
"url": "the url as provided"
|
||||
},
|
||||
...
|
||||
]}"""
|
||||
|
||||
USER_PROMPT_PREFIX = """Respond with the 5 most promising travel deals with destinations, types, and descriptions.
|
||||
Respond strictly in JSON. Provide detailed descriptions focusing on what travelers get.
|
||||
Extract the destination and deal type (Flight/Hotel/Package) from the title and description.
|
||||
For pricing: extract exact prices if available, estimate from percentage discounts, or use 500 as placeholder.
|
||||
|
||||
Travel Deals:
|
||||
|
||||
"""
|
||||
|
||||
USER_PROMPT_SUFFIX = "\n\nStrictly respond in JSON with exactly 5 deals."
|
||||
|
||||
name = "Travel Scanner"
|
||||
color = Agent.CYAN
|
||||
|
||||
def __init__(self):
|
||||
self.log("Travel Scanner is initializing")
|
||||
self.openai = OpenAI()
|
||||
self.log("Travel Scanner is ready")
|
||||
|
||||
def fetch_deals(self, memory) -> List[ScrapedTravelDeal]:
|
||||
self.log("Travel Scanner fetching deals from RSS feeds")
|
||||
urls = [opp.deal.url for opp in memory]
|
||||
scraped = ScrapedTravelDeal.fetch()
|
||||
result = [scrape for scrape in scraped if scrape.url not in urls]
|
||||
self.log(f"Travel Scanner found {len(result)} new deals")
|
||||
return result
|
||||
|
||||
def make_user_prompt(self, scraped) -> str:
|
||||
user_prompt = self.USER_PROMPT_PREFIX
|
||||
user_prompt += '\n\n'.join([scrape.describe() for scrape in scraped])
|
||||
user_prompt += self.USER_PROMPT_SUFFIX
|
||||
return user_prompt
|
||||
|
||||
def scan(self, memory: List[str]=[]) -> Optional[TravelDealSelection]:
|
||||
scraped = self.fetch_deals(memory)
|
||||
if scraped:
|
||||
user_prompt = self.make_user_prompt(scraped)
|
||||
self.log("Travel Scanner calling OpenAI")
|
||||
result = self.openai.beta.chat.completions.parse(
|
||||
model=self.MODEL,
|
||||
messages=[
|
||||
{"role": "system", "content": self.SYSTEM_PROMPT},
|
||||
{"role": "user", "content": user_prompt}
|
||||
],
|
||||
response_format=TravelDealSelection
|
||||
)
|
||||
result = result.choices[0].message.parsed
|
||||
valid_deals = [deal for deal in result.deals if deal.price > 0]
|
||||
result.deals = valid_deals
|
||||
self.log(f"Travel Scanner received {len(result.deals)} valid deals")
|
||||
return result if result.deals else None
|
||||
return None
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
import os
|
||||
import sys
|
||||
import numpy as np
|
||||
import joblib
|
||||
from sentence_transformers import SentenceTransformer
|
||||
import xgboost as xgb
|
||||
|
||||
w8d5_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||
if w8d5_path not in sys.path:
|
||||
sys.path.insert(0, w8d5_path)
|
||||
|
||||
from agents.agent import Agent
|
||||
|
||||
|
||||
class TravelXGBoostAgent(Agent):
|
||||
|
||||
name = "XGBoost Estimator"
|
||||
color = Agent.GREEN
|
||||
|
||||
def __init__(self, collection):
|
||||
self.log("XGBoost Estimator initializing")
|
||||
self.collection = collection
|
||||
self.model_path = os.path.join(w8d5_path, 'helpers', 'travel_xgboost_model.pkl')
|
||||
self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
|
||||
|
||||
if os.path.exists(self.model_path):
|
||||
self.log("Loading existing XGBoost model")
|
||||
self.model = joblib.load(self.model_path)
|
||||
else:
|
||||
self.log("Training new XGBoost model")
|
||||
self.model = self._train_model()
|
||||
joblib.dump(self.model, self.model_path)
|
||||
self.log(f"XGBoost model saved to {self.model_path}")
|
||||
|
||||
self.log("XGBoost Estimator ready")
|
||||
|
||||
def _train_model(self):
|
||||
self.log("Fetching training data from ChromaDB")
|
||||
result = self.collection.get(include=['embeddings', 'metadatas'])
|
||||
|
||||
X = np.array(result['embeddings'])
|
||||
y = np.array([m['price'] for m in result['metadatas']])
|
||||
|
||||
self.log(f"Training on {len(X)} samples")
|
||||
|
||||
model = xgb.XGBRegressor(
|
||||
n_estimators=100,
|
||||
max_depth=6,
|
||||
learning_rate=0.1,
|
||||
subsample=0.8,
|
||||
colsample_bytree=0.8,
|
||||
random_state=42,
|
||||
n_jobs=-1
|
||||
)
|
||||
|
||||
model.fit(X, y)
|
||||
self.log("XGBoost training complete")
|
||||
|
||||
return model
|
||||
|
||||
def estimate(self, description: str) -> float:
|
||||
self.log(f"XGBoost estimating price for: {description[:50]}...")
|
||||
|
||||
embedding = self.embedder.encode([description])[0]
|
||||
embedding_2d = embedding.reshape(1, -1)
|
||||
|
||||
prediction = self.model.predict(embedding_2d)[0]
|
||||
|
||||
prediction = max(0, prediction)
|
||||
|
||||
self.log(f"XGBoost estimate: ${prediction:.2f}")
|
||||
return float(prediction)
|
||||
|
||||
Reference in New Issue
Block a user