feat(week8): add ensemble-joshua assignment (API, Modal, agents); omit large RF model

The Top Dev
2025-10-30 00:38:42 +03:00
parent a20b9b1bce
commit 9d8fca6eb4
11 changed files with 805 additions and 0 deletions

agents/agent.py View File

@@ -0,0 +1,35 @@
import logging


class Agent:
    """
    An abstract superclass for Agents
    Used to log messages in a way that can identify each Agent
    """

    # Foreground colors
    RED = '\033[31m'
    GREEN = '\033[32m'
    YELLOW = '\033[33m'
    BLUE = '\033[34m'
    MAGENTA = '\033[35m'
    CYAN = '\033[36m'
    WHITE = '\033[37m'

    # Background color
    BG_BLACK = '\033[40m'

    # Reset code to return to default color
    RESET = '\033[0m'

    name: str = ""
    color: str = '\033[37m'

    def log(self, message):
        """
        Log this as an info message, identifying the agent
        """
        color_code = self.BG_BLACK + self.color
        message = f"[{self.name}] {message}"
        logging.info(color_code + message + self.RESET)
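For orientation, a minimal sketch of how a subclass would use this base (hypothetical agent name, not part of the commit):

import logging
from agents.agent import Agent

class DemoAgent(Agent):
    # Hypothetical subclass: set a name and a color, then call log()
    name = "Demo Agent"
    color = Agent.BLUE

logging.basicConfig(level=logging.INFO)
DemoAgent().log("hello from a color-coded, agent-prefixed log line")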

agents/deals.py View File

@@ -0,0 +1,111 @@
from pydantic import BaseModel
from typing import List, Dict, Self
from bs4 import BeautifulSoup
import re
import feedparser
from tqdm import tqdm
import requests
import time

feeds = [
    "https://www.dealnews.com/c142/Electronics/?rss=1",
    "https://www.dealnews.com/c39/Computers/?rss=1",
    "https://www.dealnews.com/c238/Automotive/?rss=1",
    "https://www.dealnews.com/f1912/Smart-Home/?rss=1",
    "https://www.dealnews.com/c196/Home-Garden/?rss=1",
]


def extract(html_snippet: str) -> str:
    """
    Use Beautiful Soup to clean up this HTML snippet and extract useful text
    """
    soup = BeautifulSoup(html_snippet, 'html.parser')
    snippet_div = soup.find('div', class_='snippet summary')
    if snippet_div:
        description = snippet_div.get_text(strip=True)
        description = BeautifulSoup(description, 'html.parser').get_text()
        description = re.sub('<[^<]+?>', '', description)
        result = description.strip()
    else:
        result = html_snippet
    return result.replace('\n', ' ')


class ScrapedDeal:
    """
    A class to represent a Deal retrieved from an RSS feed
    """
    category: str
    title: str
    summary: str
    url: str
    details: str
    features: str

    def __init__(self, entry: Dict[str, str]):
        """
        Populate this instance based on the provided dict
        """
        self.title = entry['title']
        self.summary = extract(entry['summary'])
        self.url = entry['links'][0]['href']
        stuff = requests.get(self.url).content
        soup = BeautifulSoup(stuff, 'html.parser')
        content = soup.find('div', class_='content-section').get_text()
        content = content.replace('\nmore', '').replace('\n', ' ')
        if "Features" in content:
            # Split on the first occurrence only, in case "Features" appears more than once
            self.details, self.features = content.split("Features", 1)
        else:
            self.details = content
            self.features = ""

    def __repr__(self):
        """
        Return a string to describe this deal
        """
        return f"<{self.title}>"

    def describe(self):
        """
        Return a longer string to describe this deal for use in calling a model
        """
        return f"Title: {self.title}\nDetails: {self.details.strip()}\nFeatures: {self.features.strip()}\nURL: {self.url}"

    @classmethod
    def fetch(cls, show_progress: bool = False) -> List[Self]:
        """
        Retrieve all deals from the selected RSS feeds
        """
        deals = []
        feed_iter = tqdm(feeds) if show_progress else feeds
        for feed_url in feed_iter:
            feed = feedparser.parse(feed_url)
            for entry in feed.entries[:10]:
                deals.append(cls(entry))
                time.sleep(0.5)
        return deals


class Deal(BaseModel):
    """
    A class to represent a Deal with a summary description
    """
    product_description: str
    price: float
    url: str


class DealSelection(BaseModel):
    """
    A class to represent a list of Deals
    """
    deals: List[Deal]


class Opportunity(BaseModel):
    """
    A class to represent a possible opportunity: a Deal where we estimate
    it should cost more than it's being offered for
    """
    deal: Deal
    estimate: float
    discount: float
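A brief usage sketch (requires network access to the DealNews feeds; counts and content will vary):

from agents.deals import ScrapedDeal, Deal, Opportunity

# Scrape up to 10 entries from each of the 5 feeds, showing a progress bar
scraped = ScrapedDeal.fetch(show_progress=True)
print(f"{len(scraped)} deals scraped")
print(scraped[0].describe()[:300])

# The Pydantic models carry structured results, e.g. as parsed out by the ScannerAgent
deal = Deal(product_description="Hypothetical 55-inch 4K smart TV with HDR10", price=299.99, url="https://example.com/deal")
opportunity = Opportunity(deal=deal, estimate=399.99, discount=100.0)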

agents/ensemble_agent.py View File

@@ -0,0 +1,57 @@
import pandas as pd
from sklearn.linear_model import LinearRegression
import joblib
import os

from agents.agent import Agent
from agents.specialist_agent import SpecialistAgent
from agents.frontier_agent import FrontierAgent
from agents.random_forest_agent import RandomForestAgent


class EnsembleAgent(Agent):

    name = "Ensemble Agent"
    color = Agent.YELLOW

    def __init__(self, collection):
        """
        Create an instance of Ensemble, by creating each of the models
        and loading the weights of the Ensemble
        """
        self.log("Initializing Ensemble Agent")
        self.specialist = SpecialistAgent()
        self.frontier = FrontierAgent(collection)
        self.random_forest = RandomForestAgent()
        # Resolve model path: prefer the local contribution folder copy, falling back to the week8 root
        candidate_paths = [
            os.path.join(os.path.dirname(os.path.dirname(__file__)), 'ensemble_model.pkl'),  # contribution folder copy
            os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'ensemble_model.pkl'),  # week8 root
            'ensemble_model.pkl',  # current working directory
        ]
        model_path = next((p for p in candidate_paths if os.path.exists(p)), candidate_paths[-1])
        self.model = joblib.load(model_path)
        self.log("Ensemble Agent is ready")

    def price(self, description: str) -> float:
        """
        Run this ensemble model
        Ask each of the models to price the product
        Then use the Linear Regression model to return the weighted price
        :param description: the description of a product
        :return: an estimate of its price
        """
        self.log("Running Ensemble Agent - collaborating with specialist, frontier and random forest agents")
        specialist = self.specialist.price(description)
        frontier = self.frontier.price(description)
        random_forest = self.random_forest.price(description)
        X = pd.DataFrame({
            'Specialist': [specialist],
            'Frontier': [frontier],
            'RandomForest': [random_forest],
            'Min': [min(specialist, frontier, random_forest)],
            'Max': [max(specialist, frontier, random_forest)],
        })
        y = max(0, self.model.predict(X)[0])
        self.log(f"Ensemble Agent complete - returning ${y:.2f}")
        return y
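For reference, a hedged sketch of how an ensemble_model.pkl with these five feature columns might be produced offline (the toy data below is an assumption; the real training set is not part of this commit):

import pandas as pd
import joblib
from sklearn.linear_model import LinearRegression

# Assumed toy data: per-agent predictions and true prices for a few held-out items
specialists = [120.0, 45.0, 310.0]
frontiers = [110.0, 50.0, 290.0]
forests = [100.0, 40.0, 305.0]
prices = [115.0, 48.0, 300.0]

# Same feature layout that EnsembleAgent.price builds at inference time
X = pd.DataFrame({
    'Specialist': specialists,
    'Frontier': frontiers,
    'RandomForest': forests,
    'Min': [min(t) for t in zip(specialists, frontiers, forests)],
    'Max': [max(t) for t in zip(specialists, frontiers, forests)],
})
lr = LinearRegression()
lr.fit(X, prices)
joblib.dump(lr, 'ensemble_model.pkl')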

agents/messaging_agent.py View File

@@ -0,0 +1,80 @@
import os
# from twilio.rest import Client
from agents.deals import Opportunity
import http.client
import urllib.parse

from agents.agent import Agent

# Uncomment the Twilio lines if you wish to use Twilio
DO_TEXT = False
DO_PUSH = True


class MessagingAgent(Agent):

    name = "Messaging Agent"
    color = Agent.WHITE

    def __init__(self):
        """
        Set up this object to either do push notifications via Pushover,
        or SMS via Twilio,
        whichever is specified in the constants
        """
        self.log("Messaging Agent is initializing")
        if DO_TEXT:
            account_sid = os.getenv('TWILIO_ACCOUNT_SID', 'your-sid-if-not-using-env')
            auth_token = os.getenv('TWILIO_AUTH_TOKEN', 'your-auth-if-not-using-env')
            self.me_from = os.getenv('TWILIO_FROM', 'your-phone-number-if-not-using-env')
            self.me_to = os.getenv('MY_PHONE_NUMBER', 'your-phone-number-if-not-using-env')
            # self.client = Client(account_sid, auth_token)
            self.log("Messaging Agent has initialized Twilio")
        if DO_PUSH:
            self.pushover_user = os.getenv('PUSHOVER_USER', 'your-pushover-user-if-not-using-env')
            self.pushover_token = os.getenv('PUSHOVER_TOKEN', 'your-pushover-token-if-not-using-env')
            self.log("Messaging Agent has initialized Pushover")

    def message(self, text):
        """
        Send an SMS message using the Twilio API
        (requires the Twilio import and Client lines above to be uncommented)
        """
        self.log("Messaging Agent is sending a text message")
        message = self.client.messages.create(
            from_=self.me_from,
            body=text,
            to=self.me_to
        )

    def push(self, text):
        """
        Send a Push Notification using the Pushover API
        """
        self.log("Messaging Agent is sending a push notification")
        conn = http.client.HTTPSConnection("api.pushover.net:443")
        conn.request("POST", "/1/messages.json",
                     urllib.parse.urlencode({
                         "token": self.pushover_token,
                         "user": self.pushover_user,
                         "message": text,
                         "sound": "cashregister"
                     }), {"Content-type": "application/x-www-form-urlencoded"})
        conn.getresponse()

    def alert(self, opportunity: Opportunity):
        """
        Make an alert about the specified Opportunity
        """
        text = f"Deal Alert! Price=${opportunity.deal.price:.2f}, "
        text += f"Estimate=${opportunity.estimate:.2f}, "
        text += f"Discount=${opportunity.discount:.2f} :"
        text += opportunity.deal.product_description[:10] + '... '
        text += opportunity.deal.url
        if DO_TEXT:
            self.message(text)
        if DO_PUSH:
            self.push(text)
        self.log("Messaging Agent has completed")

agents/planning_agent.py View File

@@ -0,0 +1,58 @@
from typing import Optional, List

from agents.agent import Agent
from agents.deals import ScrapedDeal, DealSelection, Deal, Opportunity
from agents.scanner_agent import ScannerAgent
from agents.ensemble_agent import EnsembleAgent
from agents.messaging_agent import MessagingAgent


class PlanningAgent(Agent):

    name = "Planning Agent"
    color = Agent.GREEN
    DEAL_THRESHOLD = 50

    def __init__(self, collection):
        """
        Create instances of the 3 Agents that this planner coordinates across
        """
        self.log("Planning Agent is initializing")
        self.scanner = ScannerAgent()
        self.ensemble = EnsembleAgent(collection)
        self.messenger = MessagingAgent()
        self.log("Planning Agent is ready")

    def run(self, deal: Deal) -> Opportunity:
        """
        Run the workflow for a particular deal
        :param deal: the deal, summarized from an RSS scrape
        :returns: an opportunity including the discount
        """
        self.log("Planning Agent is pricing up a potential deal")
        estimate = self.ensemble.price(deal.product_description)
        discount = estimate - deal.price
        self.log(f"Planning Agent has processed a deal with discount ${discount:.2f}")
        return Opportunity(deal=deal, estimate=estimate, discount=discount)

    def plan(self, memory: List[Opportunity] = []) -> Optional[Opportunity]:
        """
        Run the full workflow:
        1. Use the ScannerAgent to find deals from RSS feeds
        2. Use the EnsembleAgent to estimate them
        3. Use the MessagingAgent to send a notification of deals
        :param memory: a list of Opportunity objects that have been surfaced in the past
        :return: an Opportunity if one was surfaced, otherwise None
        """
        self.log("Planning Agent is kicking off a run")
        selection = self.scanner.scan(memory=memory)
        if selection:
            opportunities = [self.run(deal) for deal in selection.deals[:5]]
            opportunities.sort(key=lambda opp: opp.discount, reverse=True)
            best = opportunities[0]
            self.log(f"Planning Agent has identified the best deal has discount ${best.discount:.2f}")
            if best.discount > self.DEAL_THRESHOLD:
                self.messenger.alert(best)
            self.log("Planning Agent has completed a run")
            return best if best.discount > self.DEAL_THRESHOLD else None
        return None
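A sketch of driving a planning run directly; the Chroma collection of product embeddings used by the FrontierAgent is assumed here (path and collection name are guesses):

import chromadb
from agents.planning_agent import PlanningAgent

client = chromadb.PersistentClient(path="products_vectorstore")  # assumed vectorstore path
collection = client.get_or_create_collection('products')         # assumed collection name

planner = PlanningAgent(collection)
opportunity = planner.plan(memory=[])  # no previously surfaced opportunities
if opportunity:
    print(f"Best discount ${opportunity.discount:.2f}: {opportunity.deal.url}")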

agents/random_forest_agent.py View File

@@ -0,0 +1,46 @@
# imports
import os
import re
from typing import List

from sentence_transformers import SentenceTransformer
import joblib

from agents.agent import Agent


class RandomForestAgent(Agent):

    name = "Random Forest Agent"
    color = Agent.MAGENTA

    def __init__(self):
        """
        Initialize this object by loading in the saved model weights
        and the SentenceTransformer vector encoding model
        """
        self.log("Random Forest Agent is initializing")
        self.vectorizer = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
        # Resolve model path: prefer the local contribution folder copy, falling back to the week8 root
        candidate_paths = [
            os.path.join(os.path.dirname(os.path.dirname(__file__)), 'random_forest_model.pkl'),  # contribution folder copy
            os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'random_forest_model.pkl'),  # week8 root
            'random_forest_model.pkl',  # current working directory
        ]
        model_path = next((p for p in candidate_paths if os.path.exists(p)), candidate_paths[-1])
        self.model = joblib.load(model_path)
        self.log("Random Forest Agent is ready")

    def price(self, description: str) -> float:
        """
        Use a Random Forest model to estimate the price of the described item
        :param description: the product to be estimated
        :return: the price as a float
        """
        self.log("Random Forest Agent is starting a prediction")
        vector = self.vectorizer.encode([description])
        result = max(0, self.model.predict(vector)[0])
        self.log(f"Random Forest Agent completed - predicting ${result:.2f}")
        return result
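A quick usage sketch, assuming random_forest_model.pkl is present in one of the candidate locations (the product description is made up):

from agents.random_forest_agent import RandomForestAgent

agent = RandomForestAgent()
print(f"${agent.price('Hypothetical USB condenser microphone with boom arm'):.2f}")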

agents/scanner_agent.py View File

@@ -0,0 +1,95 @@
import os
import json
from typing import Optional, List
from openai import OpenAI

from agents.deals import ScrapedDeal, DealSelection, Opportunity
from agents.agent import Agent


class ScannerAgent(Agent):

    MODEL = "gpt-4o-mini"

    SYSTEM_PROMPT = """You identify and summarize the 5 most detailed deals from a list, by selecting deals that have the most detailed, high quality description and the most clear price.
Respond strictly in JSON with no explanation, using this format. You should provide the price as a number derived from the description. If the price of a deal isn't clear, do not include that deal in your response.
Most important is that you respond with the 5 deals that have the most detailed product description with price. It's not important to mention the terms of the deal; most important is a thorough description of the product.
Be careful with products that are described as "$XXX off" or "reduced by $XXX" - this isn't the actual price of the product. Only respond with products when you are highly confident about the price.

{"deals": [
    {
        "product_description": "Your clearly expressed summary of the product in 4-5 sentences. Details of the item are much more important than why it's a good deal. Avoid mentioning discounts and coupons; focus on the item itself. There should be a paragraph of text for each item you choose.",
        "price": 99.99,
        "url": "the url as provided"
    },
    ...
]}"""

    USER_PROMPT_PREFIX = """Respond with the most promising 5 deals from this list, selecting those which have the most detailed, high quality product description and a clear price that is greater than 0.
Respond strictly in JSON, and only JSON. You should rephrase the description to be a summary of the product itself, not the terms of the deal.
Remember to respond with a paragraph of text in the product_description field for each of the 5 items that you select.
Be careful with products that are described as "$XXX off" or "reduced by $XXX" - this isn't the actual price of the product. Only respond with products when you are highly confident about the price.

Deals:
"""

    USER_PROMPT_SUFFIX = "\n\nStrictly respond in JSON and include exactly 5 deals, no more."

    name = "Scanner Agent"
    color = Agent.CYAN

    def __init__(self):
        """
        Set up this instance by initializing OpenAI
        """
        self.log("Scanner Agent is initializing")
        self.openai = OpenAI()
        self.log("Scanner Agent is ready")

    def fetch_deals(self, memory: List[Opportunity]) -> List[ScrapedDeal]:
        """
        Look up deals published on RSS feeds
        Return any new deals that are not already in the memory provided
        """
        self.log("Scanner Agent is about to fetch deals from RSS feed")
        urls = [opp.deal.url for opp in memory]
        scraped = ScrapedDeal.fetch()
        result = [scrape for scrape in scraped if scrape.url not in urls]
        self.log(f"Scanner Agent received {len(result)} deals not already scraped")
        return result

    def make_user_prompt(self, scraped) -> str:
        """
        Create a user prompt for OpenAI based on the scraped deals provided
        """
        user_prompt = self.USER_PROMPT_PREFIX
        user_prompt += '\n\n'.join([scrape.describe() for scrape in scraped])
        user_prompt += self.USER_PROMPT_SUFFIX
        return user_prompt

    def scan(self, memory: List[Opportunity] = []) -> Optional[DealSelection]:
        """
        Call OpenAI to provide a high potential list of deals with good descriptions and prices
        Use Structured Outputs to ensure it conforms to our specifications
        :param memory: a list of Opportunity objects for deals already raised
        :return: a selection of good deals, or None if there aren't any
        """
        scraped = self.fetch_deals(memory)
        if scraped:
            user_prompt = self.make_user_prompt(scraped)
            self.log("Scanner Agent is calling OpenAI using Structured Output")
            result = self.openai.beta.chat.completions.parse(
                model=self.MODEL,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt}
                ],
                response_format=DealSelection
            )
            result = result.choices[0].message.parsed
            result.deals = [deal for deal in result.deals if deal.price > 0]
            self.log(f"Scanner Agent received {len(result.deals)} selected deals with price>0 from OpenAI")
            return result
        return None
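A sketch of a standalone scan, assuming OPENAI_API_KEY is set; the memory list would normally hold previously surfaced Opportunity objects:

from agents.scanner_agent import ScannerAgent

scanner = ScannerAgent()
selection = scanner.scan(memory=[])
if selection:
    for deal in selection.deals:
        print(f"${deal.price:.2f}  {deal.product_description[:60]}...")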