Week8 dkisselev-zz update

This commit is contained in:
Dmitry Kisselev
2025-10-29 02:07:03 -07:00
parent ba929c7ed4
commit d28039e255
81 changed files with 21291 additions and 0 deletions

View File

@@ -0,0 +1,37 @@
"""Utility functions for Tuxedo Link."""
from .deduplication import (
create_fingerprint,
calculate_levenshtein_similarity,
calculate_text_similarity,
)
from .image_utils import generate_image_embedding, calculate_image_similarity
from .log_utils import reformat
from .config import (
get_config,
is_production,
get_db_path,
get_vectordb_path,
get_email_provider,
get_email_config,
get_mailgun_config,
reload_config,
)
__all__ = [
"create_fingerprint",
"calculate_levenshtein_similarity",
"calculate_text_similarity",
"generate_image_embedding",
"calculate_image_similarity",
"reformat",
"get_config",
"is_production",
"get_db_path",
"get_vectordb_path",
"get_email_provider",
"get_email_config",
"get_mailgun_config",
"reload_config",
]

View File

@@ -0,0 +1,174 @@
"""
Breed mapping utilities for cat APIs.
Handles mapping user breed terms to valid API breed values
using dictionary lookups, vector search, and exact matching.
"""
import logging
from typing import List, Optional, Dict
# Mapping of common user terms to API breed values
# These are fuzzy/colloquial terms that users might type
# Keys must be lowercase (lookups are done on lowercased, stripped input);
# values are lists because one user term can fan out to several API breeds.
USER_TERM_TO_API_BREED: Dict[str, List[str]] = {
    # Common misspellings and variations
    "main coon": ["Maine Coon"],
    "maine": ["Maine Coon"],
    "ragdol": ["Ragdoll"],
    "siames": ["Siamese"],
    "persian": ["Persian"],
    "bengal": ["Bengal"],
    "british shorthair": ["British Shorthair"],
    "russian blue": ["Russian Blue"],
    "sphynx": ["Sphynx"],
    "sphinx": ["Sphynx"],
    "american shorthair": ["American Shorthair"],
    "scottish fold": ["Scottish Fold"],
    "abyssinian": ["Abyssinian"],
    "birman": ["Birman"],
    "burmese": ["Burmese"],
    "himalayan": ["Himalayan"],
    "norwegian forest": ["Norwegian Forest Cat"],
    "norwegian forest cat": ["Norwegian Forest Cat"],
    "oriental": ["Oriental"],
    "somali": ["Somali"],
    "turkish angora": ["Turkish Angora"],
    "turkish van": ["Turkish Van"],
    # Mixed breeds
    "mixed": ["Mixed Breed", "Domestic Short Hair", "Domestic Medium Hair", "Domestic Long Hair"],
    "mixed breed": ["Mixed Breed", "Domestic Short Hair", "Domestic Medium Hair", "Domestic Long Hair"],
    "domestic": ["Domestic Short Hair", "Domestic Medium Hair", "Domestic Long Hair"],
    "dsh": ["Domestic Short Hair"],
    "dmh": ["Domestic Medium Hair"],
    "dlh": ["Domestic Long Hair"],
    "tabby": ["Domestic Short Hair"],  # Tabby is a pattern, not a breed
    "tuxedo": ["Domestic Short Hair"],  # Tuxedo is a color, not a breed
}
def normalize_user_breeds(
    user_breeds: List[str],
    valid_api_breeds: List[str],
    vectordb: Optional[object] = None,
    source: str = "petfinder",
    similarity_threshold: float = 0.7
) -> List[str]:
    """
    Normalize user breed preferences to valid API breed values.

    Uses a 3-tier strategy:
    1. Dictionary lookup for common variations
    2. Vector DB semantic search for fuzzy matching
    3. Direct string matching as fallback

    Args:
        user_breeds: List of breed terms provided by the user
        valid_api_breeds: List of breeds actually accepted by the API
        vectordb: Optional MetadataVectorDB instance for semantic search
        source: API source (petfinder/rescuegroups) for vector filtering
        similarity_threshold: Minimum similarity score (0-1) for vector matches

    Returns:
        Sorted, de-duplicated list of valid API breed strings
    """
    if not user_breeds:
        return []
    normalized_breeds = set()
    for user_term in user_breeds:
        if not user_term or not user_term.strip():
            continue
        user_term_lower = user_term.lower().strip()
        matched = False
        # Tier 1: Dictionary lookup (instant, common variations)
        if user_term_lower in USER_TERM_TO_API_BREED:
            mapped_breeds = USER_TERM_TO_API_BREED[user_term_lower]
            for mapped_breed in mapped_breeds:
                if mapped_breed in valid_api_breeds:
                    normalized_breeds.add(mapped_breed)
                    matched = True
            if matched:
                # BUG FIX: log message previously ran the term and mapping
                # together with no separator ("'main coon'['Maine Coon']").
                logging.info(f"🎯 Dictionary match: '{user_term}' -> {list(mapped_breeds)}")
                continue
        # Tier 2: Vector DB semantic search (fuzzy matching, handles typos)
        if vectordb:
            try:
                matches = vectordb.search_breed(
                    user_term,
                    n_results=1,
                    source_filter=source
                )
                if matches and matches[0]['similarity'] >= similarity_threshold:
                    best_match = matches[0]['breed']
                    similarity = matches[0]['similarity']
                    if best_match in valid_api_breeds:
                        normalized_breeds.add(best_match)
                        logging.info(
                            f"🔍 Vector match: '{user_term}' -> '{best_match}' "
                            f"(similarity: {similarity:.2f})"
                        )
                        matched = True
                        # Only short-circuit on a usable match; an invalid
                        # best match falls through to Tier 3 string matching.
                        continue
            except Exception as e:
                logging.warning(f"Vector search failed for breed '{user_term}': {e}")
        # Tier 3: Direct string matching (exact or substring)
        if not matched:
            # Try exact match (case-insensitive)
            for valid_breed in valid_api_breeds:
                if valid_breed.lower() == user_term_lower:
                    normalized_breeds.add(valid_breed)
                    logging.info(f"✓ Exact match: '{user_term}' -> '{valid_breed}'")
                    matched = True
                    break
            # Try substring match if exact didn't work (may add several breeds)
            if not matched:
                for valid_breed in valid_api_breeds:
                    if user_term_lower in valid_breed.lower():
                        normalized_breeds.add(valid_breed)
                        logging.info(f"≈ Substring match: '{user_term}' -> '{valid_breed}'")
                        matched = True
        # Log if no match found
        if not matched:
            logging.warning(
                f"⚠️ No breed match found for '{user_term}'. "
                f"User will see broader results."
            )
    # Sort for deterministic output (set iteration order is arbitrary).
    result = sorted(normalized_breeds)
    logging.info(f"Breed normalization complete: {user_breeds} -> {result}")
    return result
def get_breed_suggestions(breed_term: str, valid_breeds: List[str], top_n: int = 5) -> List[str]:
    """
    Suggest breed names for autocomplete or error messages.

    Args:
        breed_term: Partial or misspelled breed name
        valid_breeds: List of valid API breed values
        top_n: Number of suggestions to return

    Returns:
        Up to top_n breed names containing the term (case-insensitive)
    """
    needle = breed_term.lower().strip()
    hits = [breed for breed in valid_breeds if needle in breed.lower()]
    return hits[:top_n]

View File

@@ -0,0 +1,224 @@
"""
Color mapping utilities for cat APIs.
Handles mapping user color terms to valid API color values
using dictionary lookups, vector search, and exact matching.
"""
import logging
from typing import List, Dict, Optional
# Mapping of common user terms to Petfinder API color values
# Based on actual Petfinder API color list
# Keys must be lowercase (lookups are done on lowercased, stripped input);
# values are lists because one term can map to several API colors ("tabby").
USER_TERM_TO_API_COLOR: Dict[str, List[str]] = {
    # Tuxedo/Bicolor patterns
    "tuxedo": ["Black & White / Tuxedo"],
    "black and white": ["Black & White / Tuxedo"],
    "black & white": ["Black & White / Tuxedo"],
    "bicolor": ["Black & White / Tuxedo"],  # Most common bicolor
    # Solid colors
    "black": ["Black"],
    "white": ["White"],
    # Orange variations
    "orange": ["Orange / Red"],
    "red": ["Orange / Red"],
    "ginger": ["Orange / Red"],
    "orange and white": ["Orange & White"],
    "orange & white": ["Orange & White"],
    # Gray variations
    "gray": ["Gray / Blue / Silver"],
    "grey": ["Gray / Blue / Silver"],
    "silver": ["Gray / Blue / Silver"],
    "blue": ["Gray / Blue / Silver"],
    "gray and white": ["Gray & White"],
    "grey and white": ["Gray & White"],
    # Brown/Chocolate
    "brown": ["Brown / Chocolate"],
    "chocolate": ["Brown / Chocolate"],
    # Cream/Ivory
    "cream": ["Cream / Ivory"],
    "ivory": ["Cream / Ivory"],
    "buff": ["Buff / Tan / Fawn"],
    "tan": ["Buff / Tan / Fawn"],
    "fawn": ["Buff / Tan / Fawn"],
    # Patterns
    "calico": ["Calico"],
    "dilute calico": ["Dilute Calico"],
    "tortoiseshell": ["Tortoiseshell"],
    "tortie": ["Tortoiseshell"],
    "dilute tortoiseshell": ["Dilute Tortoiseshell"],
    "torbie": ["Torbie"],
    # Tabby patterns
    "tabby": ["Tabby (Brown / Chocolate)", "Tabby (Gray / Blue / Silver)", "Tabby (Orange / Red)"],
    "brown tabby": ["Tabby (Brown / Chocolate)"],
    "gray tabby": ["Tabby (Gray / Blue / Silver)"],
    "grey tabby": ["Tabby (Gray / Blue / Silver)"],
    "orange tabby": ["Tabby (Orange / Red)"],
    "red tabby": ["Tabby (Orange / Red)"],
    "tiger": ["Tabby (Tiger Striped)"],
    "tiger striped": ["Tabby (Tiger Striped)"],
    "leopard": ["Tabby (Leopard / Spotted)"],
    "spotted": ["Tabby (Leopard / Spotted)"],
    # Point colors (Siamese-type)
    "blue point": ["Blue Point"],
    "chocolate point": ["Chocolate Point"],
    "cream point": ["Cream Point"],
    "flame point": ["Flame Point"],
    "lilac point": ["Lilac Point"],
    "seal point": ["Seal Point"],
    # Other
    "smoke": ["Smoke"],
    "blue cream": ["Blue Cream"],
}
def normalize_user_colors(
    user_colors: List[str],
    valid_api_colors: List[str],
    vectordb: Optional[object] = None,
    source: str = "petfinder",
    similarity_threshold: float = 0.7
) -> List[str]:
    """
    Normalize user color preferences to valid API color values.

    Uses a 3-tier strategy:
    1. Dictionary lookup for common color terms
    2. Vector DB semantic search for fuzzy matching
    3. Direct string matching as fallback

    Args:
        user_colors: List of color terms provided by the user
        valid_api_colors: List of colors actually accepted by the API
        vectordb: Optional MetadataVectorDB instance for semantic search
        source: API source (petfinder/rescuegroups) for vector filtering
        similarity_threshold: Minimum similarity score (0-1) for vector matches

    Returns:
        Sorted, de-duplicated list of valid API color strings
    """
    if not user_colors:
        return []
    normalized_colors = set()
    for user_term in user_colors:
        if not user_term or not user_term.strip():
            continue
        user_term_lower = user_term.lower().strip()
        matched = False
        # Tier 1: Dictionary lookup (instant, common color terms)
        if user_term_lower in USER_TERM_TO_API_COLOR:
            mapped_colors = USER_TERM_TO_API_COLOR[user_term_lower]
            for mapped_color in mapped_colors:
                if mapped_color in valid_api_colors:
                    normalized_colors.add(mapped_color)
                    matched = True
            if matched:
                # BUG FIX: log message previously ran the term and mapping
                # together with no separator ("'tuxedo'['Black & White...']").
                logging.info(f"🎯 Dictionary match: '{user_term}' -> {list(mapped_colors)}")
                continue
        # Tier 2: Vector DB semantic search (fuzzy matching, handles typos)
        if vectordb:
            try:
                matches = vectordb.search_color(
                    user_term,
                    n_results=1,
                    source_filter=source
                )
                if matches and matches[0]['similarity'] >= similarity_threshold:
                    best_match = matches[0]['color']
                    similarity = matches[0]['similarity']
                    if best_match in valid_api_colors:
                        normalized_colors.add(best_match)
                        logging.info(
                            f"🔍 Vector match: '{user_term}' -> '{best_match}' "
                            f"(similarity: {similarity:.2f})"
                        )
                        matched = True
                        # Only short-circuit on a usable match; an invalid
                        # best match falls through to Tier 3 string matching.
                        continue
            except Exception as e:
                logging.warning(f"Vector search failed for color '{user_term}': {e}")
        # Tier 3: Direct string matching (exact or substring)
        if not matched:
            # Try exact match (case-insensitive)
            for valid_color in valid_api_colors:
                if valid_color.lower() == user_term_lower:
                    normalized_colors.add(valid_color)
                    logging.info(f"✓ Exact match: '{user_term}' -> '{valid_color}'")
                    matched = True
                    break
            # Try substring match if exact didn't work (may add several colors)
            if not matched:
                for valid_color in valid_api_colors:
                    if user_term_lower in valid_color.lower():
                        normalized_colors.add(valid_color)
                        logging.info(f"≈ Substring match: '{user_term}' -> '{valid_color}'")
                        matched = True
        # Log if no match found
        if not matched:
            logging.warning(
                f"⚠️ No color match found for '{user_term}'. "
                f"User will see broader results."
            )
    # Sort for deterministic output (set iteration order is arbitrary).
    result = sorted(normalized_colors)
    logging.info(f"Color normalization complete: {user_colors} -> {result}")
    return result
def get_color_suggestions(color_term: str, valid_colors: List[str], top_n: int = 5) -> List[str]:
    """
    Suggest color names for autocomplete or error messages.

    Args:
        color_term: Partial or misspelled color name
        valid_colors: List of valid API color values
        top_n: Number of suggestions to return

    Returns:
        Up to top_n color names containing the term (case-insensitive)
    """
    needle = color_term.lower().strip()
    hits = [color for color in valid_colors if needle in color.lower()]
    return hits[:top_n]
def get_color_help_text(valid_colors: List[str]) -> str:
    """
    Build a human-readable help string listing the valid colors.

    Args:
        valid_colors: List of valid API colors

    Returns:
        Formatted string describing valid colors
    """
    if not valid_colors:
        return "No color information available."
    joined = ', '.join(valid_colors)
    return f"Valid colors: {joined}"

View File

@@ -0,0 +1,134 @@
"""Configuration management for Tuxedo Link."""
import yaml
import os
from pathlib import Path
from typing import Dict, Any
# Cached configuration dict; None means "not loaded yet".
# (The previous annotation `Dict[str, Any] = None` was wrong: None is not a dict.)
_config_cache = None


def load_config() -> Dict[str, Any]:
    """
    Load configuration from YAML with environment variable overrides.

    The parsed config is cached at module level; use reload_config() to
    force a re-read.

    Returns:
        Dict[str, Any]: Configuration dictionary

    Raises:
        FileNotFoundError: If neither config.yaml nor config.example.yaml exists.
    """
    global _config_cache
    # BUG FIX: compare against None so an empty-but-valid config dict is still
    # treated as cached (the old truthiness check re-parsed the file each call).
    if _config_cache is not None:
        return _config_cache
    # Determine config path - look for config.yaml, fallback to example
    project_root = Path(__file__).parent.parent
    config_path = project_root / "config.yaml"
    if not config_path.exists():
        config_path = project_root / "config.example.yaml"
    if not config_path.exists():
        raise FileNotFoundError(
            "No config.yaml or config.example.yaml found. "
            "Please copy config.example.yaml to config.yaml and configure it."
        )
    # Load YAML
    with open(config_path) as f:
        config = yaml.safe_load(f)
    # Override with environment variables if present. setdefault() guards
    # against a config file that omits the section (old code raised KeyError).
    if 'EMAIL_PROVIDER' in os.environ:
        config.setdefault('email', {})['provider'] = os.environ['EMAIL_PROVIDER']
    if 'DEPLOYMENT_MODE' in os.environ:
        config.setdefault('deployment', {})['mode'] = os.environ['DEPLOYMENT_MODE']
    if 'MAILGUN_DOMAIN' in os.environ:
        config.setdefault('mailgun', {})['domain'] = os.environ['MAILGUN_DOMAIN']
    _config_cache = config
    return config
def get_config() -> Dict[str, Any]:
    """
    Return the current (cached) configuration.

    Returns:
        Dict[str, Any]: Configuration dictionary
    """
    # Thin alias over load_config(), which handles caching.
    return load_config()
def is_production() -> bool:
    """
    Report whether the app is running in production mode.

    Returns:
        bool: True if production mode, False if local
    """
    mode = get_config()['deployment']['mode']
    return mode == 'production'
def get_db_path() -> str:
    """
    Resolve the database file path for the active deployment mode.

    Returns:
        str: Path to database file
    """
    deployment = get_config()['deployment']
    # The deployment section keys per-mode sub-sections by the mode name.
    return deployment[deployment['mode']]['db_path']
def get_vectordb_path() -> str:
    """
    Resolve the vector database directory for the active deployment mode.

    Returns:
        str: Path to vector database directory
    """
    deployment = get_config()['deployment']
    # The deployment section keys per-mode sub-sections by the mode name.
    return deployment[deployment['mode']]['vectordb_path']
def get_email_provider() -> str:
    """
    Return the configured email provider name.

    Returns:
        str: Email provider name (mailgun or sendgrid)
    """
    email_section = get_config()['email']
    return email_section['provider']
def get_email_config() -> Dict[str, str]:
    """
    Return the email configuration section.

    Returns:
        Dict[str, str]: Email configuration (from_name, from_email)
    """
    config = get_config()
    return config['email']
def get_mailgun_config() -> Dict[str, str]:
    """
    Return the Mailgun configuration section.

    Returns:
        Dict[str, str]: Mailgun configuration (domain)
    """
    config = get_config()
    return config['mailgun']
def reload_config() -> None:
    """
    Drop the cached configuration and immediately re-read it from disk.

    Useful for testing or when config changes.
    """
    global _config_cache
    _config_cache = None
    # Eagerly repopulate the cache rather than waiting for the next access.
    load_config()

View File

@@ -0,0 +1,201 @@
"""Deduplication utilities for identifying duplicate cat listings."""
import hashlib
from typing import Tuple
import Levenshtein
from models.cats import Cat
def create_fingerprint(cat: Cat) -> str:
    """
    Create a stable fingerprint hash for a cat listing.

    The fingerprint hashes attributes that rarely change across re-listings:
    organization name, breed, age, and gender (all normalized).

    Args:
        cat: Cat object

    Returns:
        Fingerprint hash (first 16 hex characters of a SHA-256 digest)
    """
    combined = '|'.join([
        cat.organization_name.lower().strip(),
        cat.breed.lower().strip(),
        str(cat.age).lower(),
        cat.gender.lower(),
    ])
    digest = hashlib.sha256(combined.encode()).hexdigest()
    return digest[:16]
def calculate_levenshtein_similarity(str1: str, str2: str) -> float:
    """
    Compute a normalized Levenshtein similarity between two strings.

    Similarity = 1 - (distance / max_length), on the lowercased, stripped
    forms of the inputs.

    Args:
        str1: First string
        str2: Second string

    Returns:
        Similarity score (0-1, where 1 is identical)
    """
    if not str1 or not str2:
        return 0.0
    a = str1.lower().strip()
    b = str2.lower().strip()
    if a == b:
        return 1.0
    longest = max(len(a), len(b))
    if longest == 0:
        return 1.0
    score = 1.0 - (Levenshtein.distance(a, b) / longest)
    # Clamp: score cannot meaningfully be negative.
    return max(0.0, score)
def calculate_text_similarity(cat1: Cat, cat2: Cat) -> Tuple[float, float]:
    """
    Compare two cats' names and descriptions.

    Args:
        cat1: First cat
        cat2: Second cat

    Returns:
        Tuple of (name_similarity, description_similarity)
    """
    return (
        calculate_levenshtein_similarity(cat1.name, cat2.name),
        calculate_levenshtein_similarity(cat1.description, cat2.description),
    )
def calculate_composite_score(
    name_similarity: float,
    description_similarity: float,
    image_similarity: float,
    name_weight: float = 0.4,
    description_weight: float = 0.3,
    image_weight: float = 0.3
) -> float:
    """
    Combine several similarity signals into one weighted score.

    Weights are renormalized so they always sum to 1; if every weight is
    zero the score is defined as 0.

    Args:
        name_similarity: Name similarity (0-1)
        description_similarity: Description similarity (0-1)
        image_similarity: Image similarity (0-1)
        name_weight: Weight for name similarity
        description_weight: Weight for description similarity
        image_weight: Weight for image similarity

    Returns:
        Composite score (0-1)
    """
    total_weight = name_weight + description_weight + image_weight
    if total_weight == 0:
        return 0.0
    # Renormalize each weight so the trio sums to 1.
    name_weight /= total_weight
    description_weight /= total_weight
    image_weight /= total_weight
    return (
        name_similarity * name_weight
        + description_similarity * description_weight
        + image_similarity * image_weight
    )
def normalize_string(s: str) -> str:
    """
    Normalize a string for comparison: lowercase, strip, and collapse
    internal whitespace runs to single spaces.

    Args:
        s: String to normalize

    Returns:
        Normalized string
    """
    import re
    collapsed = re.sub(r'\s+', ' ', s.lower().strip())
    return collapsed
def calculate_breed_similarity(breed1: str, breed2: str) -> float:
    """
    Score how similar two breed strings are, with special handling for
    domestic and mixed-breed cats.

    Args:
        breed1: First breed
        breed2: Second breed

    Returns:
        Similarity score (0-1)
    """
    norm1 = normalize_string(breed1)
    norm2 = normalize_string(breed2)
    # Exact match after normalization
    if norm1 == norm2:
        return 1.0
    # Domestic shorthair/longhair variants are extremely common; treat any
    # pairing of them as near-identical.
    domestic_variants = ['domestic short hair', 'domestic shorthair', 'dsh',
                         'domestic long hair', 'domestic longhair', 'dlh',
                         'domestic medium hair', 'domestic mediumhair', 'dmh']
    if norm1 in domestic_variants and norm2 in domestic_variants:
        return 0.9
    # When both strings look like mixes, apply a discounted fuzzy score.
    mix_keywords = ['mix', 'mixed', 'tabby']
    both_mixes = (any(k in norm1 for k in mix_keywords)
                  and any(k in norm2 for k in mix_keywords))
    if both_mixes:
        return calculate_levenshtein_similarity(breed1, breed2) * 0.9
    # Fall back to plain Levenshtein similarity.
    return calculate_levenshtein_similarity(breed1, breed2)

View File

@@ -0,0 +1,161 @@
"""Geocoding utilities for location services."""
import requests
from typing import Optional, Tuple
def geocode_location(location: str) -> Optional[Tuple[float, float]]:
    """
    Convert a location string (address, city, or ZIP) to latitude/longitude.

    Uses the free Nominatim API (OpenStreetMap); no API key required.

    Args:
        location: Location string (address, city, ZIP code, etc.)

    Returns:
        Tuple of (latitude, longitude) or None if geocoding fails
    """
    try:
        response = requests.get(
            "https://nominatim.openstreetmap.org/search",
            params={
                'q': location,
                'format': 'json',
                'limit': 1,
                'countrycodes': 'us,ca'  # Limit to US and Canada
            },
            headers={'User-Agent': 'TuxedoLink/1.0'},  # Required by Nominatim
            timeout=10,
        )
        response.raise_for_status()
        hits = response.json()
        if hits and len(hits) > 0:
            top = hits[0]
            return float(top['lat']), float(top['lon'])
        return None
    except Exception as e:
        print(f"Geocoding failed for '{location}': {e}")
        return None
def reverse_geocode(latitude: float, longitude: float) -> Optional[dict]:
    """
    Convert latitude/longitude to address information via Nominatim.

    Args:
        latitude: Latitude
        longitude: Longitude

    Returns:
        Dictionary with address components or None if failed
    """
    try:
        response = requests.get(
            "https://nominatim.openstreetmap.org/reverse",
            params={'lat': latitude, 'lon': longitude, 'format': 'json'},
            headers={'User-Agent': 'TuxedoLink/1.0'},
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()
        if 'address' not in payload:
            return None
        addr = payload['address']
        # Nominatim names the locality city/town/village depending on size.
        return {
            'city': addr.get('city', addr.get('town', addr.get('village', ''))),
            'state': addr.get('state', ''),
            'zip': addr.get('postcode', ''),
            'country': addr.get('country', ''),
            'display_name': payload.get('display_name', '')
        }
    except Exception as e:
        print(f"Reverse geocoding failed for ({latitude}, {longitude}): {e}")
        return None
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Great-circle distance between two points in miles (Haversine formula).

    Args:
        lat1: Latitude of first point
        lon1: Longitude of first point
        lat2: Latitude of second point
        lon2: Longitude of second point

    Returns:
        Distance in miles
    """
    from math import radians, sin, cos, sqrt, atan2
    earth_radius_miles = 3959.0
    phi1 = radians(lat1)
    lam1 = radians(lon1)
    phi2 = radians(lat2)
    lam2 = radians(lon2)
    dphi = phi2 - phi1
    dlam = lam2 - lam1
    # Haversine formula
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlam / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return earth_radius_miles * c
def parse_location_input(location_input: str) -> Optional[Tuple[float, float]]:
    """
    Parse location input that might be coordinates or an address.

    Handled formats:
    - "lat,long" (e.g., "40.7128,-74.0060")
    - ZIP code (e.g., "10001")
    - City, State (e.g., "New York, NY")
    - Full address

    Args:
        location_input: Location string

    Returns:
        Tuple of (latitude, longitude) or None if parsing fails
    """
    # A comma-separated pair of numbers in range is treated as coordinates.
    if ',' in location_input:
        pieces = location_input.split(',')
        if len(pieces) == 2:
            try:
                lat = float(pieces[0].strip())
                lon = float(pieces[1].strip())
            except ValueError:
                pass  # Not coordinates; fall through to geocoding
            else:
                if -90 <= lat <= 90 and -180 <= lon <= 180:
                    return lat, lon
    # Anything else is handed to the geocoder.
    return geocode_location(location_input)

View File

@@ -0,0 +1,168 @@
"""Image utilities for generating and comparing image embeddings."""
import numpy as np
import requests
from PIL import Image
from io import BytesIO
from typing import Optional
import open_clip
import torch
class ImageEmbeddingGenerator:
    """Generate image embeddings using CLIP model."""

    def __init__(self, model_name: str = 'ViT-B-32', pretrained: str = 'openai'):
        """
        Set up the CLIP model used for embedding generation.

        Args:
            model_name: CLIP model architecture
            pretrained: Pretrained weights to use
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        model, _, preprocess = open_clip.create_model_and_transforms(
            model_name,
            pretrained=pretrained,
            device=self.device
        )
        self.model = model
        self.preprocess = preprocess
        # Inference only — disable dropout/batch-norm training behavior.
        self.model.eval()

    def download_image(self, url: str, timeout: int = 10) -> Optional[Image.Image]:
        """
        Fetch an image over HTTP.

        Args:
            url: Image URL
            timeout: Request timeout in seconds

        Returns:
            PIL Image (converted to RGB) or None if download fails
        """
        try:
            resp = requests.get(url, timeout=timeout)
            resp.raise_for_status()
            # Convert to RGB so preprocessing always sees 3 channels.
            return Image.open(BytesIO(resp.content)).convert('RGB')
        except Exception as e:
            print(f"Failed to download image from {url}: {e}")
            return None

    def generate_embedding(self, image: Image.Image) -> np.ndarray:
        """
        Generate a CLIP embedding for an image.

        Args:
            image: PIL Image

        Returns:
            Numpy array of the (unit-normalized) image embedding
        """
        with torch.no_grad():
            batch = self.preprocess(image).unsqueeze(0).to(self.device)
            features = self.model.encode_image(batch)
            # Unit-normalize so dot products are cosine similarities.
            features = features / features.norm(dim=-1, keepdim=True)
            flattened = features.cpu().numpy().flatten()
        return flattened.astype(np.float32)

    def generate_embedding_from_url(self, url: str) -> Optional[np.ndarray]:
        """
        Download an image and generate its embedding in one step.

        Args:
            url: Image URL

        Returns:
            Numpy array of image embedding or None if the download failed
        """
        img = self.download_image(url)
        return None if img is None else self.generate_embedding(img)
# Global instance (lazy loaded so importing this module stays cheap)
_embedding_generator: Optional[ImageEmbeddingGenerator] = None


def get_embedding_generator() -> ImageEmbeddingGenerator:
    """Return the process-wide embedding generator, creating it on first use."""
    global _embedding_generator
    if _embedding_generator is None:
        # First call pays the model-loading cost; later calls reuse it.
        _embedding_generator = ImageEmbeddingGenerator()
    return _embedding_generator
def generate_image_embedding(image_url: str) -> Optional[np.ndarray]:
    """
    Generate an image embedding from a URL.

    Convenience wrapper around the shared (lazily created) generator.

    Args:
        image_url: URL of the image

    Returns:
        Numpy array of image embedding or None if failed
    """
    return get_embedding_generator().generate_embedding_from_url(image_url)
def calculate_image_similarity(embedding1: np.ndarray, embedding2: np.ndarray) -> float:
    """
    Cosine similarity between two image embeddings, rescaled to [0, 1].

    Args:
        embedding1: First image embedding
        embedding2: Second image embedding

    Returns:
        Similarity score (0-1, where 1 is most similar)
    """
    if embedding1 is None or embedding2 is None:
        return 0.0
    norm1 = np.linalg.norm(embedding1)
    norm2 = np.linalg.norm(embedding2)
    # A zero vector has no direction; similarity is undefined, return 0.
    if norm1 == 0 or norm2 == 0:
        return 0.0
    cosine = np.dot(embedding1 / norm1, embedding2 / norm2)
    # Map the cosine range [-1, 1] onto [0, 1].
    return float((cosine + 1) / 2)
def batch_generate_embeddings(image_urls: list[str]) -> list[Optional[np.ndarray]]:
    """
    Generate embeddings for multiple images.

    Args:
        image_urls: List of image URLs

    Returns:
        List of embeddings (same length as input, None for failed downloads)
    """
    generator = get_embedding_generator()
    return [generator.generate_embedding_from_url(url) for url in image_urls]

View File

@@ -0,0 +1,46 @@
"""Logging utilities for Tuxedo Link."""
# ANSI foreground color escape codes
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
# ANSI background color escape codes
BG_BLACK = '\033[40m'
BG_BLUE = '\033[44m'
# Reset code to return to default color
RESET = '\033[0m'
# Background+foreground escape sequence pairs mapped to HTML hex colors
mapper = {
    BG_BLACK+RED: "#dd0000",
    BG_BLACK+GREEN: "#00dd00",
    BG_BLACK+YELLOW: "#dddd00",
    BG_BLACK+BLUE: "#0000ee",
    BG_BLACK+MAGENTA: "#aa00dd",
    BG_BLACK+CYAN: "#00dddd",
    BG_BLACK+WHITE: "#87CEEB",
    BG_BLUE+WHITE: "#ff7800"
}


def reformat(message: str) -> str:
    """
    Convert terminal color codes to HTML spans for Gradio display.

    Args:
        message: Log message with terminal color codes

    Returns:
        HTML formatted message
    """
    html = message
    # Each known escape pair opens a colored span; RESET closes it.
    for escape_pair, hex_color in mapper.items():
        html = html.replace(escape_pair, f'<span style="color: {hex_color}">')
    return html.replace(RESET, '</span>')

View File

@@ -0,0 +1,37 @@
"""Timing utilities for performance monitoring."""
import time
import functools
from typing import Callable, Any
def timed(func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Decorator to time function execution and log it.

    If the first positional argument exposes a ``log`` method (the Agent
    classes do), the elapsed time is reported through it; otherwise the
    call is timed silently. The wrapped function's return value is passed
    through unchanged.

    Args:
        func: Function to be timed

    Returns:
        Wrapped function that logs execution time

    Usage:
        @timed
        def my_function():
            ...
    """
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        """Wrapper function that times the execution."""
        # BUG FIX: use the monotonic perf_counter instead of time.time(),
        # which can jump (even backwards) if the system clock is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        # Try to log if the object has a log method (Agent classes)
        if args and hasattr(args[0], 'log'):
            args[0].log(f"{func.__name__} completed in {elapsed:.2f} seconds")
        return result
    return wrapper