Merge pull request #784 from parth1212121/reputation-radar-pr

Add ReputationRadar community contribution [Project - Day 1 (LLM Summarizer)]
This commit is contained in:
Ed Donner
2025-10-25 13:53:39 -04:00
committed by GitHub
22 changed files with 1794 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV STREAMLIT_SERVER_HEADLESS=true \
STREAMLIT_SERVER_ADDRESS=0.0.0.0 \
STREAMLIT_SERVER_PORT=8501
EXPOSE 8501
CMD ["streamlit", "run", "app.py"]

View File

@@ -0,0 +1,13 @@
PYTHON ?= python
.PHONY: install run test
install:
$(PYTHON) -m pip install --upgrade pip
$(PYTHON) -m pip install -r requirements.txt
run:
streamlit run app.py
test:
pytest

View File

@@ -0,0 +1,124 @@
# 📡 ReputationRadar
> Real-time brand intelligence with human-readable insights.
ReputationRadar is a Streamlit dashboard that unifies Reddit, Twitter/X, and Trustpilot chatter, classifies sentiment with OpenAI (or VADER fallback), and delivers exportable executive summaries. It ships with modular services, caching, retry-aware scrapers, demo data, and pytest coverage—ready for production hardening or internal deployment.
---
## Table of Contents
- [Demo](#demo)
- [Feature Highlights](#feature-highlights)
- [Architecture Overview](#architecture-overview)
- [Quick Start](#quick-start)
- [Configuration & Credentials](#configuration--credentials)
- [Running Tests](#running-tests)
- [Working Without API Keys](#working-without-api-keys)
- [Exports & Deliverables](#exports--deliverables)
- [Troubleshooting](#troubleshooting)
- [Legal & Compliance](#legal--compliance)
---
## Demo
A video demo of the app is available at:
https://drive.google.com/file/d/1XZ09NOht1H5LCJEbOrAldny2L5SV1DeT/view?usp=sharing
## Feature Highlights
- **Adaptive Ingestion**: Toggle Reddit, Twitter/X, and Trustpilot independently; backoff, caching, and polite scraping keep providers happy.
- **Smart Sentiment**: Batch OpenAI classification with rationale-aware prompts and auto-fallback to VADER when credentials are missing.
- **Actionable Summaries**: Executive brief card (highlights, risks, tone, actions) plus a refreshed PDF layout that respects margins and typography.
- **Interactive Insights**: Plotly visuals, per-source filtering, and a lean “Representative Mentions” link list to avoid content overload.
- **Export Suite**: CSV, Excel (auto-sized columns), and polished PDF snapshots for stakeholder handoffs.
- **Robust Foundation**: Structured logging, reusable UI components, pytest suites, Dockerfile, and Makefile for frictionless iteration.
---
## Architecture Overview
```
community-contributions/Reputation_Radar/
├── app.py # Streamlit orchestrator & layout
├── components/ # Sidebar, dashboard, summaries, loaders
├── services/ # Reddit/Twitter clients, Trustpilot scraper, LLM wrapper, utilities
├── samples/ # Demo JSON payloads (auto-loaded when credentials missing)
├── tests/ # Pytest coverage for utilities and LLM fallback
├── assets/ # Placeholder icons/logo
├── logs/ # Streaming log output
├── requirements.txt # Runtime dependencies (includes PDF + Excel writers)
├── Dockerfile # Containerised deployment recipe
└── Makefile # Helper targets for install/run/test
```
Each service returns a normalised payload to keep the downstream sentiment pipeline deterministic. Deduplication is handled centrally via fuzzy matching, and timestamps are coerced to UTC before analysis.
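For reference, here is a minimal sketch of the payload shape each service emits and the central clean-up step. Field names come from `services/utils.py`; the example values are purely illustrative.
```python
from datetime import datetime, timezone

from services.utils import NormalizedItem, normalize_items

# Every client returns a list of NormalizedItem dicts with this shape.
item = NormalizedItem(
    source="reddit",
    id="t3_example",
    url="https://www.reddit.com/r/technology/comments/example",
    author="techfan42",
    timestamp=datetime(2025, 1, 15, 14, 30, tzinfo=timezone.utc),  # always UTC
    text="ReputationRadar flagged a sentiment dip before our launch.",
    meta={"score": 128},
)

# normalize_items() sanitizes text, drops short/noisy entries, and
# fuzzy-deduplicates near-identical mentions across sources.
cleaned = normalize_items([item, item])
assert len(cleaned) == 1
```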
---
## Quick Start
1. **Clone & enter the project directory (`community-contributions/Reputation_Radar`).**
2. **Install dependencies and launch Streamlit:**
```bash
pip install -r requirements.txt && streamlit run app.py
```
   (Use a virtual environment if preferred; a setup sketch follows this list.)
3. **Populate the sidebar:** add your brand name, optional filters, source toggles, and API credentials (stored only in session state).
4. **Click “Run Analysis 🚀”** and follow the status indicators as sources load, sentiment is classified, and summaries render.
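For the optional virtual environment, a typical setup looks like this (the `.venv` directory name is just a convention):
```bash
python -m venv .venv
source .venv/bin/activate   # on Windows: .venv\Scripts\activate
pip install -r requirements.txt
streamlit run app.py
```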
### Optional Docker Run
```bash
docker build -t reputation-radar .
docker run --rm -p 8501:8501 -e OPENAI_API_KEY=your_key reputation-radar
```
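If you keep credentials in a local `.env` file (see the next section), you can pass the whole file instead of individual variables, assuming it sits in your working directory:
```bash
docker run --rm -p 8501:8501 --env-file .env reputation-radar
```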
---
## Configuration & Credentials
The app reads from `.env`, Streamlit secrets, or direct sidebar input. Expected variables:
| Variable | Purpose |
| --- | --- |
| `OPENAI_API_KEY` | Enables OpenAI sentiment + executive summary (falls back to VADER if absent). |
| `REDDIT_CLIENT_ID` | PRAW client ID for Reddit API access. |
| `REDDIT_CLIENT_SECRET` | PRAW client secret. |
| `REDDIT_USER_AGENT` | Descriptive user agent (e.g., `ReputationRadar/1.0 by you`). |
| `TWITTER_BEARER_TOKEN` | Twitter/X v2 recent search bearer token. |
Credential validation mirrors the guidance from `week1/day1.ipynb`—mistyped OpenAI keys surface helpful warnings before analysis begins.
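A minimal `.env` sketch (every value below is a placeholder, not a working credential):
```env
OPENAI_API_KEY=sk-your-key-here
REDDIT_CLIENT_ID=your-reddit-client-id
REDDIT_CLIENT_SECRET=your-reddit-client-secret
REDDIT_USER_AGENT="ReputationRadar/1.0 by your-username"
TWITTER_BEARER_TOKEN=your-twitter-bearer-token
```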
---
## Running Tests
```bash
pytest
```
Tests cover sentiment fallback behaviour and core sanitisation/deduplication helpers. Extend them as you add new data transforms or UI logic.
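While iterating, you can run a subset by keyword, for example only the VADER fallback test included in this contribution:
```bash
pytest -k fallback -v
```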
---
## Working Without API Keys
- Reddit/Twitter/Trustpilot can be toggled independently; missing credentials raise gentle warnings rather than hard failures.
- Curated fixtures in `samples/` automatically load for any disabled source, keeping charts, exports, and PDF output functional in demo mode.
- The LLM layer drops to VADER sentiment scoring and skips the executive summary when `OPENAI_API_KEY` is absent (see the sketch below).
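A minimal sketch of that fallback behaviour, using the service classes shipped in this contribution:
```python
from services.llm import LLMService
from services.utils import ServiceWarning

# With no API key, classification silently falls back to VADER scoring...
service = LLMService(api_key=None)
results = service.classify_sentiment_batch(
    ["Support resolved my ticket in minutes!", "The dashboard keeps crashing."]
)
print([(r.label, round(r.confidence, 2)) for r in results])

# ...while the executive summary is explicitly unavailable.
try:
    service.summarize_overall([{"label": "positive", "text": "Example mention"}])
except ServiceWarning as warning:
    print(warning)  # "OpenAI API key missing. Summary unavailable."
```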
---
## Exports & Deliverables
- **CSV**: Clean, UTF-8 dataset for quick spreadsheet edits.
- **Excel**: Auto-sized columns, formatted timestamps, and instant import into stakeholder workbooks.
- **PDF**: Professionally typeset executive summary with bullet lists, consistent margins, and wrapped excerpts (thanks to ReportLab's Platypus engine).
All exports are regenerated on demand and never persisted server-side.
---
## Troubleshooting
- **OpenAI key missing/invalid**: Watch the sidebar notices; the app falls back gracefully, but no executive summary will be produced.
- **Twitter 401/403**: Confirm your bearer token scope and that the project has search access enabled.
- **Rate limiting (429)**: Built-in sleeps help, but repeated requests may require manual pauses. Try narrowing filters or reducing per-source limits.
- **Trustpilot blocks**: Respect robots.txt. If scraping is denied, switch to the official API or provide compliant CSV imports.
- **PDF text clipping**: Resolved by the new layout; if you customise templates, ensure column widths and table styles remain inside the page margins.
---
## Legal & Compliance
ReputationRadar surfaces public discourse for legitimate monitoring purposes. Always comply with each platform's Terms of Service, local regulations, and privacy expectations. Avoid storing third-party data longer than necessary, and never commit API keys to version control—the app only keeps them in Streamlit session state.

View File

@@ -0,0 +1,436 @@
"""ReputationRadar Streamlit application entrypoint."""
from __future__ import annotations
import io
import json
import os
import re
from datetime import datetime
from typing import Dict, List, Optional
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle
from components.dashboard import render_overview, render_source_explorer, render_top_comments
from components.filters import render_sidebar
from components.summary import render_summary
from components.loaders import show_empty_state, source_status
from services import llm, reddit_client, trustpilot_scraper, twitter_client, utils
from services.llm import SentimentResult
from services.utils import (
NormalizedItem,
ServiceError,
ServiceWarning,
initialize_logger,
load_sample_items,
normalize_items,
parse_date_range,
validate_openai_key,
)
st.set_page_config(page_title="ReputationRadar", page_icon="📡", layout="wide")
load_dotenv(override=True)
LOGGER = initialize_logger()
st.title("📡 ReputationRadar")
st.caption("Aggregate brand chatter, classify sentiment, and surface actionable insights in minutes.")
def _get_env_defaults() -> Dict[str, Optional[str]]:
"""Read supported credentials from environment variables."""
return {
"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"),
"REDDIT_CLIENT_ID": os.getenv("REDDIT_CLIENT_ID"),
"REDDIT_CLIENT_SECRET": os.getenv("REDDIT_CLIENT_SECRET"),
"REDDIT_USER_AGENT": os.getenv("REDDIT_USER_AGENT", "ReputationRadar/1.0"),
"TWITTER_BEARER_TOKEN": os.getenv("TWITTER_BEARER_TOKEN"),
}
@st.cache_data(ttl=600, show_spinner=False)
def cached_reddit_fetch(
brand: str,
limit: int,
date_range: str,
min_upvotes: int,
client_id: str,
client_secret: str,
user_agent: str,
) -> List[NormalizedItem]:
credentials = {
"client_id": client_id,
"client_secret": client_secret,
"user_agent": user_agent,
}
return reddit_client.fetch_mentions(
brand=brand,
credentials=credentials,
limit=limit,
date_filter=date_range,
min_upvotes=min_upvotes,
)
@st.cache_data(ttl=600, show_spinner=False)
def cached_twitter_fetch(
brand: str,
limit: int,
min_likes: int,
language: str,
bearer: str,
) -> List[NormalizedItem]:
return twitter_client.fetch_mentions(
brand=brand,
bearer_token=bearer,
limit=limit,
min_likes=min_likes,
language=language,
)
@st.cache_data(ttl=600, show_spinner=False)
def cached_trustpilot_fetch(
brand: str,
language: str,
pages: int = 2,
) -> List[NormalizedItem]:
return trustpilot_scraper.fetch_reviews(brand=brand, language=language, pages=pages)
def _to_dataframe(items: List[NormalizedItem], sentiments: List[SentimentResult]) -> pd.DataFrame:
data = []
for item, sentiment in zip(items, sentiments):
data.append(
{
"source": item["source"],
"id": item["id"],
"url": item.get("url"),
"author": item.get("author"),
"timestamp": item["timestamp"],
"text": item["text"],
"label": sentiment.label,
"confidence": sentiment.confidence,
"meta": json.dumps(item.get("meta", {})),
}
)
df = pd.DataFrame(data)
if not df.empty:
df["timestamp"] = pd.to_datetime(df["timestamp"])
return df
def _build_pdf(summary: Optional[Dict[str, str]], df: pd.DataFrame) -> bytes:
buffer = io.BytesIO()
doc = SimpleDocTemplate(
buffer,
pagesize=letter,
rightMargin=40,
leftMargin=40,
topMargin=60,
bottomMargin=40,
title="ReputationRadar Executive Summary",
)
styles = getSampleStyleSheet()
title_style = styles["Title"]
subtitle_style = ParagraphStyle(
"Subtitle",
parent=styles["BodyText"],
fontSize=10,
leading=14,
textColor="#555555",
)
body_style = ParagraphStyle(
"Body",
parent=styles["BodyText"],
leading=14,
fontSize=11,
)
bullet_style = ParagraphStyle(
"Bullet",
parent=body_style,
leftIndent=16,
bulletIndent=8,
spaceBefore=2,
spaceAfter=2,
)
heading_style = ParagraphStyle(
"SectionHeading",
parent=styles["Heading3"],
spaceBefore=10,
spaceAfter=6,
)
story: List[Paragraph | Spacer | Table] = []
story.append(Paragraph("ReputationRadar Executive Summary", title_style))
story.append(Spacer(1, 6))
story.append(
Paragraph(
f"Generated on: {datetime.utcnow().strftime('%Y-%m-%d %H:%M')} UTC",
subtitle_style,
)
)
story.append(Spacer(1, 18))
if summary and summary.get("raw"):
story.extend(_summary_to_story(summary["raw"], body_style, bullet_style, heading_style))
else:
story.append(
Paragraph(
"Executive summary disabled (OpenAI key missing).",
body_style,
)
)
story.append(Spacer(1, 16))
story.append(Paragraph("Sentiment Snapshot", styles["Heading2"]))
story.append(Spacer(1, 10))
table_data: List[List[Paragraph]] = [
[
Paragraph("Date", body_style),
Paragraph("Sentiment", body_style),
Paragraph("Source", body_style),
Paragraph("Excerpt", body_style),
]
]
snapshot = df.sort_values("timestamp", ascending=False).head(15)
for _, row in snapshot.iterrows():
excerpt = _truncate_text(row["text"], 180)
table_data.append(
[
Paragraph(row["timestamp"].strftime("%Y-%m-%d %H:%M"), body_style),
Paragraph(row["label"].title(), body_style),
Paragraph(row["source"].title(), body_style),
Paragraph(excerpt, body_style),
]
)
table = Table(table_data, colWidths=[90, 70, 80, 250])
table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#f3f4f6")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#1f2937")),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("VALIGN", (0, 0), (-1, -1), "TOP"),
("INNERGRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#d1d5db")),
("BOX", (0, 0), (-1, -1), 0.5, colors.HexColor("#9ca3af")),
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f9fafb")]),
]
)
)
story.append(table)
doc.build(story)
buffer.seek(0)
return buffer.getvalue()
def _summary_to_story(
raw_summary: str,
body_style: ParagraphStyle,
bullet_style: ParagraphStyle,
heading_style: ParagraphStyle,
) -> List[Paragraph | Spacer]:
story: List[Paragraph | Spacer] = []
lines = [line.strip() for line in raw_summary.splitlines()]
for line in lines:
if not line:
continue
clean = re.sub(r"\*\*(.*?)\*\*", r"\1", line)
if clean.endswith(":") and len(clean) < 40:
story.append(Paragraph(clean.rstrip(":"), heading_style))
continue
if clean.lower().startswith(("highlights", "risks & concerns", "recommended actions", "overall tone")):
story.append(Paragraph(clean, heading_style))
continue
if line.startswith(("-", "*")):
bullet_text = re.sub(r"\*\*(.*?)\*\*", r"\1", line[1:].strip())
story.append(Paragraph(bullet_text, bullet_style, bulletText="•"))
else:
story.append(Paragraph(clean, body_style))
story.append(Spacer(1, 10))
return story
def _truncate_text(text: str, max_length: int) -> str:
clean = re.sub(r"\s+", " ", text).strip()
if len(clean) <= max_length:
return clean
return clean[: max_length - 1].rstrip() + "…"
def _build_excel(df: pd.DataFrame) -> bytes:
buffer = io.BytesIO()
export_df = df.copy()
export_df["timestamp"] = export_df["timestamp"].dt.strftime("%Y-%m-%d %H:%M")
with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
export_df.to_excel(writer, index=False, sheet_name="Mentions")
worksheet = writer.sheets["Mentions"]
for idx, column in enumerate(export_df.columns):
series = export_df[column].astype(str)
max_len = min(60, max(series.map(len).max(), len(column)) + 2)
worksheet.set_column(idx, idx, max_len)
buffer.seek(0)
return buffer.getvalue()
def main() -> None:
env_defaults = _get_env_defaults()
openai_env_key = env_defaults.get("OPENAI_API_KEY") or st.session_state.get("secrets", {}).get("OPENAI_API_KEY")
validated_env_key, notices = validate_openai_key(openai_env_key)
config = render_sidebar(env_defaults, tuple(notices))
chosen_key = config["credentials"]["openai"] or validated_env_key
openai_key, runtime_notices = validate_openai_key(chosen_key)
for msg in runtime_notices:
st.sidebar.info(msg)
run_clicked = st.button("Run Analysis 🚀", type="primary")
if not run_clicked:
show_empty_state("Enter a brand name and click **Run Analysis** to get started.")
return
if not config["brand"]:
st.error("Brand name is required.")
return
threshold = parse_date_range(config["date_range"])
collected: List[NormalizedItem] = []
with st.container():
if config["sources"]["reddit"]:
with source_status("Fetching Reddit mentions") as status:
try:
reddit_items = cached_reddit_fetch(
brand=config["brand"],
limit=config["limits"]["reddit"],
date_range=config["date_range"],
min_upvotes=config["min_reddit_upvotes"],
client_id=config["credentials"]["reddit"]["client_id"],
client_secret=config["credentials"]["reddit"]["client_secret"],
user_agent=config["credentials"]["reddit"]["user_agent"],
)
reddit_items = [item for item in reddit_items if item["timestamp"] >= threshold]
status.write(f"Fetched {len(reddit_items)} Reddit items.")
collected.extend(reddit_items)
except ServiceWarning as warning:
st.warning(str(warning))
demo = load_sample_items("reddit_sample")
if demo:
st.info("Loaded demo Reddit data.", icon="🧪")
collected.extend(demo)
except ServiceError as error:
st.error(f"Reddit fetch failed: {error}")
if config["sources"]["twitter"]:
with source_status("Fetching Twitter mentions") as status:
try:
twitter_items = cached_twitter_fetch(
brand=config["brand"],
limit=config["limits"]["twitter"],
min_likes=config["min_twitter_likes"],
language=config["language"],
bearer=config["credentials"]["twitter"],
)
twitter_items = [item for item in twitter_items if item["timestamp"] >= threshold]
status.write(f"Fetched {len(twitter_items)} tweets.")
collected.extend(twitter_items)
except ServiceWarning as warning:
st.warning(str(warning))
demo = load_sample_items("twitter_sample")
if demo:
st.info("Loaded demo Twitter data.", icon="🧪")
collected.extend(demo)
except ServiceError as error:
st.error(f"Twitter fetch failed: {error}")
if config["sources"]["trustpilot"]:
with source_status("Fetching Trustpilot reviews") as status:
try:
trustpilot_items = cached_trustpilot_fetch(
brand=config["brand"],
language=config["language"],
)
trustpilot_items = [item for item in trustpilot_items if item["timestamp"] >= threshold]
status.write(f"Fetched {len(trustpilot_items)} reviews.")
collected.extend(trustpilot_items)
except ServiceWarning as warning:
st.warning(str(warning))
demo = load_sample_items("trustpilot_sample")
if demo:
st.info("Loaded demo Trustpilot data.", icon="🧪")
collected.extend(demo)
except ServiceError as error:
st.error(f"Trustpilot fetch failed: {error}")
if not collected:
show_empty_state("No mentions found. Try enabling more sources or loosening filters.")
return
cleaned = normalize_items(collected)
if not cleaned:
show_empty_state("All results were filtered out as noise. Try again with different settings.")
return
sentiment_service = llm.LLMService(
api_key=config["credentials"]["openai"] or openai_key,
batch_size=config["batch_size"],
)
sentiments = sentiment_service.classify_sentiment_batch([item["text"] for item in cleaned])
df = _to_dataframe(cleaned, sentiments)
render_overview(df)
render_top_comments(df)
summary_payload: Optional[Dict[str, str]] = None
if sentiment_service.available():
try:
summary_payload = sentiment_service.summarize_overall(
[{"label": row["label"], "text": row["text"]} for _, row in df.iterrows()]
)
except ServiceWarning as warning:
st.warning(str(warning))
else:
st.info("OpenAI key missing. Using VADER fallback for sentiment; summary disabled.", icon="")
render_summary(summary_payload)
render_source_explorer(df)
csv_data = df.to_csv(index=False).encode("utf-8")
excel_data = _build_excel(df)
pdf_data = _build_pdf(summary_payload, df)
col_csv, col_excel, col_pdf = st.columns(3)
with col_csv:
st.download_button(
"⬇️ Export CSV",
data=csv_data,
file_name="reputation_radar.csv",
mime="text/csv",
)
with col_excel:
st.download_button(
"⬇️ Export Excel",
data=excel_data,
file_name="reputation_radar.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
with col_pdf:
st.download_button(
"⬇️ Export PDF Summary",
data=pdf_data,
file_name="reputation_radar_summary.pdf",
mime="application/pdf",
)
st.success("Analysis complete! Review the insights above.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,5 @@
"""Reusable Streamlit UI components for ReputationRadar."""
from . import dashboard, filters, loaders, summary
__all__ = ["dashboard", "filters", "loaders", "summary"]

View File

@@ -0,0 +1,136 @@
"""Render the ReputationRadar dashboard components."""
from __future__ import annotations
from typing import Dict, Optional
import pandas as pd
import plotly.express as px
import streamlit as st
SOURCE_CHIPS = {
"reddit": "🔺 Reddit",
"twitter": "✖️ Twitter",
"trustpilot": "⭐ Trustpilot",
}
SENTIMENT_COLORS = {
"positive": "#4caf50",
"neutral": "#90a4ae",
"negative": "#ef5350",
}
def render_overview(df: pd.DataFrame) -> None:
"""Display charts summarising sentiment."""
counts = (
df["label"]
.value_counts()
.reindex(["positive", "neutral", "negative"], fill_value=0)
.rename_axis("label")
.reset_index(name="count")
)
pie = px.pie(
counts,
names="label",
values="count",
color="label",
color_discrete_map=SENTIMENT_COLORS,
title="Sentiment distribution",
)
pie.update_traces(textinfo="percent+label")
ts = (
df.set_index("timestamp")
.groupby([pd.Grouper(freq="D"), "label"])
.size()
.reset_index(name="count")
)
if not ts.empty:
ts_plot = px.line(
ts,
x="timestamp",
y="count",
color="label",
color_discrete_map=SENTIMENT_COLORS,
markers=True,
title="Mentions over time",
)
else:
ts_plot = None
col1, col2 = st.columns(2)
with col1:
st.plotly_chart(pie, use_container_width=True)
with col2:
if ts_plot is not None:
st.plotly_chart(ts_plot, use_container_width=True)
else:
st.info("Not enough data for a time-series. Try widening the date range.", icon="📆")
def render_top_comments(df: pd.DataFrame) -> None:
"""Show representative comments per sentiment."""
st.subheader("Representative Mentions")
cols = st.columns(3)
for idx, sentiment in enumerate(["positive", "neutral", "negative"]):
subset = (
df[df["label"] == sentiment]
.sort_values("confidence", ascending=False)
.head(5)
)
with cols[idx]:
st.caption(sentiment.capitalize())
if subset.empty:
st.write("No items yet.")
continue
for _, row in subset.iterrows():
chip = SOURCE_CHIPS.get(row["source"], row["source"])
author = row.get("author") or "Unknown"
timestamp = row["timestamp"].strftime("%Y-%m-%d %H:%M")
label = f"{chip} · {author} · {timestamp}"
if row.get("url"):
st.markdown(f"- [{label}]({row['url']})")
else:
st.markdown(f"- {label}")
def render_source_explorer(df: pd.DataFrame) -> None:
"""Interactive tabular explorer with pagination and filters."""
with st.expander("Source Explorer", expanded=False):
search_term = st.text_input("Search mentions", key="explorer_search")
selected_source = st.selectbox("Source filter", options=["All"] + list(SOURCE_CHIPS.values()))
min_conf = st.slider("Minimum confidence", min_value=0.0, max_value=1.0, value=0.0, step=0.1)
filtered = df.copy()
if search_term:
filtered = filtered[filtered["text"].str.contains(search_term, case=False, na=False)]
if selected_source != "All":
source_key = _reverse_lookup(selected_source)
if source_key:
filtered = filtered[filtered["source"] == source_key]
filtered = filtered[filtered["confidence"] >= min_conf]
if filtered.empty:
st.info("No results found. Try widening the date range or removing filters.", icon="🪄")
return
page_size = 10
total_pages = max(1, (len(filtered) + page_size - 1) // page_size)
page = st.number_input("Page", min_value=1, max_value=total_pages, value=1)
start = (page - 1) * page_size
end = start + page_size
explorer_df = filtered.iloc[start:end].copy()
explorer_df["source"] = explorer_df["source"].map(SOURCE_CHIPS).fillna(explorer_df["source"])
explorer_df["timestamp"] = explorer_df["timestamp"].dt.strftime("%Y-%m-%d %H:%M")
explorer_df = explorer_df[["timestamp", "source", "author", "label", "confidence", "text", "url"]]
st.dataframe(explorer_df, use_container_width=True, hide_index=True)
def _reverse_lookup(value: str) -> Optional[str]:
for key, chip in SOURCE_CHIPS.items():
if chip == value:
return key
return None

View File

@@ -0,0 +1,128 @@
"""Sidebar filters and configuration controls."""
from __future__ import annotations
from typing import Dict, Optional, Tuple
import streamlit as st
DATE_RANGE_LABELS = {
"24h": "Last 24 hours",
"7d": "Last 7 days",
"30d": "Last 30 days",
}
SUPPORTED_LANGUAGES = {
"en": "English",
"es": "Spanish",
"de": "German",
"fr": "French",
}
def _store_secret(key: str, value: str) -> None:
"""Persist sensitive values in session state only."""
if value:
st.session_state.setdefault("secrets", {})
st.session_state["secrets"][key] = value
def _get_secret(key: str, default: str = "") -> str:
return st.session_state.get("secrets", {}).get(key, default)
def render_sidebar(env_defaults: Dict[str, Optional[str]], openai_notices: Tuple[str, ...]) -> Dict[str, object]:
"""Render all sidebar controls and return configuration."""
with st.sidebar:
st.header("Tune Your Radar", anchor=False)
brand = st.text_input("Brand Name*", value=st.session_state.get("brand_input", ""))
if brand:
st.session_state["brand_input"] = brand
date_range = st.selectbox(
"Date Range",
options=list(DATE_RANGE_LABELS.keys()),
format_func=lambda key: DATE_RANGE_LABELS[key],
index=1,
)
min_reddit_upvotes = st.number_input(
"Minimum Reddit upvotes",
min_value=0,
value=st.session_state.get("min_reddit_upvotes", 4),
)
st.session_state["min_reddit_upvotes"] = min_reddit_upvotes
min_twitter_likes = st.number_input(
"Minimum X likes",
min_value=0,
value=st.session_state.get("min_twitter_likes", 100),
)
st.session_state["min_twitter_likes"] = min_twitter_likes
language = st.selectbox(
"Language",
options=list(SUPPORTED_LANGUAGES.keys()),
format_func=lambda key: SUPPORTED_LANGUAGES[key],
index=0,
)
st.markdown("### Sources")
reddit_enabled = st.toggle("🔺 Reddit", value=st.session_state.get("reddit_enabled", True))
twitter_enabled = st.toggle("✖️ Twitter", value=st.session_state.get("twitter_enabled", True))
trustpilot_enabled = st.toggle("⭐ Trustpilot", value=st.session_state.get("trustpilot_enabled", True))
st.session_state["reddit_enabled"] = reddit_enabled
st.session_state["twitter_enabled"] = twitter_enabled
st.session_state["trustpilot_enabled"] = trustpilot_enabled
st.markdown("### API Keys")
openai_key_default = env_defaults.get("OPENAI_API_KEY") or _get_secret("OPENAI_API_KEY")
openai_key = st.text_input("OpenAI API Key", value=openai_key_default or "", type="password", help="Stored only in this session.")
_store_secret("OPENAI_API_KEY", openai_key.strip())
reddit_client_id = st.text_input("Reddit Client ID", value=env_defaults.get("REDDIT_CLIENT_ID") or _get_secret("REDDIT_CLIENT_ID"), type="password")
reddit_client_secret = st.text_input("Reddit Client Secret", value=env_defaults.get("REDDIT_CLIENT_SECRET") or _get_secret("REDDIT_CLIENT_SECRET"), type="password")
reddit_user_agent = st.text_input("Reddit User Agent", value=env_defaults.get("REDDIT_USER_AGENT") or _get_secret("REDDIT_USER_AGENT"))
twitter_bearer_token = st.text_input("Twitter Bearer Token", value=env_defaults.get("TWITTER_BEARER_TOKEN") or _get_secret("TWITTER_BEARER_TOKEN"), type="password")
_store_secret("REDDIT_CLIENT_ID", reddit_client_id.strip())
_store_secret("REDDIT_CLIENT_SECRET", reddit_client_secret.strip())
_store_secret("REDDIT_USER_AGENT", reddit_user_agent.strip())
_store_secret("TWITTER_BEARER_TOKEN", twitter_bearer_token.strip())
if openai_notices:
for notice in openai_notices:
st.info(notice)
with st.expander("Advanced Options", expanded=False):
reddit_limit = st.slider("Reddit results", min_value=10, max_value=100, value=st.session_state.get("reddit_limit", 40), step=5)
twitter_limit = st.slider("Twitter results", min_value=10, max_value=100, value=st.session_state.get("twitter_limit", 40), step=5)
trustpilot_limit = st.slider("Trustpilot results", min_value=10, max_value=60, value=st.session_state.get("trustpilot_limit", 30), step=5)
llm_batch_size = st.slider("OpenAI batch size", min_value=5, max_value=20, value=st.session_state.get("llm_batch_size", 20), step=5)
st.session_state["reddit_limit"] = reddit_limit
st.session_state["twitter_limit"] = twitter_limit
st.session_state["trustpilot_limit"] = trustpilot_limit
st.session_state["llm_batch_size"] = llm_batch_size
return {
"brand": brand.strip(),
"date_range": date_range,
"min_reddit_upvotes": min_reddit_upvotes,
"min_twitter_likes": min_twitter_likes,
"language": language,
"sources": {
"reddit": reddit_enabled,
"twitter": twitter_enabled,
"trustpilot": trustpilot_enabled,
},
"limits": {
"reddit": reddit_limit,
"twitter": twitter_limit,
"trustpilot": trustpilot_limit,
},
"batch_size": llm_batch_size,
"credentials": {
"openai": openai_key.strip(),
"reddit": {
"client_id": reddit_client_id.strip(),
"client_secret": reddit_client_secret.strip(),
"user_agent": reddit_user_agent.strip(),
},
"twitter": twitter_bearer_token.strip(),
},
}

View File

@@ -0,0 +1,25 @@
"""Loading indicators and status helpers."""
from __future__ import annotations
from contextlib import contextmanager
from typing import Iterator
import streamlit as st
@contextmanager
def source_status(label: str) -> Iterator[st.delta_generator.DeltaGenerator]:
"""Context manager that yields a status widget for source fetching."""
status = st.status(label, expanded=True)
try:
yield status
status.update(label=f"{label}", state="complete")
except Exception as exc: # noqa: BLE001
status.update(label=f"{label} ⚠️ {exc}", state="error")
raise
def show_empty_state(message: str) -> None:
"""Render a friendly empty-state callout."""
st.info(message, icon="🔎")

View File

@@ -0,0 +1,23 @@
"""Executive summary display components."""
from __future__ import annotations
from typing import Dict, Optional
import streamlit as st
def render_summary(summary: Optional[Dict[str, str]]) -> None:
"""Render executive summary card."""
st.subheader("Executive Summary", anchor=False)
if not summary:
st.warning("Executive summary disabled. Provide an OpenAI API key to unlock this section.", icon="🤖")
return
st.markdown(
"""
<div style="padding:1rem;border:1px solid #eee;border-radius:0.75rem;background-color:#f9fafb;">
""",
unsafe_allow_html=True,
)
st.markdown(summary.get("raw", ""))
st.markdown("</div>", unsafe_allow_html=True)

View File

@@ -0,0 +1,16 @@
streamlit
praw
requests
beautifulsoup4
pandas
python-dotenv
tenacity
plotly
openai>=1.0.0
vaderSentiment
fuzzywuzzy[speedup]
python-Levenshtein
reportlab
tqdm
pytest
XlsxWriter

View File

@@ -0,0 +1,20 @@
[
{
"source": "reddit",
"id": "t3_sample1",
"url": "https://www.reddit.com/r/technology/comments/sample1",
"author": "techfan42",
"timestamp": "2025-01-15T14:30:00+00:00",
"text": "ReputationRadar did an impressive job resolving our customer issues within hours. Support has been world class!",
"meta": {"score": 128, "num_comments": 24, "subreddit": "technology", "type": "submission"}
},
{
"source": "reddit",
"id": "t1_sample2",
"url": "https://www.reddit.com/r/startups/comments/sample2/comment/sample",
"author": "growthguru",
"timestamp": "2025-01-14T10:10:00+00:00",
"text": "Noticed a spike in downtime alerts with ReputationRadar this week. Anyone else seeing false positives?",
"meta": {"score": 45, "subreddit": "startups", "type": "comment", "submission_title": "Monitoring tools"}
}
]

View File

@@ -0,0 +1,20 @@
[
{
"source": "trustpilot",
"id": "trustpilot-001",
"url": "https://www.trustpilot.com/review/reputationradar.ai",
"author": "Dana",
"timestamp": "2025-01-12T11:00:00+00:00",
"text": "ReputationRadar has simplified our weekly reporting. The sentiment breakdowns are easy to understand and accurate.",
"meta": {"rating": "5 stars"}
},
{
"source": "trustpilot",
"id": "trustpilot-002",
"url": "https://www.trustpilot.com/review/reputationradar.ai?page=2",
"author": "Liam",
"timestamp": "2025-01-10T18:20:00+00:00",
"text": "Support was responsive, but the Trustpilot integration kept timing out. Hoping for a fix soon.",
"meta": {"rating": "3 stars"}
}
]

View File

@@ -0,0 +1,20 @@
[
{
"source": "twitter",
"id": "173654001",
"url": "https://twitter.com/brandlover/status/173654001",
"author": "brandlover",
"timestamp": "2025-01-15T16:45:00+00:00",
"text": "Huge shoutout to ReputationRadar for flagging sentiment risks ahead of our launch. Saved us hours this morning!",
"meta": {"likes": 57, "retweets": 8, "replies": 3, "quote_count": 2}
},
{
"source": "twitter",
"id": "173653991",
"url": "https://twitter.com/critique/status/173653991",
"author": "critique",
"timestamp": "2025-01-13T09:12:00+00:00",
"text": "The new ReputationRadar dashboard feels laggy and the PDF export failed twice. Dev team please check your rollout.",
"meta": {"likes": 14, "retweets": 1, "replies": 5, "quote_count": 0}
}
]

View File

@@ -0,0 +1,11 @@
"""Service layer exports for ReputationRadar."""
from . import llm, reddit_client, trustpilot_scraper, twitter_client, utils
__all__ = [
"llm",
"reddit_client",
"trustpilot_scraper",
"twitter_client",
"utils",
]

View File

@@ -0,0 +1,147 @@
"""LLM sentiment analysis and summarization utilities."""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence
try: # pragma: no cover - optional dependency
from openai import OpenAI
except ModuleNotFoundError: # pragma: no cover
OpenAI = None # type: ignore[assignment]
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from .utils import ServiceWarning, chunked
CLASSIFICATION_SYSTEM_PROMPT = "You are a precise brand-sentiment classifier. Output JSON only."
SUMMARY_SYSTEM_PROMPT = "You analyze brand chatter and produce concise, executive-ready summaries."
@dataclass
class SentimentResult:
"""Structured sentiment output."""
label: str
confidence: float
class LLMService:
"""Wrapper around OpenAI with VADER fallback."""
def __init__(self, api_key: Optional[str], model: str = "gpt-4o-mini", batch_size: int = 20):
self.batch_size = max(1, batch_size)
self.model = model
self.logger = logging.getLogger("services.llm")
self._client: Optional[Any] = None
self._analyzer = SentimentIntensityAnalyzer()
if api_key and OpenAI is not None:
try:
self._client = OpenAI(api_key=api_key)
except Exception as exc: # noqa: BLE001
self.logger.warning("Failed to initialize OpenAI client, using VADER fallback: %s", exc)
self._client = None
elif api_key and OpenAI is None:
self.logger.warning("openai package not installed; falling back to VADER despite API key.")
def available(self) -> bool:
"""Return whether OpenAI-backed features are available."""
return self._client is not None
def classify_sentiment_batch(self, texts: Sequence[str]) -> List[SentimentResult]:
"""Classify multiple texts, chunking if necessary."""
if not texts:
return []
if not self.available():
return [self._vader_sentiment(text) for text in texts]
results: List[SentimentResult] = []
for chunk in chunked(list(texts), self.batch_size):
prompt_lines = ["Classify each item as \"positive\", \"neutral\", or \"negative\".", "Also output a confidence score between 0 and 1.", "Return an array of objects: [{\"label\": \"...\", \"confidence\": 0.0}].", "Items:"]
prompt_lines.extend([f"{idx + 1}) {text}" for idx, text in enumerate(chunk)])
prompt = "\n".join(prompt_lines)
try:
response = self._client.responses.create( # type: ignore[union-attr]
model=self.model,
input=[
{"role": "system", "content": CLASSIFICATION_SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0,
max_output_tokens=500,
)
output_text = self._extract_text(response)
parsed = json.loads(output_text)
for item in parsed:
results.append(
SentimentResult(
label=item.get("label", "neutral"),
confidence=float(item.get("confidence", 0.5)),
)
)
except Exception as exc: # noqa: BLE001
self.logger.warning("Classification fallback to VADER due to error: %s", exc)
for text in chunk:
results.append(self._vader_sentiment(text))
# Ensure the output length matches input
if len(results) != len(texts):
# align by padding with neutral
results.extend([SentimentResult(label="neutral", confidence=0.33)] * (len(texts) - len(results)))
return results
def summarize_overall(self, findings: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Create an executive summary using OpenAI."""
if not self.available():
raise ServiceWarning("OpenAI API key missing. Summary unavailable.")
prompt_lines = [
"Given these labeled items and their short rationales, write:",
"- 5 bullet \"Highlights\"",
"- 5 bullet \"Risks & Concerns\"",
"- One-line \"Overall Tone\" (Positive/Neutral/Negative with brief justification)",
"- 3 \"Recommended Actions\"",
"Keep it under 180 words total. Be specific but neutral in tone.",
"Items:",
]
for idx, item in enumerate(findings, start=1):
prompt_lines.append(
f"{idx}) [{item.get('label','neutral').upper()}] {item.get('text','')}"
)
prompt = "\n".join(prompt_lines)
try:
response = self._client.responses.create( # type: ignore[union-attr]
model=self.model,
input=[
{"role": "system", "content": SUMMARY_SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.2,
max_output_tokens=800,
)
output_text = self._extract_text(response)
return {"raw": output_text}
except Exception as exc: # noqa: BLE001
self.logger.error("Failed to generate summary: %s", exc)
raise ServiceWarning("Unable to generate executive summary at this time.") from exc
def _vader_sentiment(self, text: str) -> SentimentResult:
scores = self._analyzer.polarity_scores(text)
compound = scores["compound"]
if compound >= 0.2:
label = "positive"
elif compound <= -0.2:
label = "negative"
else:
label = "neutral"
confidence = min(1.0, max(0.0, abs(compound)))
return SentimentResult(label=label, confidence=confidence)
def _extract_text(self, response: Any) -> str:
"""Support multiple OpenAI client response shapes."""
if hasattr(response, "output") and response.output:
content = response.output[0].content[0]
return getattr(content, "text", str(content))
if hasattr(response, "choices"):
return response.choices[0].message.content # type: ignore[return-value]
raise ValueError("Unknown response structure from OpenAI client.")

View File

@@ -0,0 +1,141 @@
"""Reddit data collection service using PRAW."""
from __future__ import annotations
import time
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional
import praw
from praw.models import Comment, Submission
from .utils import (
NormalizedItem,
ServiceError,
ServiceWarning,
ensure_timezone,
sanitize_text,
)
TIME_FILTER_MAP = {
"24h": "day",
"7d": "week",
"30d": "month",
}
def _iter_submissions(subreddit: praw.models.Subreddit, query: str, limit: int, time_filter: str) -> Iterable[Submission]:
return subreddit.search(query=query, sort="new", time_filter=time_filter, limit=limit * 3)
def _iter_comments(submission: Submission) -> Iterable[Comment]:
submission.comments.replace_more(limit=0)
return submission.comments.list()
def _normalize_submission(submission: Submission) -> NormalizedItem:
created = datetime.fromtimestamp(submission.created_utc, tz=timezone.utc)
return NormalizedItem(
source="reddit",
id=submission.id,
url=f"https://www.reddit.com{submission.permalink}",
author=str(submission.author) if submission.author else None,
timestamp=ensure_timezone(created),
text=f"{submission.title}\n\n{submission.selftext or ''}",
meta={
"score": submission.score,
"num_comments": submission.num_comments,
"subreddit": submission.subreddit.display_name,
"type": "submission",
},
)
def _normalize_comment(comment: Comment, submission: Submission) -> NormalizedItem:
created = datetime.fromtimestamp(comment.created_utc, tz=timezone.utc)
return NormalizedItem(
source="reddit",
id=comment.id,
url=f"https://www.reddit.com{comment.permalink}",
author=str(comment.author) if comment.author else None,
timestamp=ensure_timezone(created),
text=comment.body,
meta={
"score": comment.score,
"subreddit": submission.subreddit.display_name,
"type": "comment",
"submission_title": submission.title,
},
)
def fetch_mentions(
brand: str,
credentials: Dict[str, str],
limit: int = 25,
date_filter: str = "7d",
min_upvotes: int = 0,
) -> List[NormalizedItem]:
"""Fetch recent Reddit submissions/comments mentioning the brand."""
client_id = credentials.get("client_id")
client_secret = credentials.get("client_secret")
user_agent = credentials.get("user_agent")
if not all([client_id, client_secret, user_agent]):
raise ServiceWarning("Reddit credentials are missing. Provide them in the sidebar to enable this source.")
try:
reddit = praw.Reddit(
client_id=client_id,
client_secret=client_secret,
user_agent=user_agent,
)
reddit.read_only = True
except Exception as exc: # noqa: BLE001
raise ServiceError(f"Failed to initialize Reddit client: {exc}") from exc
time_filter = TIME_FILTER_MAP.get(date_filter.lower(), "week")
subreddit = reddit.subreddit("all")
results: List[NormalizedItem] = []
seen_ids: set[str] = set()
try:
for submission in _iter_submissions(subreddit, query=brand, limit=limit, time_filter=time_filter):
if submission.id in seen_ids:
continue
if submission.score < min_upvotes:
continue
normalized_submission = _normalize_submission(submission)
normalized_submission["text"] = sanitize_text(normalized_submission["text"])
if normalized_submission["text"]:
results.append(normalized_submission)
seen_ids.add(submission.id)
if len(results) >= limit:
break
# Fetch comments mentioning the brand
match_count = 0
for comment in _iter_comments(submission):
if brand.lower() not in (comment.body or "").lower():
continue
if comment.score < min_upvotes:
continue
normalized_comment = _normalize_comment(comment, submission)
normalized_comment["text"] = sanitize_text(normalized_comment["text"])
if not normalized_comment["text"]:
continue
if normalized_comment["id"] in seen_ids:
continue
results.append(normalized_comment)
seen_ids.add(normalized_comment["id"])
match_count += 1
if len(results) >= limit:
break
if len(results) >= limit:
break
# Respect rate limits
if match_count:
time.sleep(1)
except Exception as exc: # noqa: BLE001
raise ServiceError(f"Error while fetching Reddit data: {exc}") from exc
return results

View File

@@ -0,0 +1,138 @@
"""Trustpilot scraping service with polite crawling safeguards."""
from __future__ import annotations
import time
from datetime import datetime, timezone
from typing import Dict, List
from urllib.parse import urlencode
from urllib.robotparser import RobotFileParser
import requests
from bs4 import BeautifulSoup
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from .utils import (
NormalizedItem,
ServiceError,
ServiceWarning,
ensure_timezone,
random_user_agent,
sanitize_text,
)
BASE_URL = "https://www.trustpilot.com"
SEARCH_PATH = "/search"
class BlockedError(ServiceWarning):
"""Raised when Trustpilot blocks the scraping attempt."""
def _check_robots(user_agent: str) -> None:
parser = RobotFileParser()
parser.set_url(f"{BASE_URL}/robots.txt")
parser.read()
if not parser.can_fetch(user_agent, SEARCH_PATH):
raise ServiceWarning(
"Trustpilot robots.txt disallows scraping the search endpoint. "
"Please use the official API or upload data manually."
)
@retry(
reraise=True,
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=8),
retry=retry_if_exception_type((requests.RequestException, BlockedError)),
)
def _fetch_page(session: requests.Session, user_agent: str, page: int, brand: str, language: str) -> str:
params = {"query": brand, "page": page}
if language:
params["languages"] = language
url = f"{BASE_URL}{SEARCH_PATH}?{urlencode(params)}"
response = session.get(
url,
headers={"User-Agent": user_agent, "Accept-Language": language or "en"},
timeout=20,
)
if response.status_code in (401, 403):
raise BlockedError("Trustpilot denied access (HTTP 403).")
response.raise_for_status()
return response.text
def _parse_reviews(html: str, user_agent: str) -> List[NormalizedItem]:
soup = BeautifulSoup(html, "html.parser")
cards = soup.select("article[data-service-review-card-layout]")
items: List[NormalizedItem] = []
now = datetime.now(timezone.utc)
for card in cards:
link = card.select_one("a.link_internal__YpiJI")
url = f"{BASE_URL}{link['href']}" if link and link.get("href") else ""
title_el = card.select_one("h2")
title = title_el.get_text(strip=True) if title_el else ""
text_el = card.select_one("[data-review-description-typography]")
text = text_el.get_text(separator=" ", strip=True) if text_el else ""
rating_el = card.select_one("img[alt*='stars']")
rating = rating_el["alt"] if rating_el and rating_el.get("alt") else ""
author_el = card.select_one("span.styles_consumerDetails__ZF4I6")
author = author_el.get_text(strip=True) if author_el else None
date_el = card.select_one("time")
timestamp = now
if date_el and date_el.get("datetime"):
try:
timestamp = datetime.fromisoformat(date_el["datetime"].replace("Z", "+00:00"))
except ValueError:
timestamp = now
body = sanitize_text(f"{title}\n\n{text}")
if len(body) < 15:
continue
items.append(
NormalizedItem(
source="trustpilot",
id=card.get("data-review-id", str(hash(body))),
url=url,
author=author,
timestamp=ensure_timezone(timestamp),
text=body,
meta={
"rating": rating,
"user_agent": user_agent,
},
)
)
return items
def fetch_reviews(brand: str, language: str = "en", pages: int = 2) -> List[NormalizedItem]:
"""Scrape Trustpilot search results for recent reviews."""
if not brand:
raise ServiceWarning("Brand name is required for Trustpilot scraping.")
session = requests.Session()
user_agent = random_user_agent()
_check_robots(user_agent)
aggregated: List[NormalizedItem] = []
seen_ids: set[str] = set()
for page in range(1, pages + 1):
try:
html = _fetch_page(session, user_agent=user_agent, page=page, brand=brand, language=language)
except BlockedError as exc:
raise ServiceWarning(
"Trustpilot blocked the scraping attempt. Consider using their official API or providing CSV uploads."
) from exc
except requests.RequestException as exc: # noqa: BLE001
raise ServiceError(f"Trustpilot request failed: {exc}") from exc
page_items = _parse_reviews(html, user_agent)
for item in page_items:
if item["id"] in seen_ids:
continue
aggregated.append(item)
seen_ids.add(item["id"])
time.sleep(1.5) # gentle crawl delay
return aggregated

View File

@@ -0,0 +1,98 @@
"""Twitter (X) data collection using the v2 recent search API."""
from __future__ import annotations
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional
import requests
from .utils import NormalizedItem, ServiceError, ServiceWarning, ensure_timezone, sanitize_text
SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"
def _build_query(brand: str, language: str) -> str:
terms = [brand]
if language:
terms.append(f"lang:{language}")
return " ".join(terms)
def fetch_mentions(
brand: str,
bearer_token: Optional[str],
limit: int = 25,
min_likes: int = 0,
language: str = "en",
) -> List[NormalizedItem]:
"""Fetch recent tweets mentioning the brand."""
if not bearer_token:
raise ServiceWarning(
"Twitter bearer token not provided. Add it in the sidebar to enable Twitter ingestion."
)
headers = {
"Authorization": f"Bearer {bearer_token}",
"User-Agent": "ReputationRadar/1.0",
}
params = {
"query": _build_query(brand, language),
"max_results": min(100, limit),
"tweet.fields": "author_id,created_at,lang,public_metrics",
"expansions": "author_id",
"user.fields": "name,username",
}
collected: List[NormalizedItem] = []
next_token: Optional[str] = None
while len(collected) < limit:
if next_token:
params["next_token"] = next_token
response = requests.get(SEARCH_URL, headers=headers, params=params, timeout=15)
if response.status_code == 401:
raise ServiceWarning("Twitter API authentication failed. Please verify the bearer token.")
if response.status_code == 429:
time.sleep(5)
continue
if response.status_code >= 400:
raise ServiceError(f"Twitter API error {response.status_code}: {response.text}")
payload = response.json()
data = payload.get("data", [])
includes = payload.get("includes", {})
users_index = {user["id"]: user for user in includes.get("users", [])}
for tweet in data:
created_at = datetime.fromisoformat(tweet["created_at"].replace("Z", "+00:00"))
author_info = users_index.get(tweet["author_id"], {})
item = NormalizedItem(
source="twitter",
id=tweet["id"],
url=f"https://twitter.com/{author_info.get('username','')}/status/{tweet['id']}",
author=author_info.get("username"),
timestamp=ensure_timezone(created_at),
text=sanitize_text(tweet["text"]),
meta={
"likes": tweet.get("public_metrics", {}).get("like_count", 0),
"retweets": tweet.get("public_metrics", {}).get("retweet_count", 0),
"replies": tweet.get("public_metrics", {}).get("reply_count", 0),
"quote_count": tweet.get("public_metrics", {}).get("quote_count", 0),
},
)
if not item["text"]:
continue
if item["meta"]["likes"] < min_likes:
continue
collected.append(item)
if len(collected) >= limit:
break
next_token = payload.get("meta", {}).get("next_token")
if not next_token:
break
time.sleep(1) # stay friendly to rate limits
return collected[:limit]

View File

@@ -0,0 +1,217 @@
"""Utility helpers for ReputationRadar services."""
from __future__ import annotations
import json
import logging
import os
import random
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, TypedDict
from bs4 import BeautifulSoup
from fuzzywuzzy import fuzz
LOG_FILE = Path(__file__).resolve().parents[1] / "logs" / "app.log"
MIN_TEXT_LENGTH = 15
SIMILARITY_THRESHOLD = 90
class NormalizedItem(TypedDict):
"""Canonical representation of a fetched mention."""
source: str
id: str
url: str
author: Optional[str]
timestamp: datetime
text: str
meta: Dict[str, object]
class ServiceError(RuntimeError):
"""Raised when a service hard fails."""
class ServiceWarning(RuntimeError):
"""Raised for recoverable issues that should surface to the UI."""
def initialize_logger(name: str = "reputation_radar") -> logging.Logger:
"""Configure and return a module-level logger."""
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
handlers=[
logging.FileHandler(LOG_FILE, encoding="utf-8"),
logging.StreamHandler(),
],
)
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
return logger
def load_sample_items(name: str) -> List[NormalizedItem]:
"""Load demo data from the samples directory."""
samples_dir = Path(__file__).resolve().parents[1] / "samples"
sample_path = samples_dir / f"{name}.json"
if not sample_path.exists():
return []
with sample_path.open("r", encoding="utf-8") as handle:
raw_items = json.load(handle)
cleaned: List[NormalizedItem] = []
for item in raw_items:
try:
cleaned.append(
NormalizedItem(
source=item["source"],
id=str(item["id"]),
url=item.get("url", ""),
author=item.get("author"),
timestamp=datetime.fromisoformat(item["timestamp"]),
text=item["text"],
meta=item.get("meta", {}),
)
)
except (KeyError, ValueError):
continue
return cleaned
def strip_html(value: str) -> str:
"""Remove HTML tags and normalize whitespace."""
if not value:
return ""
soup = BeautifulSoup(value, "html.parser")
text = soup.get_text(separator=" ", strip=True)
text = re.sub(r"\s+", " ", text)
text = text.encode("utf-8", "ignore").decode("utf-8", "ignore")
return text.strip()
def sanitize_text(value: str) -> str:
"""Clean text and remove excessive noise."""
text = strip_html(value)
text = re.sub(r"http\S+", "", text) # drop inline URLs
text = re.sub(r"\s{2,}", " ", text)
return text.strip()
def drop_short_items(items: Iterable[NormalizedItem], minimum_length: int = MIN_TEXT_LENGTH) -> List[NormalizedItem]:
"""Filter out items that are too short to analyze."""
return [
item
for item in items
if len(item["text"]) >= minimum_length
]
def fuzzy_deduplicate(items: Sequence[NormalizedItem], threshold: int = SIMILARITY_THRESHOLD) -> List[NormalizedItem]:
"""Remove duplicates based on URL or fuzzy text similarity."""
seen_urls: set[str] = set()
deduped: List[NormalizedItem] = []
for item in items:
url = item.get("url") or ""
text = item.get("text") or ""
if url and url in seen_urls:
continue
duplicate_found = False
for existing in deduped:
if not text or not existing.get("text"):
continue
if fuzz.token_set_ratio(text, existing["text"]) >= threshold:
duplicate_found = True
break
if not duplicate_found:
deduped.append(item)
if url:
seen_urls.add(url)
return deduped
def normalize_items(items: Sequence[NormalizedItem]) -> List[NormalizedItem]:
"""Apply sanitization, deduplication, and drop noisy entries."""
sanitized: List[NormalizedItem] = []
for item in items:
cleaned_text = sanitize_text(item.get("text", ""))
if len(cleaned_text) < MIN_TEXT_LENGTH:
continue
sanitized.append(
NormalizedItem(
source=item["source"],
id=item["id"],
url=item.get("url", ""),
author=item.get("author"),
timestamp=item["timestamp"],
text=cleaned_text,
meta=item.get("meta", {}),
)
)
return fuzzy_deduplicate(sanitized)
def parse_date_range(option: str) -> datetime:
"""Return a UTC timestamp threshold for the given range identifier."""
now = datetime.now(timezone.utc)
option = option.lower()
delta = {
"24h": timedelta(days=1),
"7d": timedelta(days=7),
"30d": timedelta(days=30),
}.get(option, timedelta(days=7))
return now - delta
def random_user_agent() -> str:
"""Return a random user agent string for polite scraping."""
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 13_3) AppleWebKit/605.1.15 "
"(KHTML, like Gecko) Version/16.4 Safari/605.1.15",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:125.0) Gecko/20100101 Firefox/125.0",
]
return random.choice(user_agents)
def chunked(iterable: Sequence[str], size: int) -> Iterator[Sequence[str]]:
"""Yield successive chunks from iterable."""
for start in range(0, len(iterable), size):
yield iterable[start : start + size]
def validate_openai_key(api_key: Optional[str]) -> Tuple[Optional[str], List[str]]:
"""Validate an OpenAI key following the guidance from day1 notebook."""
warnings: List[str] = []
if not api_key:
warnings.append("No OpenAI API key detected. VADER fallback will be used.")
return None, warnings
if not api_key.startswith("sk-"):
warnings.append(
"Provided OpenAI API key does not start with the expected prefix (sk-)."
)
if api_key.strip() != api_key:
warnings.append("OpenAI API key looks like it has leading or trailing whitespace.")
api_key = api_key.strip()
return api_key, warnings
def ensure_timezone(ts: datetime) -> datetime:
"""Guarantee timestamps are timezone-aware in UTC."""
if ts.tzinfo is None:
return ts.replace(tzinfo=timezone.utc)
return ts.astimezone(timezone.utc)
def safe_int(value: Optional[object], default: int = 0) -> int:
"""Convert a value to int with a fallback."""
try:
return int(value) # type: ignore[arg-type]
except (TypeError, ValueError):
return default

View File

@@ -0,0 +1,6 @@
import pathlib
import sys
PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))

View File

@@ -0,0 +1,19 @@
import pytest
from services import llm
from services.utils import ServiceWarning
def test_llm_fallback_uses_vader():
service = llm.LLMService(api_key=None)
results = service.classify_sentiment_batch(
["I absolutely love this product!", "This is the worst experience ever."]
)
assert results[0].label == "positive"
assert results[1].label == "negative"
def test_summary_requires_openai_key():
service = llm.LLMService(api_key=None)
with pytest.raises(ServiceWarning):
service.summarize_overall([{"label": "positive", "text": "Example"}])

View File

@@ -0,0 +1,35 @@
import datetime as dt
from services import utils
def test_normalize_items_deduplicates():
ts = dt.datetime(2025, 1, 1, tzinfo=dt.timezone.utc)
items = [
utils.NormalizedItem(
source="reddit",
id="1",
url="https://example.com/a",
author="alice",
timestamp=ts,
text="ReputationRadar is great!",
meta={},
),
utils.NormalizedItem(
source="reddit",
id="2",
url="https://example.com/a",
author="bob",
timestamp=ts,
text="ReputationRadar is great!",
meta={},
),
]
cleaned = utils.normalize_items(items)
assert len(cleaned) == 1
def test_sanitize_text_removes_html():
raw = "<p>Hello <strong>world</strong> &nbsp; <a href='https://example.com'>link</a></p>"
cleaned = utils.sanitize_text(raw)
assert cleaned == "Hello world link"