Merge pull request #873 from TheTopDeveloper/community-contributions-branch
Add Week 6 finetuning solution with pickle data and enhanced modules - Joshua Oluoch (Gen AI Bootcamp)
week8/community_contributions/ensemble-joshua/agents/agent.py
@@ -0,0 +1,35 @@
import logging


class Agent:
    """
    An abstract superclass for Agents
    Used to log messages in a way that can identify each Agent
    """

    # Foreground colors
    RED = '\033[31m'
    GREEN = '\033[32m'
    YELLOW = '\033[33m'
    BLUE = '\033[34m'
    MAGENTA = '\033[35m'
    CYAN = '\033[36m'
    WHITE = '\033[37m'

    # Background color
    BG_BLACK = '\033[40m'

    # Reset code to return to default color
    RESET = '\033[0m'

    name: str = ""
    color: str = '\033[37m'

    def log(self, message):
        """
        Log this as an info message, identifying the agent
        """
        color_code = self.BG_BLACK + self.color
        message = f"[{self.name}] {message}"
        logging.info(color_code + message + self.RESET)
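A minimal usage sketch (not part of the PR): a subclass only needs to set `name` and `color`, and the inherited `log` prefixes and colours each message. The `DemoAgent` name and the logging configuration below are assumptions for illustration.

import logging
from agents.agent import Agent

logging.basicConfig(level=logging.INFO, format="%(message)s")

class DemoAgent(Agent):
    name = "Demo Agent"
    color = Agent.CYAN

DemoAgent().log("hello")   # emits "[Demo Agent] hello" in cyan on a black background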
week8/community_contributions/ensemble-joshua/agents/deals.py
@@ -0,0 +1,111 @@
from pydantic import BaseModel
from typing import List, Dict, Self
from bs4 import BeautifulSoup
import re
import feedparser
from tqdm import tqdm
import requests
import time

feeds = [
    "https://www.dealnews.com/c142/Electronics/?rss=1",
    "https://www.dealnews.com/c39/Computers/?rss=1",
    "https://www.dealnews.com/c238/Automotive/?rss=1",
    "https://www.dealnews.com/f1912/Smart-Home/?rss=1",
    "https://www.dealnews.com/c196/Home-Garden/?rss=1",
]


def extract(html_snippet: str) -> str:
    """
    Use Beautiful Soup to clean up this HTML snippet and extract useful text
    """
    soup = BeautifulSoup(html_snippet, 'html.parser')
    snippet_div = soup.find('div', class_='snippet summary')

    if snippet_div:
        description = snippet_div.get_text(strip=True)
        description = BeautifulSoup(description, 'html.parser').get_text()
        description = re.sub('<[^<]+?>', '', description)
        result = description.strip()
    else:
        result = html_snippet
    return result.replace('\n', ' ')


class ScrapedDeal:
    """
    A class to represent a Deal retrieved from an RSS feed
    """
    category: str
    title: str
    summary: str
    url: str
    details: str
    features: str

    def __init__(self, entry: Dict[str, str]):
        """
        Populate this instance based on the provided dict
        """
        self.title = entry['title']
        self.summary = extract(entry['summary'])
        self.url = entry['links'][0]['href']
        stuff = requests.get(self.url).content
        soup = BeautifulSoup(stuff, 'html.parser')
        content = soup.find('div', class_='content-section').get_text()
        content = content.replace('\nmore', '').replace('\n', ' ')
        if "Features" in content:
            self.details, self.features = content.split("Features")
        else:
            self.details = content
            self.features = ""

    def __repr__(self):
        """
        Return a string to describe this deal
        """
        return f"<{self.title}>"

    def describe(self):
        """
        Return a longer string to describe this deal for use in calling a model
        """
        return f"Title: {self.title}\nDetails: {self.details.strip()}\nFeatures: {self.features.strip()}\nURL: {self.url}"

    @classmethod
    def fetch(cls, show_progress: bool = False) -> List[Self]:
        """
        Retrieve all deals from the selected RSS feeds
        """
        deals = []
        feed_iter = tqdm(feeds) if show_progress else feeds
        for feed_url in feed_iter:
            feed = feedparser.parse(feed_url)
            for entry in feed.entries[:10]:
                deals.append(cls(entry))
                time.sleep(0.5)
        return deals


class Deal(BaseModel):
    """
    A class to represent a Deal with a summary description
    """
    product_description: str
    price: float
    url: str


class DealSelection(BaseModel):
    """
    A class to represent a list of Deals
    """
    deals: List[Deal]


class Opportunity(BaseModel):
    """
    A class to represent a possible opportunity: a Deal where we estimate
    it should cost more than it's being offered
    """
    deal: Deal
    estimate: float
    discount: float
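`ScrapedDeal.fetch` makes a network request per entry (up to 10 per feed, with a half-second pause each), so a full run takes a couple of minutes. A quick usage sketch, not part of the PR:

from agents.deals import ScrapedDeal

deals = ScrapedDeal.fetch(show_progress=True)   # scrapes the 5 feeds above
print(f"Scraped {len(deals)} deals")
print(deals[0].describe())                      # the Title/Details/Features/URL block passed to the LLM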
week8/community_contributions/ensemble-joshua/agents/ensemble_agent.py
@@ -0,0 +1,57 @@
import pandas as pd
from sklearn.linear_model import LinearRegression
import joblib
import os

from agents.agent import Agent
from agents.specialist_agent import SpecialistAgent
from agents.frontier_agent import FrontierAgent
from agents.random_forest_agent import RandomForestAgent


class EnsembleAgent(Agent):

    name = "Ensemble Agent"
    color = Agent.YELLOW

    def __init__(self, collection):
        """
        Create an instance of Ensemble, by creating each of the models
        and loading the weights of the Ensemble
        """
        self.log("Initializing Ensemble Agent")
        self.specialist = SpecialistAgent()
        self.frontier = FrontierAgent(collection)
        self.random_forest = RandomForestAgent()
        # Resolve model path: prefer local contribution folder copy, fallback to week8 root
        candidate_paths = [
            os.path.join(os.path.dirname(os.path.dirname(__file__)), 'ensemble_model.pkl'),  # ../../ensemble_model.pkl
            os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'ensemble_model.pkl'),  # ../../../ensemble_model.pkl (week8 root)
            'ensemble_model.pkl',
        ]
        model_path = next((p for p in candidate_paths if os.path.exists(p)), candidate_paths[-1])
        self.model = joblib.load(model_path)
        self.log("Ensemble Agent is ready")

    def price(self, description: str) -> float:
        """
        Run this ensemble model
        Ask each of the models to price the product
        Then use the Linear Regression model to return the weighted price
        :param description: the description of a product
        :return: an estimate of its price
        """
        self.log("Running Ensemble Agent - collaborating with specialist, frontier and random forest agents")
        specialist = self.specialist.price(description)
        frontier = self.frontier.price(description)
        random_forest = self.random_forest.price(description)
        X = pd.DataFrame({
            'Specialist': [specialist],
            'Frontier': [frontier],
            'RandomForest': [random_forest],
            'Min': [min(specialist, frontier, random_forest)],
            'Max': [max(specialist, frontier, random_forest)],
        })
        y = max(0, self.model.predict(X)[0])
        self.log(f"Ensemble Agent complete - returning ${y:.2f}")
        return y
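The linear-regression weights come from the bundled `ensemble_model.pkl`. For reference, a compatible pickle could be produced along these lines (not part of the PR); the toy numbers are made up, and only the five column names must match what `EnsembleAgent.price` builds:

import joblib
import pandas as pd
from sklearn.linear_model import LinearRegression

# Toy stand-ins for per-agent predictions and ground-truth prices on a validation set
specialist_preds = [120.0, 45.0, 310.0]
frontier_preds = [110.0, 50.0, 295.0]
rf_preds = [130.0, 40.0, 305.0]
true_prices = [115.0, 48.0, 300.0]

X = pd.DataFrame({
    "Specialist": specialist_preds,
    "Frontier": frontier_preds,
    "RandomForest": rf_preds,
})
X["Min"] = X.min(axis=1)
X["Max"] = X.max(axis=1)

joblib.dump(LinearRegression().fit(X, true_prices), "ensemble_model.pkl")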
week8/community_contributions/ensemble-joshua/agents/messaging_agent.py
@@ -0,0 +1,80 @@
import os
# from twilio.rest import Client
from agents.deals import Opportunity
import http.client
import urllib.parse
from agents.agent import Agent

# Uncomment the Twilio lines if you wish to use Twilio

DO_TEXT = False
DO_PUSH = True


class MessagingAgent(Agent):

    name = "Messaging Agent"
    color = Agent.WHITE

    def __init__(self):
        """
        Set up this object to either do push notifications via Pushover,
        or SMS via Twilio,
        whichever is specified in the constants
        """
        self.log("Messaging Agent is initializing")
        if DO_TEXT:
            account_sid = os.getenv('TWILIO_ACCOUNT_SID', 'your-sid-if-not-using-env')
            auth_token = os.getenv('TWILIO_AUTH_TOKEN', 'your-auth-if-not-using-env')
            self.me_from = os.getenv('TWILIO_FROM', 'your-phone-number-if-not-using-env')
            self.me_to = os.getenv('MY_PHONE_NUMBER', 'your-phone-number-if-not-using-env')
            # self.client = Client(account_sid, auth_token)
            self.log("Messaging Agent has initialized Twilio")
        if DO_PUSH:
            self.pushover_user = os.getenv('PUSHOVER_USER', 'your-pushover-user-if-not-using-env')
            self.pushover_token = os.getenv('PUSHOVER_TOKEN', 'your-pushover-token-if-not-using-env')
            self.log("Messaging Agent has initialized Pushover")

    def message(self, text):
        """
        Send an SMS message using the Twilio API
        """
        self.log("Messaging Agent is sending a text message")
        message = self.client.messages.create(
            from_=self.me_from,
            body=text,
            to=self.me_to
        )

    def push(self, text):
        """
        Send a Push Notification using the Pushover API
        """
        self.log("Messaging Agent is sending a push notification")
        conn = http.client.HTTPSConnection("api.pushover.net:443")
        conn.request("POST", "/1/messages.json",
                     urllib.parse.urlencode({
                         "token": self.pushover_token,
                         "user": self.pushover_user,
                         "message": text,
                         "sound": "cashregister"
                     }), {"Content-type": "application/x-www-form-urlencoded"})
        conn.getresponse()

    def alert(self, opportunity: Opportunity):
        """
        Make an alert about the specified Opportunity
        """
        text = f"Deal Alert! Price=${opportunity.deal.price:.2f}, "
        text += f"Estimate=${opportunity.estimate:.2f}, "
        text += f"Discount=${opportunity.discount:.2f} :"
        text += opportunity.deal.product_description[:10] + '... '
        text += opportunity.deal.url
        if DO_TEXT:
            self.message(text)
        if DO_PUSH:
            self.push(text)
        self.log("Messaging Agent has completed")
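With `DO_PUSH = True`, an alert only needs `PUSHOVER_USER` and `PUSHOVER_TOKEN` in the environment. A usage sketch, not part of the PR (the deal values are made up):

from agents.deals import Deal, Opportunity
from agents.messaging_agent import MessagingAgent

deal = Deal(product_description="65-inch 4K smart TV", price=399.99, url="https://example.com/deal")
MessagingAgent().alert(Opportunity(deal=deal, estimate=549.99, discount=150.00))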
week8/community_contributions/ensemble-joshua/agents/planning_agent.py
@@ -0,0 +1,58 @@
from typing import Optional, List
from agents.agent import Agent
from agents.deals import ScrapedDeal, DealSelection, Deal, Opportunity
from agents.scanner_agent import ScannerAgent
from agents.ensemble_agent import EnsembleAgent
from agents.messaging_agent import MessagingAgent


class PlanningAgent(Agent):

    name = "Planning Agent"
    color = Agent.GREEN
    DEAL_THRESHOLD = 50

    def __init__(self, collection):
        """
        Create instances of the 3 Agents that this planner coordinates across
        """
        self.log("Planning Agent is initializing")
        self.scanner = ScannerAgent()
        self.ensemble = EnsembleAgent(collection)
        self.messenger = MessagingAgent()
        self.log("Planning Agent is ready")

    def run(self, deal: Deal) -> Opportunity:
        """
        Run the workflow for a particular deal
        :param deal: the deal, summarized from an RSS scrape
        :returns: an opportunity including the discount
        """
        self.log("Planning Agent is pricing up a potential deal")
        estimate = self.ensemble.price(deal.product_description)
        discount = estimate - deal.price
        self.log(f"Planning Agent has processed a deal with discount ${discount:.2f}")
        return Opportunity(deal=deal, estimate=estimate, discount=discount)

    def plan(self, memory: List[str] = []) -> Optional[Opportunity]:
        """
        Run the full workflow:
        1. Use the ScannerAgent to find deals from RSS feeds
        2. Use the EnsembleAgent to estimate them
        3. Use the MessagingAgent to send a notification of deals
        :param memory: a list of URLs that have been surfaced in the past
        :return: an Opportunity if one was surfaced, otherwise None
        """
        self.log("Planning Agent is kicking off a run")
        selection = self.scanner.scan(memory=memory)
        if selection:
            opportunities = [self.run(deal) for deal in selection.deals[:5]]
            opportunities.sort(key=lambda opp: opp.discount, reverse=True)
            best = opportunities[0]
            self.log(f"Planning Agent has identified the best deal has discount ${best.discount:.2f}")
            if best.discount > self.DEAL_THRESHOLD:
                self.messenger.alert(best)
            self.log("Planning Agent has completed a run")
            return best if best.discount > self.DEAL_THRESHOLD else None
        return None
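An end-to-end sketch, not part of the PR; it assumes the week 8 `products_vectorstore` Chroma store exists and the relevant API keys are set:

import chromadb
from agents.planning_agent import PlanningAgent

collection = chromadb.PersistentClient(path="products_vectorstore").get_or_create_collection("products")
opportunity = PlanningAgent(collection).plan(memory=[])
print(opportunity or "No deal beat the $50 discount threshold this run")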
week8/community_contributions/ensemble-joshua/agents/random_forest_agent.py
@@ -0,0 +1,46 @@
# imports

import os
import re
from typing import List
from sentence_transformers import SentenceTransformer
import joblib

from agents.agent import Agent


class RandomForestAgent(Agent):

    name = "Random Forest Agent"
    color = Agent.MAGENTA

    def __init__(self):
        """
        Initialize this object by loading in the saved model weights
        and the SentenceTransformer vector encoding model
        """
        self.log("Random Forest Agent is initializing")
        self.vectorizer = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
        # Resolve model path: prefer local contribution folder copy, fallback to week8 root
        candidate_paths = [
            os.path.join(os.path.dirname(os.path.dirname(__file__)), 'random_forest_model.pkl'),  # ../../random_forest_model.pkl
            os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'random_forest_model.pkl'),  # ../../../random_forest_model.pkl (week8 root)
            'random_forest_model.pkl',
        ]
        model_path = next((p for p in candidate_paths if os.path.exists(p)), candidate_paths[-1])
        self.model = joblib.load(model_path)
        self.log("Random Forest Agent is ready")

    def price(self, description: str) -> float:
        """
        Use a Random Forest model to estimate the price of the described item
        :param description: the product to be estimated
        :return: the price as a float
        """
        self.log("Random Forest Agent is starting a prediction")
        vector = self.vectorizer.encode([description])
        result = max(0, self.model.predict(vector)[0])
        self.log(f"Random Forest Agent completed - predicting ${result:.2f}")
        return result
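`random_forest_model.pkl` is likewise loaded from disk rather than trained here. A compatible model could be produced roughly as below (not part of the PR; the two-item dataset and `n_estimators=100` are placeholders), as long as it is trained on embeddings from the same `all-MiniLM-L6-v2` encoder:

import joblib
from sentence_transformers import SentenceTransformer
from sklearn.ensemble import RandomForestRegressor

descriptions = ["USB-C docking station with dual HDMI", "Cordless stick vacuum"]
prices = [89.0, 199.0]

vectorizer = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
X = vectorizer.encode(descriptions)             # 384-dimensional embeddings
model = RandomForestRegressor(n_estimators=100, random_state=42).fit(X, prices)
joblib.dump(model, "random_forest_model.pkl")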
week8/community_contributions/ensemble-joshua/agents/scanner_agent.py
@@ -0,0 +1,95 @@
import os
import json
from typing import Optional, List
from openai import OpenAI
from agents.deals import ScrapedDeal, DealSelection
from agents.agent import Agent


class ScannerAgent(Agent):

    MODEL = "gpt-4o-mini"

    SYSTEM_PROMPT = """You identify and summarize the 5 most detailed deals from a list, by selecting deals that have the most detailed, high quality description and the most clear price.
    Respond strictly in JSON with no explanation, using this format. You should provide the price as a number derived from the description. If the price of a deal isn't clear, do not include that deal in your response.
    Most important is that you respond with the 5 deals that have the most detailed product description with price. It's not important to mention the terms of the deal; most important is a thorough description of the product.
    Be careful with products that are described as "$XXX off" or "reduced by $XXX" - this isn't the actual price of the product. Only respond with products when you are highly confident about the price.

    {"deals": [
        {
            "product_description": "Your clearly expressed summary of the product in 4-5 sentences. Details of the item are much more important than why it's a good deal. Avoid mentioning discounts and coupons; focus on the item itself. There should be a paragraph of text for each item you choose.",
            "price": 99.99,
            "url": "the url as provided"
        },
        ...
    ]}"""

    USER_PROMPT_PREFIX = """Respond with the most promising 5 deals from this list, selecting those which have the most detailed, high quality product description and a clear price that is greater than 0.
    Respond strictly in JSON, and only JSON. You should rephrase the description to be a summary of the product itself, not the terms of the deal.
    Remember to respond with a paragraph of text in the product_description field for each of the 5 items that you select.
    Be careful with products that are described as "$XXX off" or "reduced by $XXX" - this isn't the actual price of the product. Only respond with products when you are highly confident about the price.

    Deals:

    """

    USER_PROMPT_SUFFIX = "\n\nStrictly respond in JSON and include exactly 5 deals, no more."

    name = "Scanner Agent"
    color = Agent.CYAN

    def __init__(self):
        """
        Set up this instance by initializing OpenAI
        """
        self.log("Scanner Agent is initializing")
        self.openai = OpenAI()
        self.log("Scanner Agent is ready")

    def fetch_deals(self, memory) -> List[ScrapedDeal]:
        """
        Look up deals published on RSS feeds
        Return any new deals that are not already in the memory provided
        """
        self.log("Scanner Agent is about to fetch deals from RSS feed")
        urls = [opp.deal.url for opp in memory]
        scraped = ScrapedDeal.fetch()
        result = [scrape for scrape in scraped if scrape.url not in urls]
        self.log(f"Scanner Agent received {len(result)} deals not already scraped")
        return result

    def make_user_prompt(self, scraped) -> str:
        """
        Create a user prompt for OpenAI based on the scraped deals provided
        """
        user_prompt = self.USER_PROMPT_PREFIX
        user_prompt += '\n\n'.join([scrape.describe() for scrape in scraped])
        user_prompt += self.USER_PROMPT_SUFFIX
        return user_prompt

    def scan(self, memory: List[str] = []) -> Optional[DealSelection]:
        """
        Call OpenAI to provide a high potential list of deals with good descriptions and prices
        Use StructuredOutputs to ensure it conforms to our specifications
        :param memory: a list of URLs representing deals already raised
        :return: a selection of good deals, or None if there aren't any
        """
        scraped = self.fetch_deals(memory)
        if scraped:
            user_prompt = self.make_user_prompt(scraped)
            self.log("Scanner Agent is calling OpenAI using Structured Output")
            result = self.openai.beta.chat.completions.parse(
                model=self.MODEL,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt}
                ],
                response_format=DealSelection
            )
            result = result.choices[0].message.parsed
            result.deals = [deal for deal in result.deals if deal.price > 0]
            self.log(f"Scanner Agent received {len(result.deals)} selected deals with price>0 from OpenAI")
            return result
        return None
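A quick way to exercise the scanner on its own (not part of the PR; needs `OPENAI_API_KEY` in the environment):

from agents.scanner_agent import ScannerAgent

selection = ScannerAgent().scan(memory=[])
if selection:
    for deal in selection.deals:
        print(f"${deal.price:>8.2f}  {deal.url}")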
week8/community_contributions/ensemble-joshua/api.py
@@ -0,0 +1,75 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os
import chromadb

from agents.specialist_agent import SpecialistAgent
from agents.frontier_agent import FrontierAgent
from agents.random_forest_agent import RandomForestAgent
from agents.ensemble_agent import EnsembleAgent
from deal_agent_framework import DealAgentFramework


class PriceRequest(BaseModel):
    description: str


class DealScanResponse(BaseModel):
    opportunities: list


DB_PATH = os.path.join(os.path.dirname(__file__), "../../products_vectorstore")
client = chromadb.PersistentClient(path=DB_PATH)
collection = client.get_or_create_collection("products")

app = FastAPI(title="Week8 Pricer API", version="1.0.0")


@app.get("/healthz")
def healthz():
    return {"ok": True}


@app.post("/price/specialist")
def price_specialist(body: PriceRequest):
    if not body.description:
        raise HTTPException(400, "description is required")
    agent = SpecialistAgent()
    price = float(agent.price(body.description))
    return {"price": price, "agent": "specialist"}


@app.post("/price/frontier")
def price_frontier(body: PriceRequest):
    if not body.description:
        raise HTTPException(400, "description is required")
    agent = FrontierAgent(collection)
    price = float(agent.price(body.description))
    return {"price": price, "agent": "frontier"}


@app.post("/price/random_forest")
def price_random_forest(body: PriceRequest):
    if not body.description:
        raise HTTPException(400, "description is required")
    agent = RandomForestAgent()
    price = float(agent.price(body.description))
    return {"price": price, "agent": "random_forest"}


@app.post("/price/ensemble")
def price_ensemble(body: PriceRequest):
    if not body.description:
        raise HTTPException(400, "description is required")
    agent = EnsembleAgent(collection)
    price = float(agent.price(body.description))
    return {"price": price, "agent": "ensemble"}


@app.post("/deals/scan")
def deals_scan():
    framework = DealAgentFramework()
    opportunities = framework.run()
    return {"count": len(opportunities), "opportunities": [o.dict() for o in opportunities]}
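A client sketch for the API above, assuming it is served locally with `uvicorn api:app` from the contribution folder (port 8000 is uvicorn's default); the example description is made up:

import requests

BASE = "http://127.0.0.1:8000"
print(requests.get(f"{BASE}/healthz").json())        # {"ok": True}
resp = requests.post(f"{BASE}/price/ensemble",
                     json={"description": "USB condenser microphone with boom arm"})
print(resp.json())                                   # {"price": <float>, "agent": "ensemble"}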
BIN week8/community_contributions/ensemble-joshua/ensemble_model.pkl
Binary file not shown.
week8/community_contributions/ensemble-joshua/frontier_agent.py
@@ -0,0 +1,150 @@
# imports

import os
import re
import math
import json
from typing import List, Dict
from openai import OpenAI
try:
    from openai import APIStatusError
except ImportError:  # older openai versions don't export APIStatusError
    APIStatusError = Exception
import statistics
from sentence_transformers import SentenceTransformer
from datasets import load_dataset
import chromadb
from items import Item
from testing import Tester
from agents.agent import Agent


class FrontierAgent(Agent):

    name = "Frontier Agent"
    color = Agent.BLUE

    MODEL = "gpt-4o-mini"

    def __init__(self, collection):
        """
        Set up this instance by connecting to OpenAI or DeepSeek, to the Chroma Datastore,
        and setting up the vector encoding model
        """
        self.log("Initializing Frontier Agent")
        deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
        if deepseek_api_key:
            self.client = OpenAI(api_key=deepseek_api_key, base_url="https://api.deepseek.com")
            self.MODEL = "deepseek-chat"
            self.log("Frontier Agent is set up with DeepSeek")
        else:
            self.client = OpenAI()
            self.MODEL = "gpt-4o-mini"
            self.log("Frontier Agent is setting up with OpenAI")
        self.collection = collection
        self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
        self.log("Frontier Agent is ready")

    def make_context(self, similars: List[str], prices: List[float]) -> str:
        """
        Create context that can be inserted into the prompt
        :param similars: similar products to the one being estimated
        :param prices: prices of the similar products
        :return: text to insert in the prompt that provides context
        """
        message = "To provide some context, here are some other items that might be similar to the item you need to estimate.\n\n"
        for similar, price in zip(similars, prices):
            message += f"Potentially related product:\n{similar}\nPrice is ${price:.2f}\n\n"
        return message

    def messages_for(self, description: str, similars: List[str], prices: List[float]) -> List[Dict[str, str]]:
        """
        Create the message list to be included in a call to OpenAI
        With the system and user prompt
        :param description: a description of the product
        :param similars: similar products to this one
        :param prices: prices of similar products
        :return: the list of messages in the format expected by OpenAI
        """
        system_message = "You estimate prices of items. Reply only with the price, no explanation"
        user_prompt = self.make_context(similars, prices)
        user_prompt += "And now the question for you:\n\n"
        user_prompt += "How much does this cost?\n\n" + description
        return [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_prompt},
            {"role": "assistant", "content": "Price is $"}
        ]

    def find_similars(self, description: str):
        """
        Return a list of items similar to the given one by looking in the Chroma datastore
        """
        self.log("Frontier Agent is performing a RAG search of the Chroma datastore to find 5 similar products")
        vector = self.model.encode([description])
        results = self.collection.query(query_embeddings=vector.astype(float).tolist(), n_results=5)
        documents = results['documents'][0][:]
        prices = [m['price'] for m in results['metadatas'][0][:]]
        self.log("Frontier Agent has found similar products")
        return documents, prices

    def get_price(self, s) -> float:
        """
        A utility that plucks a floating point number out of a string
        """
        s = s.replace('$', '').replace(',', '')
        match = re.search(r"[-+]?\d*\.\d+|\d+", s)
        return float(match.group()) if match else 0.0

    def price(self, description: str) -> float:
        """
        Make a call to OpenAI or DeepSeek to estimate the price of the described product,
        by looking up 5 similar products and including them in the prompt to give context
        :param description: a description of the product
        :return: an estimate of the price
        """
        documents, prices = self.find_similars(description)

        # If external calls are disabled, or similar pricing is empty, use heuristic
        allow_external = os.getenv("FRONTIER_ALLOW_EXTERNAL", "true").lower() in {"1", "true", "yes"}

        def heuristic_price() -> float:
            if prices:
                # Robust central tendency fallback
                try:
                    return float(statistics.median(prices))
                except Exception:
                    return float(sum(prices) / max(len(prices), 1))
            # As a last resort, return 0.0
            return 0.0

        if not allow_external:
            self.log("External LLM calls disabled via FRONTIER_ALLOW_EXTERNAL; using heuristic fallback")
            result = heuristic_price()
            self.log(f"Frontier Agent (fallback) - predicting ${result:.2f}")
            return result

        self.log(f"Frontier Agent is about to call {self.MODEL} with context including 5 similar products")
        try:
            response = self.client.chat.completions.create(
                model=self.MODEL,
                messages=self.messages_for(description, documents, prices),
                seed=42,
                max_tokens=5,
            )
            reply = response.choices[0].message.content
            result = self.get_price(reply)
            self.log(f"Frontier Agent completed - predicting ${result:.2f}")
            return result
        except APIStatusError as e:  # Insufficient balance or other HTTP errors
            msg = getattr(e, "message", str(e))
            self.log(f"Frontier Agent API error: {msg}. Falling back to heuristic price.")
            result = heuristic_price()
            self.log(f"Frontier Agent (fallback) - predicting ${result:.2f}")
            return result
        except Exception as e:
            self.log(f"Frontier Agent unexpected error: {e}. Falling back to heuristic price.")
            result = heuristic_price()
            self.log(f"Frontier Agent (fallback) - predicting ${result:.2f}")
            return result
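The `FRONTIER_ALLOW_EXTERNAL` flag makes the agent usable without any LLM credits, falling back to the median price of the five RAG neighbours. A sketch, not part of the PR; it assumes the module is importable as `agents.frontier_agent` (as the `EnsembleAgent` import implies) and that the Chroma store exists:

import os
os.environ["FRONTIER_ALLOW_EXTERNAL"] = "false"   # skip the OpenAI/DeepSeek call entirely

import chromadb
from agents.frontier_agent import FrontierAgent

collection = chromadb.PersistentClient(path="products_vectorstore").get_or_create_collection("products")
agent = FrontierAgent(collection)
print(agent.price("27-inch QHD IPS monitor"))     # median of the 5 nearest-neighbour prices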
@@ -0,0 +1,98 @@
import modal
from modal import App, Volume, Image


app = modal.App("pricer-service")
image = Image.debian_slim().pip_install("huggingface", "torch", "transformers", "bitsandbytes", "accelerate", "peft")

secrets = [modal.Secret.from_name("hf-secret")]

# Constants
GPU = "T4"
BASE_MODEL = "meta-llama/Meta-Llama-3.1-8B"
PROJECT_NAME = "pricer"
HF_USER = "ed-donner"
RUN_NAME = "2024-09-13_13.04.39"
PROJECT_RUN_NAME = f"{PROJECT_NAME}-{RUN_NAME}"
REVISION = "e8d637df551603dc86cd7a1598a8f44af4d7ae36"
FINETUNED_MODEL = f"{HF_USER}/{PROJECT_RUN_NAME}"
CACHE_DIR = "/cache"

MIN_CONTAINERS = 0

QUESTION = "How much does this cost to the nearest dollar?"
PREFIX = "Price is $"

hf_cache_volume = Volume.from_name("hf-hub-cache", create_if_missing=True)


@app.cls(
    image=image.env({"HF_HUB_CACHE": CACHE_DIR}),
    secrets=secrets,
    gpu=GPU,
    timeout=1800,
    min_containers=MIN_CONTAINERS,
    volumes={CACHE_DIR: hf_cache_volume}
)
class Pricer:

    @modal.enter()
    def setup(self):
        import torch
        from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, set_seed
        from peft import PeftModel

        # Quant Config
        quant_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_quant_type="nf4"
        )

        # Load model and tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.tokenizer.padding_side = "right"
        self.base_model = AutoModelForCausalLM.from_pretrained(
            BASE_MODEL,
            quantization_config=quant_config,
            device_map="auto"
        )
        self.fine_tuned_model = PeftModel.from_pretrained(self.base_model, FINETUNED_MODEL, revision=REVISION)

    @modal.method()
    def price(self, description: str) -> float:
        import os
        import re
        import torch
        from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, set_seed
        from peft import PeftModel

        set_seed(42)
        prompt = f"{QUESTION}\n\n{description}\n\n{PREFIX}"
        inputs = self.tokenizer.encode(prompt, return_tensors="pt").to("cuda")
        attention_mask = torch.ones(inputs.shape, device="cuda")
        outputs = self.fine_tuned_model.generate(inputs, attention_mask=attention_mask, max_new_tokens=5, num_return_sequences=1)
        result = self.tokenizer.decode(outputs[0])

        contents = result.split("Price is $")[1]
        contents = contents.replace(',', '')
        match = re.search(r"[-+]?\d*\.\d+|\d+", contents)
        return float(match.group()) if match else 0


# Simple HTTP endpoint so external apps can call this on Modal
@app.function(image=image, secrets=secrets, gpu=GPU, timeout=1800)
@modal.web_endpoint(method="POST")
def price_http(body: dict):
    """HTTP endpoint: {"description": str} -> {"price": float}"""
    description = body.get("description", '').strip()
    if not description:
        return {"error": "Missing 'description'"}

    pricer = Pricer()
    value = pricer.price.remote(description)
    return {"price": float(value)}
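Once deployed with `modal deploy`, the class can also be called remotely from a plain Python client; a sketch (not part of the PR) using Modal's class lookup - depending on the installed Modal version this may be `modal.Cls.lookup` instead of `modal.Cls.from_name`:

import modal

Pricer = modal.Cls.from_name("pricer-service", "Pricer")   # looks up the deployed class
pricer = Pricer()
print(pricer.price.remote("Bosch 800 Series dishwasher, stainless steel"))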