Week 5 Investor Relations bot with RAG

This commit is contained in:
Eleftherios Chaniotakis
2025-07-21 23:53:30 +03:00
parent 96b796dfc9
commit e225095eaf
4 changed files with 1089 additions and 0 deletions

View File

@@ -0,0 +1,101 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "18d85036",
"metadata": {},
"source": [
"\n",
"![image](img/spider_bot.png)\n",
"\n",
"## Investor Relations Web Scraping bot\n",
"This code will pop up a Gradio interface to start scraping a website. This is a utility notebook, created to quickly gather documents from IR sites to create a KB. \n",
"I've tuned the scraper to go through the Investor Relations tree of a company website and save all documents with extensions (xls, pdf, word, etc), but not the HTML content.\n",
"\n",
"Due to the way scrapy works with async loops, I had to make a separate script and run it as a subprocess, in order for it to work in a Jupyter notebook.\n",
"\n",
"Can be used to scrape multiple websites (one at a time). Saves scraped files in a kb/{domain} subdirectory (it does **not** preserve website tree structure)\n",
"\n",
"Uses **spider_runner.py**, which needs to be in the same directory as the notebook (will check and abort if not present).\n",
"\n",
"\n",
"### Scraping logic\n",
"scrapy does a pretty decent job of getting the necessary files, although some dynamic sites will not yield the best results. For a more robust scraper I probably need to move to Selenium in a future upgrade. Still, the tool is quite practical for many occasions, as many companies keep their IR websites static. You may need to tweak the follow-on link scraping patterns, I have kept it very simple (it will follow whatever link has 'investor-relations/' in it and limit the links to follow per page to avoid infinite scraping)\n",
"\n",
"In a real application environment we would be running the spider class inside the application - this would enable simpler real-time updates in the output. For an interactive notebook I find this approach sufficient enough."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "69f99b6a",
"metadata": {},
"outputs": [],
"source": [
"import subprocess, os, sys\n",
"import gradio as gr\n",
"from urllib.parse import urlparse, urljoin\n",
"\n",
"\n",
"# from urllib.parse import urljoin, urlparse\n",
"# from scrapy.crawler import CrawlerRunner\n",
"# from scrapy.utils.log import configure_logging\n",
"# from twisted.internet import reactor, defer\n",
"# import asyncio\n",
"\n",
"is_scraper_completed = False # global variable to check if the scraper has completed\n",
"status_value= \"Ready\"\n",
"\n",
"with gr.Blocks() as scraper_ui:\n",
" gr.Markdown(\"## Web Scraper\")\n",
" gr.Markdown(\"This is a simple web scraper that can be used to scrape investor relations pages.\")\n",
" \n",
" url = gr.Textbox(label=\"Enter URL\", placeholder=\"https://example.com\")\n",
" \n",
" status = gr.Textbox(label=\"Status\", interactive=False, value=\"Ready to scrape. Enter a URL and press Enter.\", lines=5)\n",
"\n",
" def run_scraper(url):\n",
" # Run the spider as a subprocess\n",
" if not url.startswith(\"http\"):\n",
" url = \"http://\" + url\n",
" # Extract the domain from the URL\n",
" parsed_url = urlparse(url)\n",
" domain = parsed_url.netloc.replace(\"www.\", \"\")\n",
" if not domain:\n",
" return \"Invalid URL. Please enter a valid URL.\"\n",
" # Check if the spider_runner.py file exists\n",
" if not os.path.exists('spider_runner.py'):\n",
" return \"Error: spider_runner.py not found. Please ensure it is in the current directory.\"\n",
" # Run the spider using subprocess\n",
" try:\n",
" result = subprocess.run([sys.executable, 'spider_runner.py', url, domain], check=True, text=True, capture_output=True)\n",
" status_value = f\"Scraping completed for {url}.\"\n",
" is_scraper_completed = True # Set the global variable to True\n",
" return result.stderr, status_value\n",
" except subprocess.CalledProcessError as e:\n",
" is_scraper_completed = True\n",
" status_value = \"Error during scraping. Check the logs for details.\"\n",
" return f\"Error: {e}\", status_value\n",
" \n",
" output = gr.Textbox(label=\"Output\", interactive=False)\n",
" \n",
" url.submit(run_scraper, inputs=url, outputs=[output,status]) \n",
"\n",
"scraper_ui.launch(inbrowser=True)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "llms",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -0,0 +1,303 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "2f01b288",
"metadata": {},
"source": [
"# RAG personal bot\n",
"\n",
"Exercise for week 5 of LLM Engineering course.\n",
"\n",
"This notebook will create a personal RAG bot. It will use a the ./kb directory to store the files that we want to include in the RAG. Subdirectories will be used to denote categories for the files.\n",
"**Important: only one level of subdirectories will be used for the categories**\n",
"\n",
"It uses LangChain to create and process the RAG pipeline and chat.\n",
"The voector database persistent sotre is in the ./vdb folder. \n",
"\n",
"In this version we use chromadb for the vector store.\n",
"The store is recreated each run. This is not efficient for large datasets. \n",
"\n",
"Future upgrades - To Do (in no particular order): \n",
"- [X] Create a fully local version for security and privacy\n",
"- [ ] Create persistent data store - only load, chunk and embed changed documents. \n",
"- [ ] Provide selection of vector db engines (Chroma DB as default, or connect to external vector db e.g. ElasticSearch or AWS Opensearch)\n",
"- [ ] Add an interface to upload documents in data store - including user-defined metadata tags\n",
"- [ ] Add more document data types\n",
"- [ ] Add online search capability - use web crawler tool to crawl a website and create website-specific RAG bot\n",
"- [ ] Read e-mails/calendars/online docs (Amazon S3 bucket, Google Drive)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6dfe8e48",
"metadata": {},
"outputs": [],
"source": [
"# These were necessary as langchain does not install them by default\n",
"!pip install pypdf\n",
"!pip install pdfminer.six\n",
"!pip install python-docx\n",
"!pip install docx2txt"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "193171c0",
"metadata": {},
"outputs": [],
"source": [
"# imports\n",
"\n",
"import os\n",
"import glob\n",
"from dotenv import load_dotenv\n",
"import gradio as gr\n",
"\n",
"# imports for langchain, plotly and Chroma\n",
"# plotly is commented out, as it is not used in the current code\n",
"\n",
"from langchain.document_loaders import DirectoryLoader, TextLoader, PDFMinerLoader, Docx2txtLoader\n",
"from langchain.text_splitter import RecursiveCharacterTextSplitter\n",
"# from langchain.schema import Document\n",
"from langchain_openai import OpenAIEmbeddings, ChatOpenAI\n",
"from langchain_chroma import Chroma\n",
"#import matplotlib.pyplot as plt\n",
"#from sklearn.manifold import TSNE\n",
"#import numpy as np\n",
"#import plotly.graph_objects as go\n",
"from langchain.memory import ConversationBufferMemory\n",
"from langchain.chains import ConversationalRetrievalChain\n",
"# from langchain.embeddings import HuggingFaceEmbeddings"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d22d2e48",
"metadata": {},
"outputs": [],
"source": [
"MODEL = \"gpt-4o-mini\"\n",
"db_name = \"vdb\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fc23bf8c",
"metadata": {},
"outputs": [],
"source": [
"# Load environment variables in a file called .env\n",
"\n",
"load_dotenv(override=True)\n",
"os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY', 'your-key-if-not-using-env')\n"
]
},
{
"cell_type": "markdown",
"id": "0103ef35",
"metadata": {},
"source": [
"## Loading the documents\n",
"In the code below we read in the KB documents and create the vector store. \n",
"We will be adding PDF documents, Word documents and text/markdown documents.\n",
"Each document has its own loader, which we are calling separately through DirectoryLoader.\n",
"At the end, we are combining the results, and then start splitting the documents using the Recursive Character Text Splitter."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2f20fd20",
"metadata": {},
"outputs": [],
"source": [
"# Read in documents using LangChain's loaders\n",
"# Take everything in all the sub-folders of our knowledgebase\n",
"\n",
"folders = glob.glob(\"kb/*\")\n",
"print(f\"Found {len(folders)} folders in the knowledge base.\")\n",
"\n",
"def add_metadata(doc, doc_type):\n",
" doc.metadata[\"doc_type\"] = doc_type\n",
" return doc\n",
"\n",
"# For text files\n",
"text_loader_kwargs = {'encoding': 'utf-8'}\n",
"\n",
"documents = []\n",
"for folder in folders:\n",
" print(f\"Loading documents from folder: {folder}\")\n",
" doc_type = os.path.basename(folder)\n",
" # PDF Loader\n",
" pdf_loader = DirectoryLoader(folder, glob=\"**/*.pdf\", loader_cls=PDFMinerLoader)\n",
" # Text loaders\n",
" txt_loader = DirectoryLoader(folder, glob=\"**/*.txt\", loader_cls=TextLoader, loader_kwargs=text_loader_kwargs)\n",
" md_loader = DirectoryLoader(folder, glob=\"**/*.md\", loader_cls=TextLoader, loader_kwargs=text_loader_kwargs)\n",
" # Load MS Word documents - UnstructuredWordDocumentLoader does not play well with numpy > 1.24.0, and we use Docx2txtLoader instead. \n",
" # doc_loader = DirectoryLoader(folder, glob=\"**/*.doc\", loader_cls=UnstructuredWordDocumentLoader)\n",
" docx_loader = DirectoryLoader(folder, glob=\"**/*.docx\", loader_cls=Docx2txtLoader)\n",
" # document doc_type is used to identify the type of document\n",
" # Load documents from PDF, text and word files and combine the results\n",
" pdf_docs = pdf_loader.load()\n",
" print(f\"Loaded {len(pdf_docs)} PDF documents from {folder}\")\n",
" text_docs = txt_loader.load() + md_loader.load()\n",
" print(f\"Loaded {len(text_docs)} text documents from {folder}\")\n",
" word_docs = docx_loader.load()\n",
" print(f\"Loaded {len(word_docs)} Word documents from {folder}\")\n",
" folder_docs = pdf_docs + text_docs + word_docs\n",
" # Add metadata to each document\n",
" if not folder_docs:\n",
" print(f\"No documents found in folder: {folder}\")\n",
" continue\n",
" documents.extend([add_metadata(doc, doc_type) for doc in folder_docs])\n",
"\n",
"# Split the documents into chunks\n",
"text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)\n",
"chunks = text_splitter.split_documents(documents)\n",
"\n",
"# Print out some basic info for the loaded documents and chunks\n",
"print(f\"Total number of documents: {len(documents)}\")\n",
"print(f\"Total number of chunks: {len(chunks)}\")\n",
"print(f\"Document types found: {set(doc.metadata['doc_type'] for doc in documents)}\")\n"
]
},
{
"cell_type": "markdown",
"id": "021cadc7",
"metadata": {},
"source": [
"## Vector Store\n",
"\n",
"We use Chromadb for vector store\n",
"Same code as the one in the lesson notebook, minus the visualization part"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "efc70e3a",
"metadata": {},
"outputs": [],
"source": [
"# embeddings = OpenAIEmbeddings()\n",
"\n",
"# If you would rather use the free Vector Embeddings from HuggingFace sentence-transformers\n",
"# Then replace embeddings = OpenAIEmbeddings()\n",
"# with:\n",
"from langchain.embeddings import HuggingFaceEmbeddings\n",
"embeddings = HuggingFaceEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\")\n",
"\n",
"# Delete if already exists\n",
"\n",
"if os.path.exists(db_name):\n",
" Chroma(persist_directory=db_name, embedding_function=embeddings).delete_collection()\n",
"\n",
"# Create vectorstore\n",
"\n",
"vectorstore = Chroma.from_documents(documents=chunks, embedding=embeddings, persist_directory=db_name)\n",
"print(f\"Vectorstore created with {vectorstore._collection.count()} documents\")\n",
"\n",
"# Let's investigate the vectors\n",
"\n",
"collection = vectorstore._collection\n",
"count = collection.count()\n",
"\n",
"sample_embedding = collection.get(limit=1, include=[\"embeddings\"])[\"embeddings\"][0]\n",
"dimensions = len(sample_embedding)\n",
"print(f\"There are {count:,} vectors with {dimensions:,} dimensions in the vector store\")"
]
},
{
"cell_type": "markdown",
"id": "c9af1d32",
"metadata": {},
"source": [
"## LangChain\n",
"Create Langchain chat, memory and retrievers.\n",
"\n",
"Note: for this localized version, Gemma3 4B worked much better than Llama 3.2, with my documents. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2360006e",
"metadata": {},
"outputs": [],
"source": [
"# create a new Chat with OpenAI\n",
"#llm = ChatOpenAI(temperature=0.7, model_name=MODEL)\n",
"\n",
"# Alternative - if you'd like to use Ollama locally, uncomment this line instead\n",
"llm = ChatOpenAI(temperature=0.7, model_name='gemma3:4b', base_url='http://localhost:11434/v1', api_key='ollama')\n",
"\n",
"# set up the conversation memory for the chat\n",
"memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True)\n",
"\n",
"# the retriever is an abstraction over the VectorStore that will be used during RAG\n",
"retriever = vectorstore.as_retriever(search_kwargs={\"k\": 20}) # k is the number of documents to retrieve\n",
"\n",
"# putting it together: set up the conversation chain with the GPT 3.5 LLM, the vector store and memory\n",
"conversation_chain = ConversationalRetrievalChain.from_llm(llm=llm, retriever=retriever, memory=memory)"
]
},
{
"cell_type": "markdown",
"id": "88a21bb3",
"metadata": {},
"source": [
"## UI part\n",
"Create Gradio interface\n",
"\n",
"Simple built-in chat interface\n",
"\n",
"To Do: Add upload interface to include additional documents in data store."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0dfe7d75",
"metadata": {},
"outputs": [],
"source": [
"# Wrapping that in a function\n",
"\n",
"def chat(question, history):\n",
" result = conversation_chain.invoke({\"question\": question})\n",
" return result[\"answer\"]\n",
"\n",
"# And in Gradio:\n",
"\n",
"view = gr.ChatInterface(chat, type=\"messages\").launch(inbrowser=True)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "llms",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -0,0 +1,547 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "2f01b288",
"metadata": {},
"source": [
"![image](img/librarian_bot.png)\n",
"\n",
"# RAG bot for investor information\n",
"\n",
"Exercise for week 5 of LLM Engineering course\n",
"\n",
"Specialized bot focusing on analysing financial documents from Investor Relations webpages. \n",
"Comes together with a web crawler spider to gather documents quickly.\n",
"\n",
"This notebook will create a personal RAG bot. It will use a the ./kb directory to store the files that we want to include in the RAG. Subdirectories will be used to denote categories for the files.\n",
"**Important: only one level of subdirectories will be used for the categories**\n",
"\n",
"It uses LangChain to create and process the RAG pipeline and chat.\n",
"The vector database persistent sotre is in the ./vdb folder. \n",
"\n",
"In this version we use chromadb for the vector store.\n",
"The store is recreated each run. This is not efficient for large datasets. \n",
"\n",
"Future upgrades - To Do (in no particular order): \n",
"- [x] Create a fully local version for security and privacy (*see v01_local*) <span style=\"color:orange\">\n",
" NOTE: will require a fairly advanced LLM to answer questions without losing context. 2-4bn parameters LLM's struggle and tend to hallucinate. Best options are gpt-4o-mini and claude-3.5-haiku.</span>\n",
"- [x] Fine tune the pdf scraper to handle financial reports better\n",
"- [x] Create custom retriever for financial information\n",
"- [ ] Create persistent data store between runs - only load, chunk and embed changed documents. \n",
"- [ ] Provide selection of vector db engines (Chroma DB as default, or connect to external vector db e.g. ElasticSearch or AWS Opensearch)\n",
"- [ ] Add an interface to upload documents in data store - including user-defined metadata tags\n",
"- [ ] Multimodality: Process more document data types (e.g. ppt) \n",
"- [x] Add online search capability - use web crawler tool to crawl a website and create website-specific RAG bot\n",
"- [ ] Read e-mails/calendars/online docs (Amazon S3 bucket, Google Drive)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6dfe8e48",
"metadata": {},
"outputs": [],
"source": [
"# These were necessary as langchain does not install them by default\n",
"# !pip install pypdf\n",
"# !pip install pdfminer.six\n",
"# !pip install python-docx\n",
"!pip install docx2txt\n",
"!pip install pymupdf4llm"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "193171c0",
"metadata": {},
"outputs": [],
"source": [
"# imports\n",
"\n",
"import os\n",
"import glob\n",
"from dotenv import load_dotenv\n",
"import gradio as gr\n",
"\n",
"# imports for langchain, plotly and Chroma\n",
"# plotly is commented out, as it is not used in the current code\n",
"\n",
"from langchain.document_loaders import DirectoryLoader, TextLoader, PDFMinerLoader, Docx2txtLoader\n",
"from langchain.text_splitter import RecursiveCharacterTextSplitter\n",
"# from langchain.schema import Document\n",
"from langchain_openai import OpenAIEmbeddings, ChatOpenAI\n",
"from langchain_chroma import Chroma\n",
"#import matplotlib.pyplot as plt\n",
"#from sklearn.manifold import TSNE\n",
"#import numpy as np\n",
"#import plotly.graph_objects as go\n",
"from langchain.memory import ConversationBufferMemory\n",
"from langchain.chains import ConversationalRetrievalChain\n",
"from langchain.embeddings import HuggingFaceEmbeddings"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d22d2e48",
"metadata": {},
"outputs": [],
"source": [
"MODEL = \"gpt-4o-mini\"\n",
"db_name = \"vdb\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fc23bf8c",
"metadata": {},
"outputs": [],
"source": [
"# Load environment variables in a file called .env\n",
"\n",
"load_dotenv(override=True)\n",
"os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY', 'your-key-if-not-using-env')\n"
]
},
{
"cell_type": "markdown",
"id": "0103ef35",
"metadata": {},
"source": [
"## Loading the documents\n",
"\n",
"In the code below we read in the KB documents and create the vector store. \n",
"We will be adding PDF documents, Word documents and text/markdown documents. \n",
"Each document has its own loader, which we are calling separately through DirectoryLoader.\n",
"For PDF we implement custom loader to manage financial data. \n",
"\n",
"At the end, we are combining the results, and then start splitting the documents using the Recursive Character Text Splitter.\n",
"\n",
"This approach is not optimal for financial tables.\n",
"TO DO:\n",
" - [x] Replace splitter with better technique that preserves tables.\n",
" - [x] Replace PDF Reader with pymupdf4llm"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "918cbbf0",
"metadata": {},
"outputs": [],
"source": [
"# Utility functions for EU financial reporting (read from PDF)\n",
"# We're using pymupdf4llm for better handling of financial reports\n",
"# This function does not utilize a loader class, but directly processes the PDF file\n",
"# It extracts financial sections and returns them as Document objects\\\n",
"\n",
"import pymupdf4llm\n",
"from langchain.schema import Document\n",
"import re\n",
"import string\n",
"from pathlib import Path\n",
"\n",
"def extract_eu_financial_reports(pdf_path):\n",
" \"\"\"\n",
" Extracts financial sections from an EU financial report PDF using pymupdf4llm.\n",
"\n",
" Args:\n",
" pdf_path (str): Path to the PDF file.\n",
"\n",
" Returns:\n",
" list[Document]: A list of LangChain Document objects, each representing a detected financial section\n",
" (e.g., income statement, balance sheet, cash flow statement, etc.) with associated metadata.\n",
"\n",
" The function processes the PDF, detects section headers based on common financial report section names,\n",
" and splits the content accordingly. Each Document contains the section text and metadata including section name,\n",
" content type, source file, and page range.\n",
" \"\"\"\n",
" md_text = pymupdf4llm.to_markdown(\n",
" pdf_path,\n",
" page_chunks=True, # Preserve page boundaries\n",
" write_images=False,\n",
" embed_images=False\n",
" )\n",
" \n",
" # EU financial reports have predictable structures\n",
" financial_sections = [\n",
" \"consolidated income statement\", \"profit and loss\", \"p&l\", \"remuneration report\",\n",
" \"balance sheet\", \"cash flow statement\", \"statement of financial position\",\n",
" \"notes to the consolidated financial statements\", \"segment reporting\",\n",
" \"risk management\", \"capital adequacy\", \"basel\", \"ifrs\", \"regulatory capital\"\n",
" ]\n",
" \n",
" documents = []\n",
" current_section = None\n",
" current_content = \"\"\n",
" start_page = 1\n",
" \n",
" for page_dict in md_text:\n",
" # Extract the actual text content from the dictionary\n",
" page_content = page_dict.get(\"text\", \"\")\n",
" page_num = page_dict.get(\"page\", start_page)\n",
"\n",
" # Detect financial section headers\n",
" content_lower = page_content.lower()\n",
" detected_section = None\n",
" \n",
" for section in financial_sections:\n",
" if section in content_lower:\n",
" detected_section = section\n",
" break\n",
" \n",
" # Process section changes\n",
" if detected_section and detected_section != current_section:\n",
" if current_content:\n",
" # Save previous section\n",
" documents.append(Document(\n",
" page_content=current_content.strip(),\n",
" metadata={\n",
" \"content_type\": \"financial_statement\",\n",
" \"section\": current_section or \"general\",\n",
" \"source\": pdf_path,\n",
" \"pages\": f\"{start_page}-{page_num-1}\"\n",
" }\n",
" ))\n",
" current_section = detected_section\n",
" current_content = page_content\n",
" else:\n",
" current_content += \"\\n---\\n\" + page_content\n",
" \n",
" # Handle final section\n",
" if current_content:\n",
" documents.append(Document(\n",
" page_content=current_content.strip(),\n",
" metadata={\n",
" \"content_type\": \"financial_statement\",\n",
" \"section\": current_section or \"general\",\n",
" \"source\": pdf_path,\n",
" \"pages\": f\"{start_page}-{page_num}\"\n",
" }\n",
" ))\n",
" \n",
" return documents\n",
"\n",
"# Utility functions for loading documents from a folder\n",
"def load_eu_financial_reports_from_directory(directory_path: str, glob_pattern: str = \"*.pdf\"):\n",
" \"\"\"\n",
" Load and process all EU financial reports from a directory.\n",
"\n",
" Args:\n",
" directory_path (str): Path to the directory containing PDF files\n",
" glob_pattern (str, optional): Pattern to match PDF files. Defaults to \"*.pdf\"\n",
"\n",
" Returns:\n",
" list[Document]: A list of LangChain Document objects containing the extracted financial sections\n",
" from all successfully processed PDFs in the directory.\n",
"\n",
" The function iterates through PDF files in the specified directory that match the glob pattern,\n",
" processes each file using extract_eu_financial_reports(), and combines the results into a single list.\n",
" Files that cannot be processed are skipped with an error message printed to stdout.\n",
" \"\"\"\n",
" all_documents = []\n",
" directory = Path(directory_path)\n",
" \n",
" for pdf_file in directory.glob(glob_pattern):\n",
" try:\n",
" documents = extract_eu_financial_reports(str(pdf_file))\n",
" all_documents.extend(documents)\n",
" except Exception as e:\n",
" print(f\"Error processing {pdf_file}: {e}\")\n",
" \n",
" return all_documents\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2f20fd20",
"metadata": {},
"outputs": [],
"source": [
"# Read in documents using LangChain's loaders\n",
"# Take everything in all the sub-folders of our knowledgebase\n",
"\n",
"folders = glob.glob(\"kb/*\")\n",
"print(f\"Found {len(folders)} folders in the knowledge base.\")\n",
"\n",
"def add_metadata(doc, doc_type):\n",
" doc.metadata[\"doc_type\"] = doc_type\n",
" return doc\n",
"\n",
"# For text files\n",
"text_loader_kwargs = {'encoding': 'utf-8'}\n",
"\n",
"documents = []\n",
"for folder in folders:\n",
" print(f\"Loading documents from folder: {folder}\")\n",
" doc_type = os.path.basename(folder)\n",
" # PDF Loader\n",
" # We're not using the PDFMinerLoader as it does not handle EU financial reports well.\n",
" # Instead, we use our custom extract_eu_financial_reports function.\n",
" # Uncomment the next line if you want to use the standard loader for PDF files\n",
" # pdf_loader = DirectoryLoader(folder, glob=\"**/*.pdf\", loader_cls=extract_eu_financial_reports)\n",
" # Text loaders\n",
" txt_loader = DirectoryLoader(folder, glob=\"**/*.txt\", loader_cls=TextLoader, loader_kwargs=text_loader_kwargs)\n",
" md_loader = DirectoryLoader(folder, glob=\"**/*.md\", loader_cls=TextLoader, loader_kwargs=text_loader_kwargs)\n",
" # Load MS Word documents - UnstructuredWordDocumentLoader does not play well with numpy > 1.24.0, and we use Docx2txtLoader instead. \n",
" # doc_loader = DirectoryLoader(folder, glob=\"**/*.doc\", loader_cls=UnstructuredWordDocumentLoader)\n",
" docx_loader = DirectoryLoader(folder, glob=\"**/*.docx\", loader_cls=Docx2txtLoader)\n",
" # document doc_type is used to identify the type of document\n",
" # Load documents from PDF, text and word files and combine the results\n",
" pdf_docs = load_eu_financial_reports_from_directory(folder)\n",
" print(f\"Loaded {len(pdf_docs)} PDF documents from {folder}\")\n",
" text_docs = txt_loader.load() + md_loader.load()\n",
" print(f\"Loaded {len(text_docs)} text documents from {folder}\")\n",
" word_docs = docx_loader.load()\n",
" print(f\"Loaded {len(word_docs)} Word documents from {folder}\")\n",
" folder_docs = pdf_docs + text_docs + word_docs\n",
" # Add metadata to each document\n",
" if not folder_docs:\n",
" print(f\"No documents found in folder: {folder}\")\n",
" continue\n",
" documents.extend([add_metadata(doc, doc_type) for doc in folder_docs])\n",
"\n",
"# Split the documents into chunks\n",
"text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)\n",
"chunks = text_splitter.split_documents(documents)\n",
"\n",
"# Print out some basic info for the loaded documents and chunks\n",
"print(f\"Total number of documents: {len(documents)}\")\n",
"print(f\"Total number of chunks: {len(chunks)}\")\n",
"print(f\"Document types found: {set(doc.metadata['doc_type'] for doc in documents)}\")\n"
]
},
{
"cell_type": "markdown",
"id": "749ad5d8",
"metadata": {},
"source": [
"## Vector Store\n",
"\n",
"We use Chromadb for vector store.\n",
"\n",
"Same code as the one in the lesson notebook, minus the visualization part\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "efc70e3a",
"metadata": {},
"outputs": [],
"source": [
"#embeddings = OpenAIEmbeddings()\n",
"\n",
"# If you would rather use the free Vector Embeddings from HuggingFace sentence-transformers\n",
"# Then replace embeddings = OpenAIEmbeddings()\n",
"# with:\n",
"from langchain.embeddings import HuggingFaceEmbeddings\n",
"embeddings = HuggingFaceEmbeddings(model_name=\"sentence-transformers/all-mpnet-base-v2\") # A bit slower, but better than all-MiniLM-L6-v2 for financial documents\n",
"\n",
"# Delete if already exists\n",
"\n",
"if os.path.exists(db_name):\n",
" Chroma(persist_directory=db_name, embedding_function=embeddings).delete_collection()\n",
"\n",
"# Create vectorstore\n",
"\n",
"vectorstore = Chroma.from_documents(documents=chunks, embedding=embeddings, persist_directory=db_name)\n",
"print(f\"Vectorstore created with {vectorstore._collection.count()} documents\")\n",
"\n",
"# Let's investigate the vectors\n",
"\n",
"collection = vectorstore._collection\n",
"count = collection.count()\n",
"\n",
"sample_embedding = collection.get(limit=1, include=[\"embeddings\"])[\"embeddings\"][0]\n",
"dimensions = len(sample_embedding)\n",
"print(f\"There are {count:,} vectors with {dimensions:,} dimensions in the vector store\")"
]
},
{
"cell_type": "markdown",
"id": "c9af1d32",
"metadata": {},
"source": [
"## LangChain\n",
"Create Langchain chat, memory and retrievers.\n",
"\n",
"Trying a number of LLM's for ollama. They are not very good at sortingo out the relevant information from financial documents - they do provide results, but tend to be overly chatty and especially the specific numbers can be hallucinated or taken out of context. \n",
"\n",
"GPT-4o-mini provided much more accurate answers to specific questions, even with huggingface's embeddings for the vector store. \n",
"\n",
"Implemented (with Claude's help) a custom retriever and prompt to focus on financial statement analysis.\n",
"\n",
"### OpenAI rate limits\n",
"*Note*: If using OpenAI's embeddings, there's a limit of 300K tokens per request. This requires special handling when calling Chroma.from_documents.\n",
"###TO DO:\n",
"- [ ] Add rate limiter for encoding documents and encode in batches."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "59f75e5d",
"metadata": {},
"outputs": [],
"source": [
"# Specialized Retriever for consolidated financials\n",
"\n",
"from langchain.schema import BaseRetriever, Document\n",
"from typing import List\n",
"\n",
"from langchain.vectorstores.base import VectorStoreRetriever\n",
"\n",
"class EUFinancialRetriever(VectorStoreRetriever):\n",
" def _get_relevant_documents(self, query: str, *, run_manager=None) -> List[Document]:\n",
" query_lower = query.lower()\n",
" k = self.search_kwargs.get(\"k\", 5)\n",
" \n",
" # Section-aware search logic\n",
" section_queries = {\n",
" 'income': ['income', 'revenue', 'profit', 'earnings'],\n",
" 'balance': ['balance', 'assets', 'liabilities', 'equity'],\n",
" 'cash': ['cash flow', 'operating cash', 'free cash']\n",
" }\n",
" \n",
" for section, terms in section_queries.items():\n",
" if any(term in query_lower for term in terms):\n",
" try:\n",
" return self.vectorstore.similarity_search(\n",
" query, k=k, filter={\"section\": section}\n",
" )\n",
" except:\n",
" break\n",
" \n",
" # Fallback to standard search\n",
" return self.vectorstore.similarity_search(query, k=k)\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aca30d15",
"metadata": {},
"outputs": [],
"source": [
"# Specialized prompt for the retriever\n",
"\n",
"financial_prompt = \"\"\"\n",
"You are analyzing EU bank and corporate financial statements. When answering:\n",
"\n",
"1. For numerical data, ALWAYS cite the specific financial statement section\n",
"2. Consider regulatory context (IFRS, Basel III for banks)\n",
"3. Note if data spans multiple periods or segments\n",
"4. Highlight any footnotes or adjustments mentioned\n",
"5. Be precise about currency and units (EUR millions, thousands, etc.)\n",
"\n",
"Context from financial statements:\n",
"{context}\n",
"\n",
"Question: {question}\n",
"\n",
"Answer:\n",
"\"\"\"\n",
"# Updated chain with financial-aware prompt\n",
"from langchain.prompts import PromptTemplate\n",
"\n",
"prompt = PromptTemplate(\n",
" input_variables=[\"context\", \"question\"],\n",
" template=financial_prompt\n",
")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2360006e",
"metadata": {},
"outputs": [],
"source": [
"# create a new Chat with OpenAI\n",
"llm = ChatOpenAI(temperature=0.7, model_name=MODEL)\n",
"\n",
"# Alternative - if you'd like to use Ollama locally, uncomment this line instead\n",
"#llm = ChatOpenAI(temperature=0.7, model_name='gemma3:4b', base_url='http://localhost:11434/v1', api_key='ollama')\n",
"\n",
"# set up the conversation memory for the chat\n",
"memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True)\n",
"\n",
"# the retriever is an abstraction over the VectorStore that will be used during RAG\n",
"retriever = EUFinancialRetriever(\n",
" vectorstore=vectorstore, \n",
" search_kwargs={\"k\": 5}\n",
")\n",
"# putting it together: set up the conversation chain with the GPT 3.5 LLM, the vector store and memory\n",
"conversation_chain = ConversationalRetrievalChain.from_llm(\n",
" llm=llm, \n",
" retriever=retriever, \n",
" memory=memory, \n",
" combine_docs_chain_kwargs={\"prompt\": prompt},\n",
" return_source_documents=False\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "88a21bb3",
"metadata": {},
"source": [
"## UI part\n",
"Create Gradio interface\n",
"\n",
"Simple built-in chat interface\n",
"\n",
"###To Do: \n",
"- [ ] Add model selector for Claude 3.5 Haiku\n",
"- [ ] Update interface to handle sources (with **return_source_documents=True**)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0dfe7d75",
"metadata": {},
"outputs": [],
"source": [
"# Wrapping that in a function\n",
"\n",
"def chat(question, history):\n",
" result = conversation_chain.invoke({\"question\": question})\n",
" return result[\"answer\"]\n",
"\n",
"# And in Gradio:\n",
"\n",
"view = gr.ChatInterface(chat, type=\"messages\").launch(inbrowser=True)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "llms",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -0,0 +1,138 @@
import scrapy
import os
from urllib.parse import urljoin, urlparse
from scrapy.crawler import CrawlerProcess
class IRWebSpider(scrapy.Spider):
name= 'ir_web_spider'
custom_settings = {
'LOG_LEVEL': 'INFO', # DEBUG, INFO, WARNING, ERROR
'DOWNLOAD_DELAY': 1, # Be nice to the server
'ROBOTSTXT_OBEY': True,
}
num_pages = 10 # how many links to follow per page (Excluding documents)
def __init__(self, start_urls=None, allowed_domains=None, *args, **kwargs):
super(IRWebSpider, self).__init__(*args, **kwargs)
# Handle start_urls
if start_urls:
if isinstance(start_urls, str):
self.start_urls = [start_urls]
else:
self.start_urls = list(start_urls)
else:
self.start_urls = []
# Handle allowed_domains
if allowed_domains:
if isinstance(allowed_domains, str):
self.allowed_domains = [allowed_domains]
else:
self.allowed_domains = list(allowed_domains)
else:
# Auto-extract domains from start_urls if not provided
self.allowed_domains = []
for url in self.start_urls:
domain = urlparse(url).netloc
if domain and domain not in self.allowed_domains:
self.allowed_domains.append(domain)
# Log initialization
self.logger.info(f"Spider initialized with start_urls: {self.start_urls}")
self.logger.info(f"Allowed domains: {self.allowed_domains}")
def start_requests(self):
urls = self.start_urls
if not urls:
raise ValueError("No URLs provided to scrape.")
for url in urls:
self.logger.info(f"Starting request to: {url}")
yield scrapy.Request(url=url, callback=self.parse)
def parse(self, response):
self.logger.info(f"Parsing response from: {response.url}")
self.logger.info(f"Response status: {response.status}")
# Save the page content
# Extract document links with better selectors
doc_selectors = [
'a[href$=".pdf"]::attr(href)',
'a[href$=".xlsx"]::attr(href)',
'a[href$=".xls"]::attr(href)',
'a[href$=".docx"]::attr(href)',
'a[href$=".doc"]::attr(href)',
'a[href$=".pptx"]::attr(href)',
'a[href$=".ppt"]::attr(href)',
]
doc_links = []
for selector in doc_selectors:
links = response.css(selector).getall()
doc_links.extend(links)
self.logger.debug(f"Found {len(links)} links with selector: {selector}")
self.logger.info(f"Total document links found: {len(doc_links)}")
if not doc_links:
self.logger.warning("No document links found. Checking page content...")
# Log some of the page content for debugging
self.logger.debug(f"Page title: {response.css('title::text').get()}")
self.logger.debug(f"First 500 chars: {response.text[:500]}")
for link in doc_links:
full_url = urljoin(response.url, link)
self.logger.info(f"Queuing document: {full_url}")
yield scrapy.Request(
url=full_url,
callback=self.save_document
)
# Look for more investor relations pages
ir_links = response.css('a[href*="investor-relations/"]::attr(href)').getall()
for link in ir_links[:self.num_pages]: # Limit to avoid infinite crawling
full_url = urljoin(response.url, link)
if full_url != response.url: # Avoid self-loops
self.logger.info(f"Following IR link: {full_url}")
yield scrapy.Request(url=full_url, callback=self.parse)
def save_document(self, response):
"""Save the document to the local file system.
Will create a directory structure based on the domain and save the file with its original name or a hash if no name is available.
All documents are saved in the 'kb' directory."""
self.logger.info(f"Downloading document from: {response.url}")
parsed_url = urlparse(response.url)
domain = parsed_url.netloc.replace("www.", "")
filename = os.path.basename(parsed_url.path)
if not filename:
filename = f"document_{hash(response.url) % 10000}.bin"
os.makedirs(f'kb/{domain}', exist_ok=True)
filepath = f'kb/{domain}/{filename}'
with open(filepath, 'wb') as f:
f.write(response.body)
file_size = len(response.body)
self.logger.info(f"Saved document: {filepath} ({file_size} bytes)")
if __name__ == '__main__':
import sys
start_urls = sys.argv[1] if len(sys.argv) > 1 else 'http://example.com/investor-relations'
allowed_domains = sys.argv[2] if len(sys.argv) > 2 else 'example.com'
process = CrawlerProcess({
'LOG_LEVEL': 'INFO',
'DOWNLOAD_DELAY': 1,
'ROBOTSTXT_OBEY': True,
})
process.crawl(IRWebSpider,
start_urls=start_urls,
allowed_domains=allowed_domains)
process.start()