"""Utility helpers for ReputationRadar services.""" from __future__ import annotations import json import logging import os import random import re from dataclasses import dataclass from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, TypedDict from bs4 import BeautifulSoup from fuzzywuzzy import fuzz LOG_FILE = Path(__file__).resolve().parents[1] / "logs" / "app.log" MIN_TEXT_LENGTH = 15 SIMILARITY_THRESHOLD = 90 class NormalizedItem(TypedDict): """Canonical representation of a fetched mention.""" source: str id: str url: str author: Optional[str] timestamp: datetime text: str meta: Dict[str, object] class ServiceError(RuntimeError): """Raised when a service hard fails.""" class ServiceWarning(RuntimeError): """Raised for recoverable issues that should surface to the UI.""" def initialize_logger(name: str = "reputation_radar") -> logging.Logger: """Configure and return a module-level logger.""" LOG_FILE.parent.mkdir(parents=True, exist_ok=True) logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", handlers=[ logging.FileHandler(LOG_FILE, encoding="utf-8"), logging.StreamHandler(), ], ) logger = logging.getLogger(name) logger.setLevel(logging.INFO) return logger def load_sample_items(name: str) -> List[NormalizedItem]: """Load demo data from the samples directory.""" samples_dir = Path(__file__).resolve().parents[1] / "samples" sample_path = samples_dir / f"{name}.json" if not sample_path.exists(): return [] with sample_path.open("r", encoding="utf-8") as handle: raw_items = json.load(handle) cleaned: List[NormalizedItem] = [] for item in raw_items: try: cleaned.append( NormalizedItem( source=item["source"], id=str(item["id"]), url=item.get("url", ""), author=item.get("author"), timestamp=datetime.fromisoformat(item["timestamp"]), text=item["text"], meta=item.get("meta", {}), ) ) except (KeyError, ValueError): continue return cleaned def strip_html(value: str) -> str: """Remove HTML tags and normalize whitespace.""" if not value: return "" soup = BeautifulSoup(value, "html.parser") text = soup.get_text(separator=" ", strip=True) text = re.sub(r"\s+", " ", text) text = text.encode("utf-8", "ignore").decode("utf-8", "ignore") return text.strip() def sanitize_text(value: str) -> str: """Clean text and remove excessive noise.""" text = strip_html(value) text = re.sub(r"http\S+", "", text) # drop inline URLs text = re.sub(r"\s{2,}", " ", text) return text.strip() def drop_short_items(items: Iterable[NormalizedItem], minimum_length: int = MIN_TEXT_LENGTH) -> List[NormalizedItem]: """Filter out items that are too short to analyze.""" return [ item for item in items if len(item["text"]) >= minimum_length ] def fuzzy_deduplicate(items: Sequence[NormalizedItem], threshold: int = SIMILARITY_THRESHOLD) -> List[NormalizedItem]: """Remove duplicates based on URL or fuzzy text similarity.""" seen_urls: set[str] = set() deduped: List[NormalizedItem] = [] for item in items: url = item.get("url") or "" text = item.get("text") or "" if url and url in seen_urls: continue duplicate_found = False for existing in deduped: if not text or not existing.get("text"): continue if fuzz.token_set_ratio(text, existing["text"]) >= threshold: duplicate_found = True break if not duplicate_found: deduped.append(item) if url: seen_urls.add(url) return deduped def normalize_items(items: Sequence[NormalizedItem]) -> List[NormalizedItem]: """Apply sanitization, deduplication, and 
def normalize_items(items: Sequence[NormalizedItem]) -> List[NormalizedItem]:
    """Sanitize item text, drop noisy entries, and deduplicate the rest."""
    sanitized: List[NormalizedItem] = []
    for item in items:
        cleaned_text = sanitize_text(item.get("text", ""))
        if len(cleaned_text) < MIN_TEXT_LENGTH:
            continue
        sanitized.append(
            NormalizedItem(
                source=item["source"],
                id=item["id"],
                url=item.get("url", ""),
                author=item.get("author"),
                timestamp=item["timestamp"],
                text=cleaned_text,
                meta=item.get("meta", {}),
            )
        )
    return fuzzy_deduplicate(sanitized)


def parse_date_range(option: str) -> datetime:
    """Return a UTC timestamp threshold for the given range identifier."""
    now = datetime.now(timezone.utc)
    delta = {
        "24h": timedelta(days=1),
        "7d": timedelta(days=7),
        "30d": timedelta(days=30),
    }.get(option.lower(), timedelta(days=7))  # default to 7 days for unknown ranges
    return now - delta


def random_user_agent() -> str:
    """Return a random user-agent string for polite scraping."""
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_3) AppleWebKit/605.1.15 "
        "(KHTML, like Gecko) Version/16.4 Safari/605.1.15",
        "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:125.0) Gecko/20100101 Firefox/125.0",
    ]
    return random.choice(user_agents)


def chunked(iterable: Sequence[str], size: int) -> Iterator[Sequence[str]]:
    """Yield successive chunks of at most ``size`` items from ``iterable``."""
    for start in range(0, len(iterable), size):
        yield iterable[start : start + size]


def validate_openai_key(api_key: Optional[str]) -> Tuple[Optional[str], List[str]]:
    """Validate an OpenAI key following the guidance from the day1 notebook."""
    warnings: List[str] = []
    if not api_key:
        warnings.append("No OpenAI API key detected. VADER fallback will be used.")
        return None, warnings
    if api_key.strip() != api_key:
        warnings.append("OpenAI API key has leading or trailing whitespace.")
        api_key = api_key.strip()  # strip before the prefix check below
    if not api_key.startswith("sk-"):
        warnings.append(
            "Provided OpenAI API key does not start with the expected prefix (sk-)."
        )
    return api_key, warnings


def ensure_timezone(ts: datetime) -> datetime:
    """Guarantee timestamps are timezone-aware in UTC."""
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)


def safe_int(value: Optional[object], default: int = 0) -> int:
    """Convert a value to int with a fallback."""
    try:
        return int(value)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return default
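

# Minimal end-to-end sketch of how these helpers compose. "reviews" is a
# hypothetical sample name and may not exist, in which case load_sample_items
# simply returns an empty list.
if __name__ == "__main__":
    log = initialize_logger()
    items = load_sample_items("reviews")  # hypothetical sample file
    since = parse_date_range("7d")
    recent = [item for item in items if ensure_timezone(item["timestamp"]) >= since]
    normalized = normalize_items(recent)
    log.info(
        "Loaded %d items; %d recent; %d after normalization",
        len(items), len(recent), len(normalized),
    )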