diff --git a/week6/community-contributions/salah/smart_fine_tuner.py b/week6/community-contributions/salah/smart_fine_tuner.py
new file mode 100644
index 0000000..72b62fe
--- /dev/null
+++ b/week6/community-contributions/salah/smart_fine_tuner.py
@@ -0,0 +1,269 @@
+import sys
+import os
+sys.path.append("../..")
+
+import json
+import pickle
+import pandas as pd
+import numpy as np
+from openai import OpenAI
+from dotenv import load_dotenv
+from huggingface_hub import login
+from smart_pricer import SmartPricer, ConfidenceAwareTester
+import re
+from typing import List, Dict, Tuple
+import time
+
+load_dotenv(override=True)
+os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY')
+os.environ['HF_TOKEN'] = os.getenv('HF_TOKEN')
+
+hf_token = os.environ['HF_TOKEN']
+login(hf_token, add_to_git_credential=True)
+
+class SmartFineTuner:
+
+    def __init__(self, openai_api_key: str = None):
+        self.client = OpenAI(api_key=openai_api_key or os.getenv('OPENAI_API_KEY'))
+        self.fine_tuned_model_id = None
+
+        self.training_templates = [
+            {
+                "system": "You are a product pricing expert. Respond only with the price, no explanation.",
+                "user": "Estimate the price of this product:\n\n{description}\n\nPrice: $",
+                "weight": 0.4
+            },
+            {
+                "system": "You are a retail pricing expert. Consider market positioning and consumer value.",
+                "user": "What would this product sell for in the market?\n\n{description}\n\nMarket price: $",
+                "weight": 0.3
+            },
+            {
+                "system": "You analyze product features to determine fair pricing.",
+                "user": "Based on the features and quality described, estimate the price:\n\n{description}\n\nEstimated price: $",
+                "weight": 0.3
+            }
+        ]
+
+    def prepare_enhanced_training_data(self, train_items: List, template_mix: bool = True) -> List[Dict]:
+        training_data = []
+
+        for item in train_items:
+            description = self._get_clean_description(item)
+
+            if len(description.strip()) < 20:
+                continue
+
+            if hasattr(item, 'price'):
+                price = item.price
+            else:
+                price = item.get('price', 0)
+
+            if price <= 0:
+                continue
+
+            templates_to_use = self.training_templates if template_mix else [self.training_templates[0]]
+
+            for template in templates_to_use:
+                if template_mix and np.random.random() > template['weight']:
+                    continue
+
+                user_prompt = template['user'].format(description=description)
+
+                messages = [
+                    {"role": "system", "content": template['system']},
+                    {"role": "user", "content": user_prompt},
+                    {"role": "assistant", "content": f"{price:.2f}"}
+                ]
+
+                training_data.append({"messages": messages})
+
+        return training_data
+
+    def _get_clean_description(self, item) -> str:
+        if hasattr(item, 'test_prompt'):
+            prompt = item.test_prompt()
+            clean = prompt.replace(" to the nearest dollar", "")
+            clean = clean.replace("\n\nPrice is $", "")
+            clean = re.sub(r'\$\d+\.?\d*', '', clean)
+            clean = re.sub(r'\d+\.?\d*\s*dollars?', '', clean)
+            return clean.strip()
+        else:
+            parts = []
+            if 'title' in item and item['title']:
+                parts.append(f"Title: {item['title']}")
+            if 'description' in item and item['description']:
+                parts.append(f"Description: {item['description']}")
+            if 'features' in item and item['features']:
+                parts.append(f"Features: {item['features']}")
+
+            return '\n'.join(parts)
+
+    def create_training_files(self, train_items: List, val_items: List,
+                              enhanced: bool = True) -> Tuple[str, str]:
+        train_data = self.prepare_enhanced_training_data(train_items, template_mix=enhanced)
+        val_data = self.prepare_enhanced_training_data(val_items, template_mix=False)
+
+        print(f"Prepared {len(train_data)} training examples")
+        print(f"Prepared {len(val_data)} validation examples")
+
+        train_file = "smart_pricer_train.jsonl"
+        val_file = "smart_pricer_validation.jsonl"
+
+        with open(train_file, 'w') as f:
+            for example in train_data:
+                f.write(json.dumps(example) + '\n')
+
+        with open(val_file, 'w') as f:
+            for example in val_data:
+                f.write(json.dumps(example) + '\n')
+
+        return train_file, val_file
+
+    def start_fine_tuning(self, train_file: str, val_file: str,
+                          model: str = "gpt-4o-mini-2024-07-18",
+                          epochs: int = 1) -> str:
+        print(f"Starting fine-tuning with enhanced training data...")
+
+        with open(train_file, 'rb') as f:
+            train_file_obj = self.client.files.create(file=f, purpose="fine-tune")
+
+        with open(val_file, 'rb') as f:
+            val_file_obj = self.client.files.create(file=f, purpose="fine-tune")
+
+        print(f"Uploaded training file: {train_file_obj.id}")
+        print(f"Uploaded validation file: {val_file_obj.id}")
+
+        job = self.client.fine_tuning.jobs.create(
+            training_file=train_file_obj.id,
+            validation_file=val_file_obj.id,
+            model=model,
+            hyperparameters={"n_epochs": epochs},
+            suffix="smart_pricer"
+        )
+
+        self.fine_tuned_model_id = job.id
+        print(f"Fine-tuning job created: {job.id}")
+
+        return job.id
+
+    def check_job_status(self, job_id: str) -> Dict:
+        job = self.client.fine_tuning.jobs.retrieve(job_id)
+        return {
+            'status': job.status,
+            'model': job.fine_tuned_model,
+            'created_at': job.created_at,
+            'finished_at': job.finished_at
+        }
+
+    def evaluate_fine_tuned_model(self, test_data: List, job_id: str) -> Dict:
+        job_info = self.check_job_status(job_id)
+
+        if job_info['status'] != 'succeeded':
+            print(f"Job not completed yet. Status: {job_info['status']}")
+            return {}
+
+        fine_tuned_model = job_info['model']
+        print(f"Evaluating fine-tuned model: {fine_tuned_model}")
+
+        pricer = SmartPricer(fine_tuned_model=fine_tuned_model)
+
+        tester = ConfidenceAwareTester(
+            pricer,
+            test_data[:100],
+            title=f"Fine-tuned Smart Pricer ({fine_tuned_model})",
+            size=100
+        )
+
+        results = tester.run_enhanced_test()
+
+        if results:
+            avg_error = np.mean([r['error'] for r in results])
+            avg_confidence = np.mean([r['confidence'] for r in results])
+            high_conf_results = [r for r in results if r['confidence'] > 0.7]
+            high_conf_error = np.mean([r['error'] for r in high_conf_results]) if high_conf_results else float('inf')
+
+            summary = {
+                'model_id': fine_tuned_model,
+                'total_predictions': len(results),
+                'average_error': avg_error,
+                'average_confidence': avg_confidence,
+                'high_confidence_count': len(high_conf_results),
+                'high_confidence_error': high_conf_error,
+                'job_id': job_id
+            }
+
+            print(f"\nEVALUATION SUMMARY:")
+            print(f"Average Error: ${avg_error:.2f}")
+            print(f"Average Confidence: {avg_confidence:.2f}")
+            print(f"High Confidence Predictions: {len(high_conf_results)}")
+            print(f"High Confidence Error: ${high_conf_error:.2f}")
+
+            return summary
+
+        return {}
+
+def quick_fine_tune_demo(train_size: int = 200, val_size: int = 50):
+    print("Smart Pricer Fine-Tuning Demo")
+    print("=" * 50)
+
+    try:
+        with open('train.pkl', 'rb') as file:
+            train_data = pickle.load(file)
+        with open('test.pkl', 'rb') as file:
+            test_data = pickle.load(file)
+        print(f"Loaded training data: {len(train_data)} items")
+        print(f"Loaded test data: {len(test_data)} items")
+    except FileNotFoundError:
+        print("Training data not found. Make sure train.pkl and test.pkl are in current directory.")
+        return
+
+    train_items = train_data[:train_size]
+    val_items = train_data[train_size:train_size + val_size]
+
+    print(f"Using {len(train_items)} training items, {len(val_items)} validation items")
+
+    fine_tuner = SmartFineTuner()
+
+    train_file, val_file = fine_tuner.create_training_files(
+        train_items, val_items, enhanced=True
+    )
+
+    print(f"Created training files: {train_file}, {val_file}")
+
+    print(f"\nTo start fine-tuning, uncomment the following lines:")
+    print(f"job_id = fine_tuner.start_fine_tuning('{train_file}', '{val_file}')")
+    print(f"# Wait for job to complete...")
+    print(f"# results = fine_tuner.evaluate_fine_tuned_model(test_data, job_id)")
+
+    print(f"\nDemo with base model (no fine-tuning):")
+    pricer = SmartPricer()
+    tester = ConfidenceAwareTester(pricer, test_data[:25], size=25)
+    tester.run_enhanced_test()
+
+def main():
+    import argparse
+
+    parser = argparse.ArgumentParser(description='Smart Pricer Fine-Tuning')
+    parser.add_argument('--demo', action='store_true', help='Run demo mode')
+    parser.add_argument('--train-size', type=int, default=200, help='Training set size')
+    parser.add_argument('--val-size', type=int, default=50, help='Validation set size')
+    parser.add_argument('--evaluate', type=str, help='Evaluate existing model by job ID')
+
+    args = parser.parse_args()
+
+    if args.demo:
+        quick_fine_tune_demo(args.train_size, args.val_size)
+    elif args.evaluate:
+        try:
+            with open('test.pkl', 'rb') as file:
+                test_data = pickle.load(file)
+            fine_tuner = SmartFineTuner()
+            fine_tuner.evaluate_fine_tuned_model(test_data, args.evaluate)
+        except FileNotFoundError:
+            print("Test data not found. Make sure test.pkl is in current directory.")
+    else:
+        print("Use --demo to run demo or --evaluate to evaluate existing model")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/week6/community-contributions/salah/smart_pricer.py b/week6/community-contributions/salah/smart_pricer.py
new file mode 100644
index 0000000..f158633
--- /dev/null
+++ b/week6/community-contributions/salah/smart_pricer.py
@@ -0,0 +1,384 @@
+import sys
+import os
+sys.path.append("../..")
+
+import pickle
+import json
+import re
+import numpy as np
+import pandas as pd
+from openai import OpenAI
+from dotenv import load_dotenv
+from huggingface_hub import login
+import matplotlib.pyplot as plt
+import math
+from typing import List, Tuple, Dict
+from dataclasses import dataclass
+from collections import defaultdict
+import time
+
+load_dotenv(override=True)
+os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY')
+os.environ['HF_TOKEN'] = os.getenv('HF_TOKEN')
+
+hf_token = os.environ['HF_TOKEN']
+login(hf_token, add_to_git_credential=True)
+
+from items import Item
+from testing import Tester
+
+GREEN = "\033[92m"
+YELLOW = "\033[93m"
+RED = "\033[91m"
+BLUE = "\033[94m"
+RESET = "\033[0m"
+COLOR_MAP = {"red": RED, "orange": YELLOW, "green": GREEN, "blue": BLUE}
+
+
+@dataclass
+class ConfidentPrediction:
+    predicted_price: float
+    confidence_score: float
+    price_range: Tuple[float, float]
+    prediction_details: Dict
+    risk_level: str
+
+
+class SmartPricer:
+
+    def __init__(self, openai_api_key: str = None, fine_tuned_model: str = None):
+        self.client = OpenAI(api_key=openai_api_key or os.getenv('OPENAI_API_KEY'))
+        self.fine_tuned_model = fine_tuned_model or "gpt-4o-mini-2024-07-18"
+
+        self.prompt_strategies = {
+            "direct": self._create_direct_prompt,
+            "comparative": self._create_comparative_prompt,
+            "detailed": self._create_detailed_prompt,
+            "market_based": self._create_market_prompt
+        }
+
+        self.price_patterns = [
+            r'\$?(\d+\.?\d{0,2})',
+            r'(\d+\.?\d{0,2})\s*dollars?',
+            r'price.*?(\d+\.?\d{0,2})',
+            r'(\d+\.?\d{0,2})\s*USD'
+        ]
+
+    def _create_direct_prompt(self, item) -> str:
+        description = self._get_clean_description(item)
+        return f"""Estimate the price of this product. Respond only with the price number.
+
+Product: {description}
+
+Price: $"""
+
+    def _create_comparative_prompt(self, item) -> str:
+        description = self._get_clean_description(item)
+        return f"""You are pricing this product compared to similar items in the market.
+Consider quality, features, and typical market prices.
+
+Product: {description}
+
+Based on market comparison, the price should be: $"""
+
+    def _create_detailed_prompt(self, item) -> str:
+        description = self._get_clean_description(item)
+        return f"""Analyze this product and estimate its price by considering:
+1. Materials and build quality
+2. Brand positioning
+3. Features and functionality
+4. Target market
+
+Product: {description}
+
+Estimated price: $"""
+
+    def _create_market_prompt(self, item) -> str:
+        description = self._get_clean_description(item)
+        return f"""As a retail pricing expert, what would this product sell for?
+Consider production costs, markup, and consumer willingness to pay.
+
+Product: {description}
+
+Retail price: $"""
+
+    def _get_clean_description(self, item) -> str:
+        if hasattr(item, 'test_prompt'):
+            prompt = item.test_prompt()
+            clean = prompt.replace(" to the nearest dollar", "")
+            clean = clean.replace("\n\nPrice is $", "")
+            return clean.strip()
+        else:
+            parts = []
+            if 'title' in item:
+                parts.append(f"Title: {item['title']}")
+            if 'description' in item:
+                parts.append(f"Description: {item['description']}")
+            if 'features' in item:
+                parts.append(f"Features: {item['features']}")
+            return '\n'.join(parts)
+
+    def _extract_price(self, response: str) -> float:
+        if not response:
+            return 0.0
+
+        clean_response = response.replace('$', '').replace(',', '').strip()
+
+        try:
+            numbers = re.findall(r'\d+\.?\d{0,2}', clean_response)
+            if numbers:
+                return float(numbers[0])
+        except:
+            pass
+
+        return 0.0
+
+    def _get_single_prediction(self, item, strategy_name: str) -> float:
+        try:
+            prompt_func = self.prompt_strategies[strategy_name]
+            prompt = prompt_func(item)
+
+            response = self.client.chat.completions.create(
+                model=self.fine_tuned_model,
+                messages=[
+                    {"role": "system", "content": "You are a product pricing expert. Respond only with a price number."},
+                    {"role": "user", "content": prompt}
+                ],
+                max_tokens=10,
+                temperature=0.1
+            )
+
+            price = self._extract_price(response.choices[0].message.content)
+            return max(0.0, price)
+
+        except Exception as e:
+            print(f"Error in {strategy_name} prediction: {e}")
+            return 0.0
+
+    def predict_with_confidence(self, item) -> ConfidentPrediction:
+        predictions = {}
+        for strategy_name in self.prompt_strategies:
+            pred = self._get_single_prediction(item, strategy_name)
+            if pred > 0:
+                predictions[strategy_name] = pred
+
+        if not predictions:
+            return ConfidentPrediction(
+                predicted_price=50.0,
+                confidence_score=0.1,
+                price_range=(10.0, 100.0),
+                prediction_details={"fallback": 50.0},
+                risk_level="high"
+            )
+
+        prices = list(predictions.values())
+        mean_price = np.mean(prices)
+        std_price = np.std(prices)
+        min_price = min(prices)
+        max_price = max(prices)
+
+        if len(prices) == 1:
+            confidence = 0.5
+        else:
+            coefficient_of_variation = std_price / mean_price if mean_price > 0 else 1.0
+            confidence = max(0.1, min(1.0, 1.0 - coefficient_of_variation))
+
+        if confidence > 0.8:
+            range_factor = 0.1
+        elif confidence > 0.5:
+            range_factor = 0.2
+        else:
+            range_factor = 0.4
+
+        price_range = (
+            max(0.5, mean_price * (1 - range_factor)),
+            mean_price * (1 + range_factor)
+        )
+
+        if confidence > 0.7:
+            risk_level = "low"
+        elif confidence > 0.4:
+            risk_level = "medium"
+        else:
+            risk_level = "high"
+
+        return ConfidentPrediction(
+            predicted_price=mean_price,
+            confidence_score=confidence,
+            price_range=price_range,
+            prediction_details=predictions,
+            risk_level=risk_level
+        )
+
+    def simple_predict(self, item) -> float:
+        confident_pred = self.predict_with_confidence(item)
+        return confident_pred.predicted_price
+
+
+class ConfidenceAwareTester:
+
+    def __init__(self, predictor, data, title="Smart Pricer with Confidence", size=250):
+        self.predictor = predictor
+        self.data = data
+        self.title = title
+        self.size = size
+        self.results = []
+        self.confidence_stats = defaultdict(list)
+
+    def color_for_confidence(self, confidence: float) -> str:
+        if confidence > 0.7:
+            return "blue"
+        elif confidence > 0.4:
+            return "green"
+        else:
+            return "orange"
+
+    def run_enhanced_test(self):
+        print(f"\n{self.title}")
+        print("=" * 60)
+
+        for i in range(min(self.size, len(self.data))):
+            item = self.data[i]
+
+            if hasattr(self.predictor, 'predict_with_confidence'):
+                confident_pred = self.predictor.predict_with_confidence(item)
+                guess = confident_pred.predicted_price
+                confidence = confident_pred.confidence_score
+                price_range = confident_pred.price_range
+                risk_level = confident_pred.risk_level
+            else:
+                guess = self.predictor(item)
+                confidence = 0.5
+                price_range = (guess * 0.8, guess * 1.2)
+                risk_level = "medium"
+
+            if hasattr(item, 'price'):
+                truth = item.price
+                title = item.title[:40] + "..." if len(item.title) > 40 else item.title
+            else:
+                truth = item.get('price', 0)
+                title = item.get('title', 'Unknown')[:40] + "..."
+
+            error = abs(guess - truth)
+            in_range = price_range[0] <= truth <= price_range[1]
+
+            self.results.append({
+                'guess': guess,
+                'truth': truth,
+                'error': error,
+                'confidence': confidence,
+                'in_range': in_range,
+                'risk_level': risk_level,
+                'title': title
+            })
+
+            self.confidence_stats[risk_level].append(error)
+
+            color = self.color_for_confidence(confidence)
+            range_indicator = "+" if in_range else "-"
+
+            print(f"{COLOR_MAP[color]}{i+1:3d}: ${guess:6.2f} ({confidence*100:4.1f}%) "
+                  f"vs ${truth:6.2f} | Error: ${error:5.2f} | {range_indicator} | {title}{RESET}")
+
+        self._print_confidence_summary()
+        self._create_confidence_visualization()
+
+    def _print_confidence_summary(self):
+        if not self.results:
+            return
+
+        print(f"\nPERFORMANCE SUMMARY")
+        print("=" * 60)
+
+        total_predictions = len(self.results)
+        avg_confidence = np.mean([r['confidence'] for r in self.results])
+        avg_error = np.mean([r['error'] for r in self.results])
+        range_accuracy = np.mean([r['in_range'] for r in self.results]) * 100
+
+        print(f"Total Predictions: {total_predictions}")
+        print(f"Average Confidence: {avg_confidence:.2f}")
+        print(f"Average Error: ${avg_error:.2f}")
+        print(f"Range Accuracy: {range_accuracy:.1f}%")
+
+        print(f"\nBY RISK LEVEL:")
+        for risk_level in ['low', 'medium', 'high']:
+            if risk_level in self.confidence_stats:
+                errors = self.confidence_stats[risk_level]
+                count = len(errors)
+                avg_error = np.mean(errors)
+                print(f"  {risk_level.upper():6} risk: {count:3d} predictions, ${avg_error:6.2f} avg error")
+
+        high_conf_results = [r for r in self.results if r['confidence'] > 0.7]
+        if high_conf_results:
+            high_conf_error = np.mean([r['error'] for r in high_conf_results])
+            high_conf_accuracy = np.mean([r['in_range'] for r in high_conf_results]) * 100
+            print(f"\nHIGH CONFIDENCE PREDICTIONS (>0.7):")
+            print(f"  Count: {len(high_conf_results)}")
+            print(f"  Average Error: ${high_conf_error:.2f}")
+            print(f"  Range Accuracy: {high_conf_accuracy:.1f}%")
+
+    def _create_confidence_visualization(self):
+        if not self.results:
+            return
+
+        confidences = [r['confidence'] for r in self.results]
+        errors = [r['error'] for r in self.results]
+
+        plt.figure(figsize=(12, 5))
+
+        plt.subplot(1, 2, 1)
+        plt.scatter(confidences, errors, alpha=0.6, c=confidences, cmap='RdYlBu')
+        plt.xlabel('Confidence Score')
+        plt.ylabel('Prediction Error ($)')
+        plt.title('Confidence vs Prediction Error')
+        plt.colorbar(label='Confidence')
+
+        plt.subplot(1, 2, 2)
+        plt.hist(confidences, bins=20, alpha=0.7, color='skyblue', edgecolor='black')
+        plt.xlabel('Confidence Score')
+        plt.ylabel('Count')
+        plt.title('Distribution of Confidence Scores')
+
+        plt.tight_layout()
+        plt.show()
+
+
+def create_smart_pricer_function(fine_tuned_model_id: str = None):
+    pricer = SmartPricer(fine_tuned_model=fine_tuned_model_id)
+    return pricer.simple_predict
+
+
+def test_smart_pricer_with_confidence(test_data, fine_tuned_model_id: str = None):
+    pricer = SmartPricer(fine_tuned_model=fine_tuned_model_id)
+    tester = ConfidenceAwareTester(pricer, test_data)
+    tester.run_enhanced_test()
+    return tester.results
+
+
+def main():
+    print("Smart Product Pricer with Confidence Scoring")
+    print("=" * 60)
+
+    try:
+        with open('test.pkl', 'rb') as file:
+            test_data = pickle.load(file)
+        print(f"Loaded {len(test_data)} test items")
+    except FileNotFoundError:
+        print("Test data not found. Make sure test.pkl is in current directory.")
+        return
+
+    pricer = SmartPricer()
+
+    print(f"\nTesting with confidence analysis (50 items)...")
+    test_data_sample = test_data[:50]
+
+    tester = ConfidenceAwareTester(pricer, test_data_sample, size=50)
+    tester.run_enhanced_test()
+
+    print(f"\nComparison with traditional testing:")
+    simple_pricer = create_smart_pricer_function()
+    Tester.test(simple_pricer, test_data_sample[:25])
+
+
+if __name__ == "__main__":
+    main()