"""Utility helpers for ReputationRadar services."""
from __future__ import annotations
import json
import logging
import os
import random
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, TypedDict
from bs4 import BeautifulSoup
from fuzzywuzzy import fuzz
LOG_FILE = Path(__file__).resolve().parents[1] / "logs" / "app.log"
MIN_TEXT_LENGTH = 15
SIMILARITY_THRESHOLD = 90


class NormalizedItem(TypedDict):
    """Canonical representation of a fetched mention."""

    source: str
    id: str
    url: str
    author: Optional[str]
    timestamp: datetime
    text: str
    meta: Dict[str, object]


class ServiceError(RuntimeError):
    """Raised when a service hard fails."""


class ServiceWarning(RuntimeError):
    """Raised for recoverable issues that should surface to the UI."""


def initialize_logger(name: str = "reputation_radar") -> logging.Logger:
    """Configure and return a module-level logger."""
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        handlers=[
            logging.FileHandler(LOG_FILE, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    return logger
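

# Usage sketch (illustrative): each service module is assumed to grab its own
# named logger; the "reddit_service" suffix below is hypothetical.
#
#     logger = initialize_logger("reputation_radar.reddit_service")
#     logger.info("Fetched %d mentions", 42)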


def load_sample_items(name: str) -> List[NormalizedItem]:
    """Load demo data from the samples directory."""
    samples_dir = Path(__file__).resolve().parents[1] / "samples"
    sample_path = samples_dir / f"{name}.json"
    if not sample_path.exists():
        return []
    with sample_path.open("r", encoding="utf-8") as handle:
        raw_items = json.load(handle)
    cleaned: List[NormalizedItem] = []
    for item in raw_items:
        try:
            cleaned.append(
                NormalizedItem(
                    source=item["source"],
                    id=str(item["id"]),
                    url=item.get("url", ""),
                    author=item.get("author"),
                    timestamp=datetime.fromisoformat(item["timestamp"]),
                    text=item["text"],
                    meta=item.get("meta", {}),
                )
            )
        except (KeyError, ValueError):
            # Skip malformed sample entries rather than failing the whole load.
            continue
    return cleaned
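

# Expected shape of a samples/<name>.json entry (sketch; field values are
# hypothetical, only the keys mirror what load_sample_items reads):
#
#     {"source": "reddit", "id": "abc123", "url": "https://r.example/abc123",
#      "author": "jane_doe", "timestamp": "2024-05-01T12:00:00+00:00",
#      "text": "Loving the new release!", "meta": {"score": 12}}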


def strip_html(value: str) -> str:
    """Remove HTML tags and normalize whitespace."""
    if not value:
        return ""
    soup = BeautifulSoup(value, "html.parser")
    text = soup.get_text(separator=" ", strip=True)
    text = re.sub(r"\s+", " ", text)
    # Round-trip through UTF-8 to drop characters that cannot be encoded
    # (for example, lone surrogates left over from scraped pages).
    text = text.encode("utf-8", "ignore").decode("utf-8", "ignore")
    return text.strip()


def sanitize_text(value: str) -> str:
    """Clean text and remove excessive noise."""
    text = strip_html(value)
    text = re.sub(r"http\S+", "", text)  # drop inline URLs
    text = re.sub(r"\s{2,}", " ", text)
    return text.strip()
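

# Example (illustrative): HTML markup and inline URLs are stripped before analysis.
#
#     sanitize_text("<p>Great&nbsp;product!</p> see https://example.com")
#     # -> "Great product! see"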


def drop_short_items(
    items: Iterable[NormalizedItem], minimum_length: int = MIN_TEXT_LENGTH
) -> List[NormalizedItem]:
    """Filter out items that are too short to analyze."""
    return [item for item in items if len(item["text"]) >= minimum_length]


def fuzzy_deduplicate(
    items: Sequence[NormalizedItem], threshold: int = SIMILARITY_THRESHOLD
) -> List[NormalizedItem]:
    """Remove duplicates based on URL or fuzzy text similarity."""
    seen_urls: set[str] = set()
    deduped: List[NormalizedItem] = []
    for item in items:
        url = item.get("url") or ""
        text = item.get("text") or ""
        if url and url in seen_urls:
            continue
        # Pairwise comparison against every kept item: quadratic in the number
        # of mentions, but acceptable for the small batches this app handles.
        duplicate_found = False
        for existing in deduped:
            if not text or not existing.get("text"):
                continue
            if fuzz.token_set_ratio(text, existing["text"]) >= threshold:
                duplicate_found = True
                break
        if not duplicate_found:
            deduped.append(item)
            if url:
                seen_urls.add(url)
    return deduped
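

# Behaviour sketch (values are hypothetical): two mentions with near-identical
# wording collapse to one, even when their URLs differ.
#
#     a = {"source": "reddit", "id": "1", "url": "https://r.example/1",
#          "author": None, "timestamp": datetime.now(timezone.utc),
#          "text": "The support team was incredibly helpful today", "meta": {}}
#     b = dict(a, id="2", url="https://r.example/2",
#              text="Support team was incredibly helpful today!")
#     assert len(fuzzy_deduplicate([a, b])) == 1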


def normalize_items(items: Sequence[NormalizedItem]) -> List[NormalizedItem]:
    """Apply sanitization, deduplication, and drop noisy entries."""
    sanitized: List[NormalizedItem] = []
    for item in items:
        cleaned_text = sanitize_text(item.get("text", ""))
        if len(cleaned_text) < MIN_TEXT_LENGTH:
            continue
        sanitized.append(
            NormalizedItem(
                source=item["source"],
                id=item["id"],
                url=item.get("url", ""),
                author=item.get("author"),
                timestamp=item["timestamp"],
                text=cleaned_text,
                meta=item.get("meta", {}),
            )
        )
    return fuzzy_deduplicate(sanitized)
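

# Typical call order (sketch; "reddit" is a hypothetical sample name):
#
#     raw = load_sample_items("reddit")
#     mentions = normalize_items(raw)
#     recent = [m for m in mentions
#               if ensure_timezone(m["timestamp"]) >= parse_date_range("7d")]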


def parse_date_range(option: str) -> datetime:
    """Return a UTC timestamp threshold for the given range identifier."""
    now = datetime.now(timezone.utc)
    option = option.lower()
    delta = {
        "24h": timedelta(days=1),
        "7d": timedelta(days=7),
        "30d": timedelta(days=30),
    }.get(option, timedelta(days=7))
    return now - delta


def random_user_agent() -> str:
    """Return a random user agent string for polite scraping."""
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_3) AppleWebKit/605.1.15 "
        "(KHTML, like Gecko) Version/16.4 Safari/605.1.15",
        "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:125.0) Gecko/20100101 Firefox/125.0",
    ]
    return random.choice(user_agents)


def chunked(iterable: Sequence[str], size: int) -> Iterator[Sequence[str]]:
    """Yield successive chunks from iterable."""
    for start in range(0, len(iterable), size):
        yield iterable[start : start + size]
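

# Example (illustrative): chunked(["a", "b", "c", "d", "e"], 2)
# yields ["a", "b"], ["c", "d"], ["e"].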


def validate_openai_key(api_key: Optional[str]) -> Tuple[Optional[str], List[str]]:
    """Validate an OpenAI key, following the guidance from the day 1 notebook."""
    warnings: List[str] = []
    if not api_key:
        warnings.append("No OpenAI API key detected. VADER fallback will be used.")
        return None, warnings
    # Strip whitespace first so the prefix check does not warn spuriously.
    if api_key.strip() != api_key:
        warnings.append("OpenAI API key looks like it has leading or trailing whitespace.")
        api_key = api_key.strip()
    if not api_key.startswith("sk-"):
        warnings.append(
            "Provided OpenAI API key does not start with the expected prefix (sk-)."
        )
    return api_key, warnings
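

# Example (illustrative; the key below is fake):
#
#     key, notes = validate_openai_key("  sk-demo123  ")
#     # key == "sk-demo123"; notes flags the surrounding whitespace.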


def ensure_timezone(ts: datetime) -> datetime:
    """Guarantee timestamps are timezone-aware in UTC."""
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)


def safe_int(value: Optional[object], default: int = 0) -> int:
    """Convert a value to int with a fallback."""
    try:
        return int(value)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return default
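

if __name__ == "__main__":
    # Minimal self-check sketch: exercises only the pure-text helpers, so it
    # runs without network access or sample files being present.
    demo_logger = initialize_logger()
    demo_logger.info("sanitize: %r", sanitize_text("<b>Hello</b> world https://example.com"))
    demo_logger.info("cutoff 24h: %s", parse_date_range("24h").isoformat())
    demo_logger.info("safe_int('7'): %d", safe_int("7"))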