create community folder

This commit is contained in:
Adriana394
2025-06-03 15:16:50 +02:00
parent 5782ca2b43
commit 41ae0bdc24

View File

@@ -0,0 +1,258 @@
# -*- coding: utf-8 -*-
"""Testing Fine-tuned model with RAG
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1J8P8cwqwhBo3CNIZaEFe6BMRw0WUfEqy
## Predict Product Prices
### And now, to evaluate our fine-tuned open source model
"""
!pip install -q datasets peft requests torch bitsandbytes transformers trl accelerate sentencepiece matplotlib langchain-community chromadb
import os
import re
import math
from google.colab import userdata
from huggingface_hub import login
import torch
import torch.nn.functional as F
from transformers import (
AutoModelForCausalLM, AutoTokenizer,
BitsAndBytesConfig, GenerationConfig)
from datasets import load_dataset
from peft import PeftModel
from sentence_transformers import SentenceTransformer
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
import matplotlib.pyplot as plt
# Commented out IPython magic to ensure Python compatibility.
# Constants
BASE_MODEL = "meta-llama/Llama-3.1-8B"
PROJECT_NAME = "pricer"
HF_USER = "Adriana213"
RUN_NAME = "optim-20250514_061529"
PROJECT_RUN_NAME = f"{PROJECT_NAME}-{RUN_NAME}"
FINETUNED_MODEL = f"{HF_USER}/{PROJECT_RUN_NAME}"
# Data
DATASET_NAME = f"{HF_USER}/pricer-data"
# Hyperparameters for QLoRA
QUANT_4_BIT = True
# %matplotlib inline
# Used for writing to output in color
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
RESET = "\033[0m"
COLOR_MAP = {"red":RED, "orange": YELLOW, "green": GREEN}
"""### Log in to HuggingFace
"""
hf_token = userdata.get('HF_TOKEN')
login(hf_token, add_to_git_credential=True)
dataset = load_dataset(DATASET_NAME)
train = dataset['train']
test = dataset['test']
test[0]
"""## Now load the Tokenizer and Model"""
if QUANT_4_BIT:
quant_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_use_double_quant=True,
bnb_4bit_compute_dtype=torch.bfloat16,
bnb_4bit_quant_type="nf4"
)
else:
quant_config = BitsAndBytesConfig(
load_in_8bit=True,
bnb_8bit_compute_dtype=torch.bfloat16
)
# Load the Tokenizer and the Model
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"
base_model = AutoModelForCausalLM.from_pretrained(
BASE_MODEL,
quantization_config=quant_config,
device_map="auto",
)
base_model.generation_config.pad_token_id = tokenizer.pad_token_id
# Load the fine-tuned model with PEFT
fine_tuned_model = PeftModel.from_pretrained(base_model, FINETUNED_MODEL)
print(f"Memory footprint: {fine_tuned_model.get_memory_footprint() / 1e6:.1f} MB")
fine_tuned_model
"""# Evaluation"""
def extract_price(s):
if "Price is $" in s:
contents = s.split("Price is $")[1]
contents = contents.replace(',','')
match = re.search(r"[-+]?\d*\.\d+|\d+", contents)
return float(match.group()) if match else 0
return 0
extract_price("Price is $a fabulous 899.99 or so")
# Original prediction function takes the most likely next token
def model_predict(prompt):
inputs = tokenizer.encode(prompt, return_tensors="pt").to("cuda")
attention_mask = torch.ones(inputs.shape, device="cuda")
outputs = fine_tuned_model.generate(inputs, attention_mask=attention_mask, max_new_tokens=3, num_return_sequences=1)
response = tokenizer.decode(outputs[0])
return extract_price(response)
# top_K = 3
# def improved_model_predict(prompt, device="cuda"):
# set_seed(42)
# inputs = tokenizer.encode(prompt, return_tensors="pt").to(device)
# attention_mask = torch.ones(inputs.shape, device=device)
# with torch.no_grad():
# outputs = fine_tuned_model(inputs, attention_mask=attention_mask)
# next_token_logits = outputs.logits[:, -1, :].to('cpu')
# next_token_probs = F.softmax(next_token_logits, dim=-1)
# top_prob, top_token_id = next_token_probs.topk(top_K)
# prices, weights = [], []
# for i in range(top_K):
# predicted_token = tokenizer.decode(top_token_id[0][i])
# probability = top_prob[0][i]
# try:
# result = float(predicted_token)
# except ValueError as e:
# result = 0.0
# if result > 0:
# prices.append(result)
# weights.append(probability)
# if not prices:
# return 0.0, 0.0
# total = sum(weights)
# weighted_prices = [price * weight / total for price, weight in zip(prices, weights)]
# return sum(weighted_prices).item()
embedder = HuggingFaceEmbeddings(model_name = "all-MiniLM-L6-v2")
chroma = Chroma(
persist_directory = "chroma_train_index",
embedding_function = embedder
)
gen_config = GenerationConfig(max_new_tokens=10, do_sample=False)
def predict_price_rag(desc: str, k: int = 3) -> float:
docs = chroma.similarity_search(desc, k=k)
shots = "\n\n".join(f"Description: {d.page_content}\nPrice is ${d.metadata['price']}"
for d in docs)
prompt = f"{shots}\n\nDescription: {desc}\nPrice is $"
inp = tokenizer(prompt, return_tensors="pt").to(fine_tuned_model.device)
out = fine_tuned_model.generate(**inp, generation_config=gen_config)
txt = tokenizer.decode(out[0, inp["input_ids"].shape[-1]:], skip_special_tokens=True).strip()
return float(re.findall(r"\d+\.?\d+", txt)[0])
class Tester:
def __init__(self, predictor, data, title=None, size=250):
self.predictor = predictor
self.data = data
self.title = title or predictor.__name__.replace("_", " ").title()
self.size = size
self.guesses = []
self.truths = []
self.errors = []
self.sles = []
self.colors = []
def color_for(self, error, truth):
if error<40 or error/truth < 0.2:
return "green"
elif error<80 or error/truth < 0.4:
return "orange"
else:
return "red"
def run_datapoint(self, i):
datapoint = self.data[i]
guess = self.predictor(datapoint["text"])
truth = datapoint["price"]
error = abs(guess - truth)
log_error = math.log(truth+1) - math.log(guess+1)
sle = log_error ** 2
color = self.color_for(error, truth)
title = datapoint["text"].split("\n\n")[1][:20] + "..."
self.guesses.append(guess)
self.truths.append(truth)
self.errors.append(error)
self.sles.append(sle)
self.colors.append(color)
print(f"{COLOR_MAP[color]}{i+1}: Guess: ${guess:,.2f} Truth: ${truth:,.2f} Error: ${error:,.2f} SLE: {sle:,.2f} Item: {title}{RESET}")
def chart(self, title):
max_error = max(self.errors)
plt.figure(figsize=(12, 8))
max_val = max(max(self.truths), max(self.guesses))
plt.plot([0, max_val], [0, max_val], color='deepskyblue', lw=2, alpha=0.6)
plt.scatter(self.truths, self.guesses, s=3, c=self.colors)
plt.xlabel('Ground Truth')
plt.ylabel('Model Estimate')
plt.xlim(0, max_val)
plt.ylim(0, max_val)
plt.title(title)
plt.show()
def report(self):
average_error = sum(self.errors) / self.size
rmsle = math.sqrt(sum(self.sles) / self.size)
hits = sum(1 for color in self.colors if color=="green")
title = f"{self.title} Error=${average_error:,.2f} RMSLE={rmsle:,.2f} Hits={hits/self.size*100:.1f}%"
self.chart(title)
def run(self):
self.error = 0
for i in range(self.size):
self.run_datapoint(i)
self.report()
@classmethod
def test(cls, function, data):
cls(function, data).run()
Tester.test(predict_price_rag, test)