#!/usr/bin/env python3
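"""
Fine-tune gpt-4o-mini to estimate product prices from item descriptions.

Pipeline (following the week 6 reference notebook, day5.ipynb): load the
Amazon Reviews 2023 "Appliances" metadata (or cached train.pkl / test.pkl),
build JSONL chat examples, upload them to OpenAI, launch and monitor a
fine-tuning job (optionally synced to Weights & Biases), then evaluate the
resulting model with the course's Tester class (RMSLE metrics).
"""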
import os
import json
import random
import math
import re
import pickle
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
from openai import OpenAI
from huggingface_hub import login
from datasets import load_dataset
import matplotlib.pyplot as plt
import numpy as np
from collections import Counter
import sys
import time
import warnings

warnings.filterwarnings('ignore')

# Make the course-level items.py and testing.py (two directories up) importable
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
load_dotenv()
os.environ['HF_TOKEN'] = os.getenv('HF_TOKEN', 'your-key-if-not-using-env')
hf_token = os.environ['HF_TOKEN']
if hf_token and hf_token != 'your-key-if-not-using-env':
    login(hf_token, add_to_git_credential=True)
    print("Logged in to Hugging Face")
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
from items import Item
from testing import Tester
print("Successfully imported Item and Tester classes")

class PricePredictionFineTuner:
    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key)
        self.train = []
        self.test = []
        self.fine_tune_train = []
        self.fine_tune_validation = []
        self.fine_tuned_model_name = None
        # Payload for the fine-tuning API's `integrations` parameter
        self.wandb_integration = {"type": "wandb", "wandb": {"project": "gpt-pricer"}}

    def load_amazon_data(self, category: str = "Appliances") -> None:
        print(f"Loading Amazon Reviews 2023 dataset - {category} category...")
        train_pkl = os.path.join('..', '..', 'train.pkl')
        test_pkl = os.path.join('..', '..', 'test.pkl')
        if os.path.exists(train_pkl) and os.path.exists(test_pkl):
            print("Found existing pickle files, loading...")
            with open(train_pkl, 'rb') as file:
                self.train = pickle.load(file)
            with open(test_pkl, 'rb') as file:
                self.test = pickle.load(file)
            print(f"Loaded {len(self.train)} training items and {len(self.test)} test items from pickle files")
        else:
            print("Pickle files not found. Loading from Hugging Face...")
            self._load_from_huggingface(category)
        # Same split as the reference notebook: 500 train / 100 validation examples
        self.fine_tune_train = self.train[:500]
        self.fine_tune_validation = self.train[500:600]
        print(f"Fine-tuning split: {len(self.fine_tune_train)} train, {len(self.fine_tune_validation)} validation")

    def _load_from_huggingface(self, category: str) -> None:
        try:
            print(f"Downloading {category} dataset from Hugging Face...")
            dataset = load_dataset("McAuley-Lab/Amazon-Reviews-2023", f"raw_meta_{category}", split="full", trust_remote_code=True)
            print(f"Number of {category}: {len(dataset):,}")
            print("Processing items with prices...")
            items = []
            processed = 0
            for datapoint in dataset:
                try:
                    price = float(datapoint["price"])
                    if price > 0 and price <= 999:
                        item = Item(datapoint, price)
                        if item.include:
                            items.append(item)
                    processed += 1
                    if processed % 1000 == 0:
                        print(f"Processed {processed:,} items, found {len(items):,} valid items")
                    # Cap collection at 1500 items to keep the demo fast
                    if len(items) >= 1500:
                        print(f"Collected {len(items)} items, stopping for efficiency")
                        break
                except (ValueError, TypeError):
                    continue
            print(f"Created {len(items):,} valid Item objects")
            if len(items) < 600:
                raise ValueError(f"Not enough valid items found: {len(items)}. Need at least 600.")
            # 80/20 train/test split
            random.shuffle(items)
            split_point = int(0.8 * len(items))
            self.train = items[:split_point]
            self.test = items[split_point:]
            print(f"Split into {len(self.train)} training and {len(self.test)} test items")
            print("Saving to pickle files for future use...")
            with open(os.path.join('..', '..', 'train.pkl'), 'wb') as f:
                pickle.dump(self.train, f)
            with open(os.path.join('..', '..', 'test.pkl'), 'wb') as f:
                pickle.dump(self.test, f)
            print("Saved pickle files")
        except Exception as e:
            print(f"Error loading from Hugging Face: {e}")
            print("This might be due to:")
            print("1. Missing HF_TOKEN environment variable")
            print("2. Need to accept Meta's terms for the tokenizer")
            print("3. Network connectivity issues")
            raise
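
    # Prompt construction: Item.test_prompt() includes a "to the nearest dollar"
    # hint and a trailing "\n\nPrice is $" cue; both are stripped so the model is
    # trained to answer with the exact price itself.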
    def messages_for(self, item: Item) -> List[Dict[str, str]]:
        system_message = "You estimate prices of items. Reply only with the price, no explanation"
        user_prompt = item.test_prompt().replace(" to the nearest dollar", "").replace("\n\nPrice is $", "")
        return [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_prompt},
            {"role": "assistant", "content": f"Price is ${item.price:.2f}"}
        ]

    def messages_for_testing(self, item: Item) -> List[Dict[str, str]]:
        system_message = "You estimate prices of items. Reply only with the price, no explanation"
        user_prompt = item.test_prompt().replace(" to the nearest dollar", "").replace("\n\nPrice is $", "")
        return [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_prompt},
            {"role": "assistant", "content": "Price is $"}
        ]
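
    # Note: for inference the final assistant message carries only the bare
    # prefix "Price is $", which (matching the training format above) should
    # nudge the fine-tuned model to reply with just the number.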

    def make_jsonl(self, items: List[Item]) -> str:
        result = ""
        for item in items:
            messages = self.messages_for(item)
            messages_str = json.dumps(messages)
            result += '{"messages": ' + messages_str + '}\n'
        return result.strip()
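
    # Each JSONL line follows the chat format expected by the OpenAI
    # fine-tuning API; an illustrative record (hypothetical values):
    # {"messages": [{"role": "system", "content": "You estimate prices of items. ..."},
    #               {"role": "user", "content": "<item description>"},
    #               {"role": "assistant", "content": "Price is $149.99"}]}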

    def write_jsonl(self, items: List[Item], filename: str) -> None:
        with open(filename, "w") as f:
            jsonl = self.make_jsonl(items)
            f.write(jsonl)

    def save_training_files(self) -> tuple:
        print("Creating JSONL files...")
        self.write_jsonl(self.fine_tune_train, "fine_tune_train.jsonl")
        self.write_jsonl(self.fine_tune_validation, "fine_tune_validation.jsonl")
        print("Uploading files to OpenAI...")
        with open("fine_tune_train.jsonl", "rb") as f:
            train_file = self.client.files.create(file=f, purpose="fine-tune")
        with open("fine_tune_validation.jsonl", "rb") as f:
            validation_file = self.client.files.create(file=f, purpose="fine-tune")
        print(f"Files uploaded: {train_file.id}, {validation_file.id}")
        return train_file.id, validation_file.id
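
    # The hyperparameters below mirror the reference notebook: 3 epochs,
    # learning_rate_multiplier 0.1, batch size 4, and a fixed seed for
    # reproducibility.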
    def start_fine_tuning(self, train_file_id: str, validation_file_id: str) -> str:
        print("Starting fine-tuning job with Weights and Biases integration...")
        wandb_key = os.getenv('WANDB_API_KEY')
        integrations = []
        if wandb_key:
            integrations = [self.wandb_integration]
            print("Weights and Biases integration enabled")
        else:
            print("WANDB_API_KEY not found - proceeding without W&B integration")
        try:
            job = self.client.fine_tuning.jobs.create(
                training_file=train_file_id,
                validation_file=validation_file_id,
                model="gpt-4o-mini-2024-07-18",
                seed=42,
                hyperparameters={
                    "n_epochs": 3,
                    "learning_rate_multiplier": 0.1,
                    "batch_size": 4
                },
                integrations=integrations,
                suffix="pricer-improved"
            )
            print(f"Fine-tuning job started: {job.id}")
            return job.id
        except Exception as e:
            print(f"Failed to start fine-tuning job: {e}")
            raise

    def monitor_training(self, job_id: str) -> Optional[str]:
        while True:
            job = self.client.fine_tuning.jobs.retrieve(job_id)
            status = job.status
            print(f"Status: {status}")
            if status == "succeeded":
                model_name = job.fine_tuned_model
                print(f"Training completed! Model: {model_name}")
                return model_name
            elif status == "failed":
                print(f"Training failed: {job.error}")
                return None
            elif status in ["running", "validating_files", "queued"]:
                print(f"Training in progress... ({status})")
            else:
                print(f"Unknown status: {status}")
            # Poll every 30 seconds until the job reaches a terminal state
            time.sleep(30)

    def get_price(self, s: str) -> float:
        # Strip currency symbols and commas, then take the first number found
        s = s.replace('$', '').replace(',', '')
        match = re.search(r"[-+]?\d*\.\d+|\d+", s)
        return float(match.group()) if match else 0.0
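
    # Examples:
    #   get_price("Price is $99.99")  -> 99.99
    #   get_price("$1,250")           -> 1250.0
    #   get_price("no price found")   -> 0.0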

    def gpt_fine_tuned(self, item: Item) -> float:
        if not self.fine_tuned_model_name:
            raise ValueError("No fine-tuned model available")
        try:
            response = self.client.chat.completions.create(
                model=self.fine_tuned_model_name,
                messages=self.messages_for_testing(item),
                seed=42,
                max_tokens=7
            )
            reply = response.choices[0].message.content
            return self.get_price(reply)
        except Exception as e:
            print(f"Prediction error: {e}")
            return 0.0

    def evaluate_model(self, job_id: str) -> Dict[str, Any]:
        try:
            job = self.client.fine_tuning.jobs.retrieve(job_id)
            self.fine_tuned_model_name = job.fine_tuned_model
            if not self.test:
                return {"error": "No test items available"}
            test_subset = self.test[:min(250, len(self.test))]
            actual_size = len(test_subset)
            print("Testing individual prediction first...")
            print(f"Actual price: ${test_subset[0].price}")
            predicted_price = self.gpt_fine_tuned(test_subset[0])
            print(f"Predicted price: ${predicted_price}")
            print("Test prompt used:")
            print(test_subset[0].test_prompt())
            print(f"\nRunning full evaluation with {actual_size} test items...")
            tester = Tester(self.gpt_fine_tuned, test_subset, size=actual_size)
            tester.run()
            return {
                "status": "completed",
                "message": "Evaluation completed using Tester class with RMSLE metrics",
                "test_items": actual_size,
                "model_name": self.fine_tuned_model_name
            }
        except Exception as e:
            return {"error": f"Evaluation failed: {e}"}

    def evaluate_existing_model(self, model_name: str) -> Dict[str, Any]:
        print("Evaluating existing fine-tuned model...")
        self.fine_tuned_model_name = model_name
        if not self.test:
            return {"error": "No test items available. Load data first."}
        print(f"Fine-tuned model: {self.fine_tuned_model_name}")
        test_subset = self.test[:min(250, len(self.test))]
        actual_size = len(test_subset)
        print("Testing individual prediction first...")
        print(f"Actual price: ${test_subset[0].price}")
        predicted_price = self.gpt_fine_tuned(test_subset[0])
        print(f"Predicted price: ${predicted_price}")
        print("Test prompt used:")
        print(test_subset[0].test_prompt())
        print(f"\nRunning full evaluation with {actual_size} test items...")
        tester = Tester(self.gpt_fine_tuned, test_subset, size=actual_size)
        tester.run()
        return {
            "status": "completed",
            "message": "Evaluation completed using Tester class with RMSLE metrics",
            "test_items": actual_size,
            "model_name": self.fine_tuned_model_name
        }

    def add_wandb_sync(self, job_id: str) -> None:
        try:
            import wandb
            from wandb.integration.openai.fine_tuning import WandbLogger
            wandb_key = os.getenv('WANDB_API_KEY')
            if not wandb_key:
                print("WANDB_API_KEY not found - skipping W&B sync")
                return
            print("Setting up Weights and Biases monitoring...")
            wandb.login()
            WandbLogger.sync(fine_tune_job_id=job_id, project="gpt-pricer")
            print("Weights and Biases sync enabled")
        except ImportError:
            print("wandb not installed - skipping W&B sync")
        except Exception as e:
            print(f"W&B sync failed: {e}")

def main():
    print("Starting Price Prediction Fine-Tuning Process")
    print("Based on reference implementation from day5.ipynb")
    print("=" * 60)
    api_key = os.getenv('OPENAI_API_KEY')
    if not api_key:
        print("OPENAI_API_KEY not found in environment")
        print("Set your API key: export OPENAI_API_KEY='your-key-here'")
        return
    try:
        fine_tuner = PricePredictionFineTuner(api_key)
        print("\nStep 1: Loading Amazon Reviews 2023 dataset...")
        fine_tuner.load_amazon_data("Appliances")
        if not fine_tuner.fine_tune_train:
            print("No training data available!")
            return
        print("\nStep 2: Creating JSONL files and uploading...")
        train_file_id, validation_file_id = fine_tuner.save_training_files()
        print("\nStep 3: Starting fine-tuning job...")
        job_id = fine_tuner.start_fine_tuning(train_file_id, validation_file_id)
        print("\nStep 4: Setting up Weights and Biases monitoring...")
        fine_tuner.add_wandb_sync(job_id)
        print("\nStep 5: Monitoring training progress...")
        print("This may take several minutes to hours depending on data size...")
        model_name = fine_tuner.monitor_training(job_id)
        if model_name:
            print(f"\nFine-tuning completed! Model: {model_name}")
            print("\nStep 6: Evaluating model with Tester class...")
            results = fine_tuner.evaluate_model(job_id)
            if "error" in results:
                print(f"Evaluation failed: {results['error']}")
            else:
                print(results['message'])
                print(f"Evaluation used {results['test_items']} test items")
                print("\nCheck the generated chart for detailed RMSLE metrics!")
            print("\nPrice prediction fine-tuning process completed!")
            print("\nFollows the reference implementation:")
            print("  - Uses pickle files (train.pkl, test.pkl)")
            print("  - 500 training examples, 100 validation examples")
            print("  - 3 epochs with a balanced learning rate multiplier (0.1)")
            print("  - Batch size 4 for stable training")
            print("  - RMSLE evaluation using the Tester class")
            print("  - Weights and Biases integration")
        else:
            print("\nFine-tuning failed - check the error messages above")
    except Exception as e:
        print(f"\nError during fine-tuning process: {e}")
        import traceback
        traceback.print_exc()

def evaluate_only(model_name: str):
    print("=" * 60)
    print("EVALUATING EXISTING FINE-TUNED MODEL")
    print("=" * 60)
    api_key = os.getenv('OPENAI_API_KEY')
    if not api_key:
        print("OPENAI_API_KEY not found in environment")
        return
    try:
        fine_tuner = PricePredictionFineTuner(api_key)
        print("\nLoading data...")
        fine_tuner.load_amazon_data("Appliances")
        print("\nRunning evaluation...")
        results = fine_tuner.evaluate_existing_model(model_name)
        if "error" in results:
            print(f"Evaluation failed: {results['error']}")
        else:
            print(f"\n{results['message']}")
            print(f"Evaluation used {results['test_items']} test items")
            print("\nCheck the generated chart for detailed RMSLE metrics!")
    except Exception as e:
        print(f"\nError during evaluation: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--evaluate":
        if len(sys.argv) < 3:
            print("Usage: python w6d5.py --evaluate <model_name>")
            print("\nExample:")
            print("  python w6d5.py --evaluate ft:gpt-4o-mini-2024-07-18:techxelo:pricer-improved:CVIfbqic")
        else:
            model_name = sys.argv[2]
            evaluate_only(model_name)
    else:
        main()
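
# Usage (OPENAI_API_KEY required; HF_TOKEN and WANDB_API_KEY optional):
#   python w6d5.py                        # run the full fine-tuning pipeline
#   python w6d5.py --evaluate <model>     # evaluate an existing fine-tuned model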