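# File-listing metadata carried over from the web viewer this file was copied from:
# Path: LLM_Engineering_OLD/week1/community-contributions/llm-page-summarizer-with-pyppeteer/page-summarizer.py
# Size: 225 lines, 6.8 KiB
# Language: Python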

import dotenv
import asyncio
import os
# Select the Chromium revision for pyppeteer. The assignment deliberately sits
# above the pyppeteer import below -- presumably pyppeteer reads this variable
# when it is imported (TODO confirm against the installed pyppeteer version).
os.environ['PYPPETEER_CHROMIUM_REVISION'] = '1263111'
from rich.console import Console
from rich.markdown import Markdown
from openai import OpenAI
from openai.types.chat import ChatCompletion
from typing import Optional, Union, Dict, List
from pyppeteer import launch
from pyppeteer_stealth import stealth
from random import randint
# Shared rich console; the classes and functions in this file print through it.
console = Console()
class Config:
    """Typed, read-only access to settings from a dotenv file.

    Values are looked up in the parsed dotenv file first. When a key is absent
    from the file (or has no value there), the process environment is used as
    a fallback, so settings supplied only through environment variables are
    still visible.
    """

    # Strings (compared case-insensitively) that ``get_bool`` treats as True.
    _TRUTHY = ("true", "1", "yes")

    def __init__(self, filename: str = ".env"):
        """Load *filename* into the environment and keep a parsed copy of it."""
        dotenv.load_dotenv(filename)
        self._config = dotenv.dotenv_values(filename)

    def get(self, key: str) -> Optional[str]:
        """Return the raw string for *key*, or ``None`` if it is not set.

        The dotenv file takes precedence; ``os.environ`` is consulted only
        when the file provides no value for *key*.
        """
        value = self._config.get(key)
        if value is None:
            value = os.environ.get(key)
        return value

    def get_int(self, key: str) -> Optional[int]:
        """Return *key* converted with ``int()``, or ``None`` if it is not set.

        Raises:
            ValueError: if the stored value is not a valid integer literal.
        """
        value = self.get(key)
        return None if value is None else int(value)

    def get_bool(self, key: str) -> Optional[bool]:
        """Return *key* as a boolean, or ``None`` if it is not set.

        "true", "1" and "yes" (any letter case, surrounding whitespace
        ignored) map to ``True``; every other value maps to ``False``.
        """
        value = self.get(key)
        if value is None:
            return None
        return value.strip().lower() in self._TRUTHY

    @property
    def openai_api_key(self) -> Optional[str]:
        """The ``OPENAI_API_KEY`` setting, or ``None`` if it is not set."""
        return self.get("OPENAI_API_KEY")
class Website:
    """A web page whose title and visible text are scraped with pyppeteer.

    Constructing an instance, or assigning to ``url``, scrapes the page
    synchronously through ``asyncio.run``; it therefore cannot be used from
    code that is already running inside an event loop. If navigation or
    extraction fails, the error is printed and ``title`` / ``text`` stay
    empty strings.
    """

    # Class-level defaults guarantee that the read-only properties always
    # resolve, even when a scrape fails before the instance values are set.
    __url: str = ""
    __title: str = ""
    __text: str = ""

    # One of these is picked at random for every scrape.
    _USER_AGENTS = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36",
    )

    def __init__(self, url: str) -> None:
        """Store *url* and scrape it immediately (via the ``url`` setter)."""
        self.url = url

    @property
    def url(self) -> str:
        """Address of the page."""
        return self.__url

    @url.setter
    def url(self, url: str) -> None:
        self.__url = url
        # Drop the previous page's data first so that a failed scrape cannot
        # leave a stale title/text paired with the new URL.
        self.__title = ""
        self.__text = ""
        self.__scrape()

    @property
    def title(self) -> str:
        """Page title, or an empty string if the scrape failed."""
        return self.__title

    @property
    def text(self) -> str:
        """``document.body.innerText`` of the page, or an empty string if the scrape failed."""
        return self.__text

    def __scrape(self) -> None:
        """Scrape the website using pyppeteer."""
        asyncio.run(self.__scrape_async())

    async def __scrape_async(self) -> None:
        """Open the page in a headless browser and record its title and text."""
        browser = await launch(headless=True)
        try:
            page = await browser.newPage()
            await self.__prepare_page(page)
            try:
                await page.goto(self.url, {"timeout": 60000})
                self.__title = await page.title()
                self.__text = await page.evaluate('() => document.body.innerText')
            except Exception as e:
                # Best effort: report the failure and keep the empty defaults.
                console.print(f"[red]Error scraping {self.url}: {e}[/red]")
        finally:
            # Runs even when page setup raises, so the browser is never left
            # behind; closing the browser also ends its pages.
            await browser.close()

    async def __prepare_page(self, page) -> None:
        """Apply stealth settings, a random user agent and stylesheet blocking."""
        await stealth(page)
        user_agent = self._USER_AGENTS[randint(0, len(self._USER_AGENTS) - 1)]
        await page.setUserAgent(user_agent)
        await page.setRequestInterception(True)
        # Only the text is needed, so stylesheet requests are aborted and
        # every other request is allowed through.
        page.on("request", lambda req: asyncio.ensure_future(
            req.abort() if req.resourceType == "stylesheet" else req.continue_()
        ))

    def __str__(self) -> str:
        return f'Website(url={self.url}, title="{self.title}")'
class LlmSummarizer:
    """Summarize web pages with an OpenAI chat model.

    The OpenAI client and the system message are created lazily on first use.
    """

    # Class-level defaults so the lazy properties below can test for None.
    __config: Optional[Config] = None
    __openai: Optional[OpenAI] = None
    __system_behavior: Optional[Dict[str, str]] = None

    def __init__(self, config: Config, *, model: str = "gpt-4o-mini") -> None:
        """Keep *config* for API credentials and *model* as the chat model name."""
        self.__config = config
        self.__model = model

    @property
    def config(self) -> Config:
        """The configuration passed to the constructor.

        Raises:
            ValueError: if no configuration has been set.
        """
        if self.__config is None:
            raise ValueError("Config not initialized")
        return self.__config

    @property
    def openai(self) -> OpenAI:
        """OpenAI client, created on first access so unused instances cost nothing."""
        if self.__openai is None:
            self.__openai = OpenAI(api_key=self.config.openai_api_key)
        return self.__openai

    @property
    def system_behavior(self) -> Dict[str, str]:
        """System message describing the assistant's task, built on first access."""
        if self.__system_behavior is None:
            self.__system_behavior = {
                "role": "system",
                "content": (
                    "You are an assistant that analyzes the contents of a website "
                    "and provides a short summary, ignoring the text that might be navigation-related. "
                    "Respond in markdown and be concise."
                ),
            }
        return self.__system_behavior

    def user_prompt_for(self, website: Website) -> Dict[str, str]:
        """Build the user message that carries the title and text of *website*."""
        user_prompt_content: str = (
            f"You are looking at the website titled \"{website.title}\". "
            "The content of this website is as follows; "
            "please provide a short summary of this website in markdown. "
            "If it includes news or announcements, then summarize these too.\n\n"
            f"\"\"\"\n{website.text}\n\"\"\"\n\n"
        )
        return {"role": "user", "content": user_prompt_content}

    def messages_for(self, website: Website) -> List[Dict[str, str]]:
        """Create the messages for the OpenAI API: system message, then user message."""
        return [self.system_behavior, self.user_prompt_for(website)]

    def summarize(self, website: Union[Website, str]) -> Optional[str]:
        """Summarize the website using the OpenAI API.

        Args:
            website: a ``Website`` instance, or a URL string from which one
                is created.

        Returns:
            The summary text returned by the model, or ``None`` when the page
            has no text or the request fails (the reason is printed).
        """
        # Capture the URL before converting so error messages can always name it.
        url: str = website if isinstance(website, str) else website.url
        if isinstance(website, str):
            website = Website(website)
        try:
            # Reading the page text inside the try block means a page whose
            # scrape failed ends in a reported error rather than a crash.
            if not (website.text or "").strip():
                console.print(f"[red]Error summarizing {url}: no page text was scraped[/red]")
                return None
            response: ChatCompletion = self.openai.chat.completions.create(
                model=self.__model,
                messages=self.messages_for(website),
                temperature=0.2,
                max_tokens=512,
            )
            return response.choices[0].message.content
        except Exception as e:
            console.print(f"[red]Error summarizing {url}: {e}[/red]")
            return None
def display_markdown(content: str) -> None:
    """Render *content* as Markdown and print it on the shared rich console."""
    rendered = Markdown(content)
    console.print(rendered)
def show_summary(summary: str) -> None:
    """Print *summary* as Markdown, or a placeholder line when it is empty or None."""
    # Guard clause: nothing to render, so print the fallback and stop.
    if not summary:
        console.print("No summary found.")
        return
    display_markdown(summary)
if __name__ == "__main__":
    import sys

    # Summarize the URL given as the first command-line argument; without an
    # argument, fall back to the original demo target.
    target_url = sys.argv[1] if len(sys.argv) > 1 else "https://cnn.com"
    summarizer = LlmSummarizer(Config())
    summary = summarizer.summarize(target_url)
    show_summary(summary)