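"""Ingestion pipeline for the Insurellm knowledge base.

Loads markdown documents, asks an LLM to split each one into overlapping,
self-describing chunks, then embeds those chunks and stores them in a
persistent Chroma collection.
"""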

from pathlib import Path
from multiprocessing import Pool

from chromadb import PersistentClient
from dotenv import load_dotenv
from litellm import completion
from openai import OpenAI
from pydantic import BaseModel, Field
from tenacity import retry, wait_exponential
from tqdm import tqdm

# Pick up API keys (e.g. OPENAI_API_KEY) from a local .env file
load_dotenv(override=True)

# litellm model identifier in provider/model form
MODEL = "openai/gpt-4.1-nano"

# Where the Chroma vectorstore lives, what to call the collection, and what to embed with
DB_NAME = str(Path(__file__).parent.parent / "preprocessed_db")
collection_name = "docs"
embedding_model = "text-embedding-3-large"
KNOWLEDGE_BASE_PATH = Path(__file__).parent.parent / "knowledge-base"

# Rough chunk size in characters, used only to suggest a chunk count to the LLM
AVERAGE_CHUNK_SIZE = 100

# Exponential backoff for retries: wait at least 10s, doubling up to a 240s cap
wait = wait_exponential(multiplier=1, min=10, max=240)

# Parallel worker processes; drop to 1 if you hit rate limits
WORKERS = 3

# The OpenAI client reads OPENAI_API_KEY from the environment loaded above
openai = OpenAI()


class Result(BaseModel):
    """A vectorstore-ready chunk, shaped like a LangChain Document"""

    page_content: str
    metadata: dict


class Chunk(BaseModel):
    headline: str = Field(
        description="A brief heading for this chunk, typically a few words, that is most likely to be surfaced in a query",
    )
    summary: str = Field(
        description="A few sentences summarizing the content of this chunk to answer common questions"
    )
    original_text: str = Field(
        description="The original text of this chunk from the provided document, exactly as is, not changed in any way"
    )

    def as_result(self, document):
        """Flatten this chunk into a Result; the headline and summary enrich the text for retrieval"""
        metadata = {"source": document["source"], "type": document["type"]}
        return Result(
            page_content=self.headline + "\n\n" + self.summary + "\n\n" + self.original_text,
            metadata=metadata,
        )


class Chunks(BaseModel):
    chunks: list[Chunk]
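
# With response_format=Chunks, the model is expected to reply with JSON matching this
# schema, e.g. {"chunks": [{"headline": "...", "summary": "...", "original_text": "..."}, ...]}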


def fetch_documents():
    """A homemade version of the LangChain DirectoryLoader"""
    documents = []
    for folder in KNOWLEDGE_BASE_PATH.iterdir():
        if not folder.is_dir():
            continue  # ignore stray top-level files such as .DS_Store
        doc_type = folder.name
        for file in folder.rglob("*.md"):
            with open(file, "r", encoding="utf-8") as f:
                documents.append({"type": doc_type, "source": file.as_posix(), "text": f.read()})
    print(f"Loaded {len(documents)} documents")
    return documents
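
# Assumed knowledge-base layout: knowledge-base/<doc_type>/**/*.md, where each
# top-level folder name becomes the "type" metadata on every chunk; e.g. a
# hypothetical knowledge-base/contracts/acme.md would be tagged type="contracts".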


def make_prompt(document):
    # Suggest roughly one chunk per AVERAGE_CHUNK_SIZE characters of source text
    how_many = (len(document["text"]) // AVERAGE_CHUNK_SIZE) + 1
    return f"""
You take a document and you split the document into overlapping chunks for a KnowledgeBase.

The document is from the shared drive of a company called Insurellm.
The document is of type: {document["type"]}
The document has been retrieved from: {document["source"]}

A chatbot will use these chunks to answer questions about the company.
You should divide up the document as you see fit, being sure that the entire document is returned across the chunks - don't leave anything out.
This document should probably be split into at least {how_many} chunks, but you can have more or fewer as appropriate, ensuring that there are individual chunks to answer specific questions.
There should be overlap between the chunks as appropriate; typically about 25% overlap or about 50 words, so that the same text appears in multiple chunks for best retrieval results.

For each chunk, you should provide a headline, a summary, and the original text of the chunk.
Together your chunks should represent the entire document with overlap.

Here is the document:

{document["text"]}

Respond with the chunks.
"""


def make_messages(document):
    return [
        {"role": "user", "content": make_prompt(document)},
    ]


@retry(wait=wait)
def process_document(document):
    """Ask the LLM to chunk one document, retrying with exponential backoff on failure"""
    messages = make_messages(document)
    # litellm accepts a pydantic model as response_format for structured output
    response = completion(model=MODEL, messages=messages, response_format=Chunks)
    reply = response.choices[0].message.content
    doc_as_chunks = Chunks.model_validate_json(reply).chunks
    return [chunk.as_result(document) for chunk in doc_as_chunks]


def create_chunks(documents):
    """
    Create chunks using a number of workers in parallel.
    If you get a rate limit error, set WORKERS to 1.
    """
    chunks = []
    with Pool(processes=WORKERS) as pool:
        # imap_unordered yields each document's chunk list as soon as a worker finishes
        for result in tqdm(pool.imap_unordered(process_document, documents), total=len(documents)):
            chunks.extend(result)
    return chunks


def create_embeddings(chunks):
    chroma = PersistentClient(path=DB_NAME)

    # Start from a fresh collection so repeated runs don't duplicate documents
    if collection_name in [c.name for c in chroma.list_collections()]:
        chroma.delete_collection(collection_name)

    # Embed every chunk in a single API call (fine for a modest knowledge base)
    texts = [chunk.page_content for chunk in chunks]
    emb = openai.embeddings.create(model=embedding_model, input=texts).data
    vectors = [e.embedding for e in emb]

    collection = chroma.get_or_create_collection(collection_name)
    ids = [str(i) for i in range(len(chunks))]
    metas = [chunk.metadata for chunk in chunks]
    collection.add(ids=ids, embeddings=vectors, documents=texts, metadatas=metas)
    print(f"Vectorstore created with {collection.count()} documents")
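

# A minimal retrieval sketch, assuming queries are embedded with the same
# embedding_model used above. Hypothetical helper, not called during ingestion.
def fetch_context(question, top_k=5):
    chroma = PersistentClient(path=DB_NAME)
    collection = chroma.get_or_create_collection(collection_name)
    query_vector = openai.embeddings.create(model=embedding_model, input=[question]).data[0].embedding
    results = collection.query(query_embeddings=[query_vector], n_results=top_k)
    return results["documents"][0]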


if __name__ == "__main__":
    # Keep the entry point behind a __main__ guard so multiprocessing workers
    # can import this module without re-running the ingestion
    documents = fetch_documents()
    chunks = create_chunks(documents)
    create_embeddings(chunks)
    print("Ingestion complete")