Clean the NTSA project repo

This commit is contained in:
The Top Dev
2025-10-24 07:29:26 +03:00
parent 341121c68e
commit be618f4fa7
41 changed files with 362 additions and 1410 deletions

View File

@@ -0,0 +1,31 @@
# ChromaDB and vector databases
langchain_chroma_db/
*.db
*.sqlite3
# Large knowledge bases (keep only samples)
ntsa_comprehensive_knowledge_base/
ntsa_knowledge_base/
# Python cache
__pycache__/
*.pyc
*.pyo
# Jupyter notebook checkpoints
.ipynb_checkpoints/
# Environment files
.env
.venv/
# OS files
.DS_Store
Thumbs.db
# Logs
*.log
# Temporary files
*.tmp
*.temp

View File

@@ -1 +0,0 @@
invalid type: string "1. [mailto:info@ntsa.go.ke](mailto:info@ntsa.go.ke)\n2. [https://ntsa.go.ke/careers](https://ntsa.go.ke/careers)\n3. [https://ntsa.go.ke/downloads](https://ntsa.go.ke/downloads)\n4. [https://ntsa.go.ke/faqs](https://ntsa.go.ke/faqs)\n5. [https://ntsa.go.ke/feedback](https://ntsa.go.ke/feedback)\n6. [https://serviceportal.ntsa.go.ke/](https://serviceportal.ntsa.go.ke/)\nenter)

View File

@@ -1,407 +0,0 @@
#!/usr/bin/env python3
"""
LangChain Integration for NTSA Knowledge Base
Provides advanced document processing and conversational AI capabilities
"""

import os
import json
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime

# Optional imports with fallbacks
try:
    import plotly.graph_objects as go
    import plotly.express as px
    PLOTLY_AVAILABLE = True
except ImportError:
    PLOTLY_AVAILABLE = False

try:
    from sklearn.manifold import TSNE
    from sklearn.decomposition import PCA
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False

try:
    import numpy as np
    NUMPY_AVAILABLE = True
except ImportError:
    NUMPY_AVAILABLE = False

# LangChain imports
try:
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain.embeddings import HuggingFaceEmbeddings
    from langchain.vectorstores import Chroma
    from langchain.chains import ConversationalRetrievalChain
    from langchain.memory import ConversationBufferMemory
    from langchain.llms import OpenAI
    from langchain_openai import ChatOpenAI
    LANGCHAIN_AVAILABLE = True
except ImportError:
    LANGCHAIN_AVAILABLE = False


class LangChainKnowledgeBase:
    """Advanced knowledge base using LangChain for document processing and conversational AI"""

    def __init__(self, knowledge_base_dir: str = "ntsa_comprehensive_knowledge_base",
                 vector_db_dir: str = "langchain_chroma_db"):
        self.knowledge_base_dir = Path(knowledge_base_dir)
        self.vector_db_dir = Path(vector_db_dir)
        self.documents = []
        self.vectorstore = None
        self.qa_chain = None
        self.memory = None
        # Initialize components
        self._setup_directories()
        self._load_documents()

    def _setup_directories(self):
        """Setup required directories"""
        self.vector_db_dir.mkdir(exist_ok=True)
        print(f"✅ Vector database directory: {self.vector_db_dir}")

    def _load_documents(self):
        """Load documents from the knowledge base"""
        print("📚 Loading documents from knowledge base...")
        if not self.knowledge_base_dir.exists():
            print(f"❌ Knowledge base directory not found: {self.knowledge_base_dir}")
            return
        documents = []
        for md_file in self.knowledge_base_dir.rglob("*.md"):
            try:
                with open(md_file, 'r', encoding='utf-8') as f:
                    content = f.read()
                documents.append({
                    'file': str(md_file),
                    'content': content,
                    'title': md_file.stem,
                    'category': md_file.parent.name
                })
            except Exception as e:
                print(f"⚠️ Error reading {md_file}: {e}")
        self.documents = documents
        print(f"✅ Loaded {len(documents)} documents")

    def create_vector_store(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """Create vector store from documents"""
        if not LANGCHAIN_AVAILABLE:
            print("❌ LangChain not available. Cannot create vector store.")
            return False
        if not self.documents:
            print("❌ No documents loaded. Cannot create vector store.")
            return False
        try:
            print("🔧 Creating vector store...")
            # Split documents into chunks
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                length_function=len,
            )
            texts = []
            metadatas = []
            for doc in self.documents:
                chunks = text_splitter.split_text(doc['content'])
                for chunk in chunks:
                    texts.append(chunk)
                    metadatas.append({
                        'source': doc['file'],
                        'title': doc['title'],
                        'category': doc['category']
                    })
            print(f"📄 Created {len(texts)} text chunks")
            # Create embeddings
            try:
                embeddings = HuggingFaceEmbeddings(
                    model_name="sentence-transformers/all-MiniLM-L6-v2"
                )
                print("✅ HuggingFace embeddings loaded")
            except Exception as e:
                print(f"⚠️ HuggingFace embeddings failed: {e}")
                print("🔄 Using OpenAI embeddings as fallback...")
                from langchain.embeddings import OpenAIEmbeddings
                embeddings = OpenAIEmbeddings()
            # Create vector store
            self.vectorstore = Chroma.from_texts(
                texts=texts,
                embedding=embeddings,
                metadatas=metadatas,
                persist_directory=str(self.vector_db_dir)
            )
            # Persist the vector store
            self.vectorstore.persist()
            print(f"✅ Vector store created and persisted to {self.vector_db_dir}")
            return True
        except Exception as e:
            print(f"❌ Error creating vector store: {e}")
            return False

    def load_existing_vector_store(self):
        """Load existing vector store"""
        if not LANGCHAIN_AVAILABLE:
            print("❌ LangChain not available. Cannot load vector store.")
            return False
        try:
            print("📂 Loading existing vector store...")
            # Create embeddings
            try:
                embeddings = HuggingFaceEmbeddings(
                    model_name="sentence-transformers/all-MiniLM-L6-v2"
                )
            except Exception as e:
                print(f"⚠️ HuggingFace embeddings failed: {e}")
                print("🔄 Using OpenAI embeddings as fallback...")
                from langchain.embeddings import OpenAIEmbeddings
                embeddings = OpenAIEmbeddings()
            # Load vector store
            self.vectorstore = Chroma(
                persist_directory=str(self.vector_db_dir),
                embedding_function=embeddings
            )
            print("✅ Vector store loaded successfully")
            return True
        except Exception as e:
            print(f"❌ Error loading vector store: {e}")
            return False

    def create_qa_chain(self, model_name: str = "gpt-3.5-turbo"):
        """Create question-answering chain"""
        if not LANGCHAIN_AVAILABLE:
            print("❌ LangChain not available. Cannot create QA chain.")
            return False
        if not self.vectorstore:
            print("❌ Vector store not available. Cannot create QA chain.")
            return False
        try:
            print(f"🔧 Creating QA chain with {model_name}...")
            # Initialize LLM
            llm = ChatOpenAI(
                model_name=model_name,
                temperature=0.7,
                max_tokens=1000
            )
            # Create memory
            self.memory = ConversationBufferMemory(
                memory_key="chat_history",
                return_messages=True
            )
            # Create QA chain
            self.qa_chain = ConversationalRetrievalChain.from_llm(
                llm=llm,
                retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3}),
                memory=self.memory,
                output_key="answer"
            )
            print("✅ QA chain created successfully")
            return True
        except Exception as e:
            print(f"❌ Error creating QA chain: {e}")
            return False

    def ask_question(self, question: str) -> str:
        """Ask a question to the knowledge base"""
        if not self.qa_chain:
            return "❌ QA chain not available. Please create it first."
        try:
            result = self.qa_chain({"question": question})
            return result["answer"]
        except Exception as e:
            return f"❌ Error answering question: {e}"

    def search_documents(self, query: str, k: int = 5) -> List[Dict]:
        """Search documents using vector similarity"""
        if not self.vectorstore:
            return []
        try:
            results = self.vectorstore.similarity_search_with_score(query, k=k)
            return [
                {
                    "content": doc.page_content,
                    "metadata": doc.metadata,
                    "score": score
                }
                for doc, score in results
            ]
        except Exception as e:
            print(f"❌ Error searching documents: {e}")
            return []

    def visualize_embeddings(self, n_samples: int = 50, method: str = "tsne"):
        """Visualize document embeddings"""
        if not PLOTLY_AVAILABLE:
            print("❌ Plotly not available. Cannot create visualization.")
            return
        if not SKLEARN_AVAILABLE:
            print("❌ Scikit-learn not available. Cannot create visualization.")
            return
        if not NUMPY_AVAILABLE:
            print("❌ NumPy not available. Cannot create visualization.")
            return
        if not self.vectorstore:
            print("❌ Vector store not available. Cannot create visualization.")
            return
        try:
            print("📊 Visualizing embeddings...")
            # Get all documents and embeddings
            all_docs = self.vectorstore.get()
            if not all_docs or not all_docs.get('embeddings'):
                print("❌ No embeddings found in vector store.")
                return
            n_samples = min(n_samples, len(all_docs['ids']))
            embeddings_array = np.array(all_docs['embeddings'][:n_samples])
            texts = all_docs['documents'][:n_samples]
            if method == "tsne":
                # t-SNE dimensionality reduction
                tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, n_samples-1))
                embeddings_2d = tsne.fit_transform(embeddings_array)
            else:
                # PCA dimensionality reduction
                pca = PCA(n_components=2, random_state=42)
                embeddings_2d = pca.fit_transform(embeddings_array)
            # Create visualization
            fig = go.Figure()
            # Add scatter plot
            fig.add_trace(go.Scatter(
                x=embeddings_2d[:, 0],
                y=embeddings_2d[:, 1],
                mode='markers',
                marker=dict(
                    size=8,
                    color=range(n_samples),
                    colorscale='Viridis',
                    showscale=True
                ),
                text=[text[:100] + "..." if len(text) > 100 else text for text in texts],
                hovertemplate='<b>%{text}</b><br>X: %{x}<br>Y: %{y}<extra></extra>'
            ))
            fig.update_layout(
                title=f"Document Embeddings Visualization ({method.upper()})",
                xaxis_title="Dimension 1",
                yaxis_title="Dimension 2",
                showlegend=False
            )
            # Save and show
            fig.write_html("embeddings_visualization.html")
            fig.show()
            print("✅ Embeddings visualization created and saved as 'embeddings_visualization.html'")
        except Exception as e:
            print(f"❌ Error creating visualization: {e}")
            print("💡 This might be due to numpy compatibility issues.")
            print("💡 Try using OpenAI embeddings instead of HuggingFace embeddings.")

    def get_statistics(self) -> Dict[str, Any]:
        """Get knowledge base statistics"""
        stats = {
            "total_documents": len(self.documents),
            "vector_store_available": self.vectorstore is not None,
            "qa_chain_available": self.qa_chain is not None,
            "categories": {}
        }
        # Count documents by category
        for doc in self.documents:
            category = doc.get('category', 'unknown')
            if category not in stats['categories']:
                stats['categories'][category] = 0
            stats['categories'][category] += 1
        return stats

    def reset_memory(self):
        """Reset conversation memory"""
        if self.memory:
            self.memory.clear()
            print("✅ Conversation memory cleared")


def main():
    """Main function to demonstrate the knowledge base"""
    print("🚀 NTSA LangChain Knowledge Base")
    print("=" * 50)
    # Initialize knowledge base
    kb = LangChainKnowledgeBase()
    # Create vector store
    if kb.create_vector_store():
        print("✅ Vector store created successfully")
        # Create QA chain
        if kb.create_qa_chain():
            print("✅ QA chain created successfully")
            # Test the system
            test_questions = [
                "What is NTSA?",
                "How do I apply for a driving license?",
                "What services does NTSA provide?"
            ]
            print("\n🤖 Testing QA system:")
            for question in test_questions:
                print(f"\nQ: {question}")
                answer = kb.ask_question(question)
                print(f"A: {answer[:200]}{'...' if len(answer) > 200 else ''}")
            # Show statistics
            stats = kb.get_statistics()
            print(f"\n📊 Knowledge Base Statistics:")
            print(f"Total documents: {stats['total_documents']}")
            print(f"Categories: {stats['categories']}")
        else:
            print("❌ Failed to create QA chain")
    else:
        print("❌ Failed to create vector store")


if __name__ == "__main__":
    main()
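
For reference, a minimal sketch of driving the class above against an already-persisted store, without rebuilding embeddings. The import path is an assumption (the diff does not show this file's name), and an OpenAI API key is assumed to be configured for the QA chain.

# Hypothetical module name; the original filename is not shown in this diff.
from langchain_integration import LangChainKnowledgeBase

kb = LangChainKnowledgeBase()                      # loads markdown docs from the knowledge base dir
if kb.load_existing_vector_store():                # reuse the persisted Chroma DB instead of re-embedding
    if kb.create_qa_chain(model_name="gpt-3.5-turbo"):
        print(kb.ask_question("How do I renew a driving license?"))
    # Similarity search works directly against the vector store:
    for hit in kb.search_documents("vehicle registration", k=3):
        print(hit["metadata"]["title"], hit["score"])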

View File

@@ -59,9 +59,18 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 5,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"✓ All libraries imported\n",
"✓ API Keys: OpenAI=True, Gemini=True, Claude=True\n"
]
}
],
"source": [
"import os\n",
"import sys\n",
@@ -98,9 +107,22 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 6,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Configuration:\n",
" base_url: https://ntsa.go.ke\n",
" kb_dir: ntsa_knowledge_base\n",
" max_depth: 2\n",
" vector_db_dir: ./langchain_chroma_db\n",
" chunk_size: 1000\n"
]
}
],
"source": [
"CONFIG = {\n",
" 'base_url': 'https://ntsa.go.ke',\n",
@@ -124,9 +146,148 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 7,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🚀 Starting comprehensive NTSA scraping with Selenium...\n",
"✅ Created directory structure in ntsa_comprehensive_knowledge_base\n",
"🚀 Starting comprehensive NTSA scraping...\n",
"📋 Starting URLs: 6\n",
"📄 Max pages: 15\n",
"🔍 Max depth: 3\n",
"✅ Chrome driver initialized successfully\n",
"\n",
"📄 Processing (1/15): https://ntsa.go.ke\n",
"🔍 Depth: 0\n",
"🌐 Loading: https://ntsa.go.ke\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\services\\ntsa_NTSA__Keep_our_roads_safe_f13d765c.md\n",
"📊 Content: 6068 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (2/15): https://ntsa.go.ke/about\n",
"🔍 Depth: 0\n",
"🌐 Loading: https://ntsa.go.ke/about\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\about\\ntsa_NTSA__About_Us_05bb6415.md\n",
"📊 Content: 1422 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (3/15): https://ntsa.go.ke/services\n",
"🔍 Depth: 0\n",
"🌐 Loading: https://ntsa.go.ke/services\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\services\\ntsa_NTSA__NTSA_Services_7a9ee5d0.md\n",
"📊 Content: 1994 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (4/15): https://ntsa.go.ke/contact\n",
"🔍 Depth: 0\n",
"🌐 Loading: https://ntsa.go.ke/contact\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\services\\ntsa_NTSA__Contact_Us_7bdb748a.md\n",
"📊 Content: 1587 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (5/15): https://ntsa.go.ke/news\n",
"🔍 Depth: 0\n",
"🌐 Loading: https://ntsa.go.ke/news\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\news\\ntsa_NTSA__Media_Center_-_News__Updates_e765915c.md\n",
"📊 Content: 2481 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (6/15): https://ntsa.go.ke/tenders\n",
"🔍 Depth: 0\n",
"🌐 Loading: https://ntsa.go.ke/tenders\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\tenders\\ntsa_NTSA__Tenders_73ac6e93.md\n",
"📊 Content: 354 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (7/15): https://ntsa.go.ke/news/new-digital-licensing-system-goes-live\n",
"🔍 Depth: 1\n",
"🌐 Loading: https://ntsa.go.ke/news/new-digital-licensing-system-goes-live\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\news\\ntsa_NTSA__New_Digital_Licensing_System_Goes_Live__NTSA_50d5938e.md\n",
"📊 Content: 1003 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (8/15): https://ntsa.go.ke/news/ntsa-launches-new-road-safety-campaign\n",
"🔍 Depth: 1\n",
"🌐 Loading: https://ntsa.go.ke/news/ntsa-launches-new-road-safety-campaign\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\news\\ntsa_NTSA__NTSA_Launches_New_Road_Safety_Campaign__NTSA_63481444.md\n",
"📊 Content: 1113 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (9/15): https://ntsa.go.ke/news/8th-un-global-road-safety-week-concludes-with-nationwide-activities\n",
"🔍 Depth: 1\n",
"🌐 Loading: https://ntsa.go.ke/news/8th-un-global-road-safety-week-concludes-with-nationwide-activities\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\news\\ntsa_NTSA__8th_UN_Global_Road_Safety_Week_Concludes_wit_9636f22e.md\n",
"📊 Content: 1494 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (10/15): https://ntsa.go.ke/about/who-we-are\n",
"🔍 Depth: 1\n",
"🌐 Loading: https://ntsa.go.ke/about/who-we-are\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\about\\ntsa_NTSA__About_Us_-_Who_We_Are_47583408.md\n",
"📊 Content: 2204 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (11/15): https://ntsa.go.ke/careers\n",
"🔍 Depth: 1\n",
"🌐 Loading: https://ntsa.go.ke/careers\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\careers\\ntsa_Career_Opportunities__NTSA_3e462d97.md\n",
"📊 Content: 477 chars\n",
"🔗 Found 10 new links\n",
"\n",
"📄 Processing (12/15): https://ntsa.go.ke/services/vehicles-services\n",
"🔍 Depth: 1\n",
"🌐 Loading: https://ntsa.go.ke/services/vehicles-services\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\services\\ntsa_NTSA__Vehicles_Services_57ba53a1.md\n",
"📊 Content: 814 chars\n",
"🔗 Found 9 new links\n",
"\n",
"📄 Processing (13/15): https://ntsa.go.ke/faqs\n",
"🔍 Depth: 1\n",
"🌐 Loading: https://ntsa.go.ke/faqs\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\services\\ntsa_NTSA__Frequently_Asked_Questions__NTSA_Kenya_291931bf.md\n",
"📊 Content: 819 chars\n",
"🔗 Found 8 new links\n",
"\n",
"📄 Processing (14/15): https://ntsa.go.ke/privacy-policy\n",
"🔍 Depth: 1\n",
"🌐 Loading: https://ntsa.go.ke/privacy-policy\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\services\\ntsa_NTSA__Privacy_Policy__NTSA_68960874.md\n",
"📊 Content: 1130 chars\n",
"🔗 Found 7 new links\n",
"\n",
"📄 Processing (15/15): https://ntsa.go.ke/\n",
"🔍 Depth: 1\n",
"🌐 Loading: https://ntsa.go.ke/\n",
"✅ Saved: ntsa_comprehensive_knowledge_base\\services\\ntsa_NTSA__Keep_our_roads_safe_0a8e8522.md\n",
"📊 Content: 6068 chars\n",
"🔗 Found 10 new links\n",
"✅ Index file created: ntsa_comprehensive_knowledge_base\\INDEX.md\n",
"✅ Metadata saved to ntsa_comprehensive_knowledge_base\\metadata\\comprehensive_metadata.json\n",
"\n",
"🎉 Comprehensive scraping completed!\n",
"📊 Total pages scraped: 15\n",
"❌ Failed pages: 0\n",
"📁 Output directory: c:\\Users\\Joshua\\OneDrive\\Desktop\\Projects\\AI\\Andela - Gen AI Learning\\llm_engineering\\week5\\community-contributions\\NTSA_knowledge_base_and_chatbot\\ntsa_comprehensive_knowledge_base\n",
"🔚 Driver closed\n",
"\n",
"✅ Comprehensive scraping completed!\n",
"📊 Total pages scraped: 15\n",
"\n",
"📋 Pages by category:\n",
" - About: 2\n",
" - Careers: 1\n",
" - News: 4\n",
" - Services: 7\n",
" - Tenders: 1\n",
"\n",
"📁 Updated knowledge base directory: ntsa_comprehensive_knowledge_base\n"
]
}
],
"source": [
"# Use the comprehensive scraper for better content extraction\n",
"print(\"🚀 Starting comprehensive NTSA scraping with Selenium...\")\n",

View File

@@ -1,6 +1,6 @@
# NTSA Knowledge Base Index
**Generated:** 2025-10-24 05:34:52
**Generated:** 2025-10-24 07:24:42
**Total Pages:** 15
## Services
@@ -42,7 +42,7 @@
## About
- [NTSA | NTSA | About Us](ntsa_comprehensive_knowledge_base\about\ntsa_NTSA__NTSA__About_Us_05bb6415.md)
- [NTSA | About Us](ntsa_comprehensive_knowledge_base\about\ntsa_NTSA__About_Us_05bb6415.md)
- URL: https://ntsa.go.ke/about
- Content: 1422 chars
- Depth: 0

View File

@@ -1,7 +1,7 @@
# NTSA | About Us - Who We Are
**URL:** https://ntsa.go.ke/about/who-we-are
**Scraped:** 2025-10-24T05:34:27.946216
**Scraped:** 2025-10-24T07:24:13.128350
**Content Length:** 2204 characters
---

View File

@@ -1,7 +1,7 @@
# Career Opportunities | NTSA
**URL:** https://ntsa.go.ke/careers
**Scraped:** 2025-10-24T05:34:32.578853
**Scraped:** 2025-10-24T07:24:18.790660
**Content Length:** 477 characters
---

View File

@@ -3,7 +3,7 @@
"base_url": "https://ntsa.go.ke",
"total_pages_scraped": 15,
"failed_pages": 0,
"scraping_timestamp": "2025-10-24T05:34:52.991790",
"scraping_timestamp": "2025-10-24T07:24:42.107607",
"output_directory": "ntsa_comprehensive_knowledge_base"
},
"scraped_pages": [
@@ -17,8 +17,8 @@
},
{
"url": "https://ntsa.go.ke/about",
"title": "NTSA | NTSA | About Us",
"file_path": "ntsa_comprehensive_knowledge_base\\about\\ntsa_NTSA__NTSA__About_Us_05bb6415.md",
"title": "NTSA | About Us",
"file_path": "ntsa_comprehensive_knowledge_base\\about\\ntsa_NTSA__About_Us_05bb6415.md",
"category": "about",
"content_length": 1422,
"depth": 0

View File

@@ -1,7 +1,7 @@
# NTSA | 8th UN Global Road Safety Week Concludes with Nationwide Activities | NTSA Kenya
**URL:** https://ntsa.go.ke/news/8th-un-global-road-safety-week-concludes-with-nationwide-activities
**Scraped:** 2025-10-24T05:34:23.386582
**Scraped:** 2025-10-24T07:24:08.503078
**Content Length:** 1494 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | Media Center - News & Updates
**URL:** https://ntsa.go.ke/news
**Scraped:** 2025-10-24T05:34:04.407247
**Scraped:** 2025-10-24T07:23:48.561059
**Content Length:** 2481 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | NTSA Launches New Road Safety Campaign | NTSA Kenya
**URL:** https://ntsa.go.ke/news/ntsa-launches-new-road-safety-campaign
**Scraped:** 2025-10-24T05:34:18.816453
**Scraped:** 2025-10-24T07:24:03.599976
**Content Length:** 1113 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | New Digital Licensing System Goes Live | NTSA Kenya
**URL:** https://ntsa.go.ke/news/new-digital-licensing-system-goes-live
**Scraped:** 2025-10-24T05:34:14.170148
**Scraped:** 2025-10-24T07:23:58.993952
**Content Length:** 1003 characters
---

View File

@@ -1,7 +1,7 @@
<html lang="en"><head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="csrf-token" content="YjKD4sb1xvPZFtwPjzii931pAAxYSPnS0Q5UKssV">
<meta name="csrf-token" content="whLsjsWqCYeljl6my3R798zngx7B2cRfIm9OGRMU">
<title>Career Opportunities | NTSA</title>

View File

@@ -1,7 +1,7 @@
<html lang="en"><head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="csrf-token" content="YjKD4sb1xvPZFtwPjzii931pAAxYSPnS0Q5UKssV">
<meta name="csrf-token" content="whLsjsWqCYeljl6my3R798zngx7B2cRfIm9OGRMU">
<title>NTSA | 8th UN Global Road Safety Week Concludes with Nationwide Activities | NTSA Kenya</title>

View File

@@ -1,7 +1,7 @@
<html lang="en"><head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="csrf-token" content="YjKD4sb1xvPZFtwPjzii931pAAxYSPnS0Q5UKssV">
<meta name="csrf-token" content="whLsjsWqCYeljl6my3R798zngx7B2cRfIm9OGRMU">
<title>NTSA | About Us - Who We Are</title>

View File

@@ -1,7 +1,7 @@
<html lang="en"><head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="csrf-token" content="YjKD4sb1xvPZFtwPjzii931pAAxYSPnS0Q5UKssV">
<meta name="csrf-token" content="whLsjsWqCYeljl6my3R798zngx7B2cRfIm9OGRMU">
<title>NTSA | Contact Us</title>

View File

@@ -1,7 +1,7 @@
<html lang="en"><head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="csrf-token" content="YjKD4sb1xvPZFtwPjzii931pAAxYSPnS0Q5UKssV">
<meta name="csrf-token" content="whLsjsWqCYeljl6my3R798zngx7B2cRfIm9OGRMU">
<title>NTSA | Frequently Asked Questions | NTSA Kenya</title>

View File

@@ -1,7 +1,7 @@
<html lang="en"><head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="csrf-token" content="YjKD4sb1xvPZFtwPjzii931pAAxYSPnS0Q5UKssV">
<meta name="csrf-token" content="whLsjsWqCYeljl6my3R798zngx7B2cRfIm9OGRMU">
<title>NTSA | NTSA Launches New Road Safety Campaign | NTSA Kenya</title>

View File

@@ -1,7 +1,7 @@
<html lang="en"><head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="csrf-token" content="YjKD4sb1xvPZFtwPjzii931pAAxYSPnS0Q5UKssV">
<meta name="csrf-token" content="whLsjsWqCYeljl6my3R798zngx7B2cRfIm9OGRMU">
<title>NTSA | NTSA Services</title>

View File

@@ -1,7 +1,7 @@
<html lang="en"><head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="csrf-token" content="YjKD4sb1xvPZFtwPjzii931pAAxYSPnS0Q5UKssV">
<meta name="csrf-token" content="whLsjsWqCYeljl6my3R798zngx7B2cRfIm9OGRMU">
<title>NTSA | New Digital Licensing System Goes Live | NTSA Kenya</title>

View File

@@ -1,7 +1,7 @@
<html lang="en"><head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="csrf-token" content="YjKD4sb1xvPZFtwPjzii931pAAxYSPnS0Q5UKssV">
<meta name="csrf-token" content="whLsjsWqCYeljl6my3R798zngx7B2cRfIm9OGRMU">
<title>NTSA | Tenders</title>

View File

@@ -1,7 +1,7 @@
# NTSA | Contact Us
**URL:** https://ntsa.go.ke/contact
**Scraped:** 2025-10-24T05:33:58.531154
**Scraped:** 2025-10-24T07:23:43.605483
**Content Length:** 1587 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | Frequently Asked Questions | NTSA Kenya
**URL:** https://ntsa.go.ke/faqs
**Scraped:** 2025-10-24T05:34:41.706480
**Scraped:** 2025-10-24T07:24:28.754233
**Content Length:** 819 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | Keep our roads safe
**URL:** https://ntsa.go.ke/
**Scraped:** 2025-10-24T05:34:50.569331
**Scraped:** 2025-10-24T07:24:38.822420
**Content Length:** 6068 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | Keep our roads safe
**URL:** https://ntsa.go.ke
**Scraped:** 2025-10-24T05:33:40.256848
**Scraped:** 2025-10-24T07:23:28.981272
**Content Length:** 6068 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | NTSA Services
**URL:** https://ntsa.go.ke/services
**Scraped:** 2025-10-24T05:33:50.657769
**Scraped:** 2025-10-24T07:23:38.582012
**Content Length:** 1994 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | Privacy Policy | NTSA
**URL:** https://ntsa.go.ke/privacy-policy
**Scraped:** 2025-10-24T05:34:46.121750
**Scraped:** 2025-10-24T07:24:33.755242
**Content Length:** 1130 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | Vehicles Services
**URL:** https://ntsa.go.ke/services/vehicles-services
**Scraped:** 2025-10-24T05:34:37.062846
**Scraped:** 2025-10-24T07:24:23.702092
**Content Length:** 814 characters
---

View File

@@ -1,7 +1,7 @@
# NTSA | Tenders
**URL:** https://ntsa.go.ke/tenders
**Scraped:** 2025-10-24T05:34:09.765585
**Scraped:** 2025-10-24T07:23:53.707639
**Content Length:** 354 characters
---

View File

@@ -1,49 +1,14 @@
# NTSA AI Chatbot - Complete Dependencies
# Install with: pip install -r requirements.txt
# Core dependencies
requests>=2.25.0
beautifulsoup4>=4.9.0
selenium>=4.0.0
webdriver-manager>=3.8.0
# Core web scraping
requests>=2.31.0
beautifulsoup4>=4.12.0
lxml>=5.1.0
# Configuration
python-dotenv>=1.0.0
# LangChain framework
langchain>=0.1.0
langchain-community>=0.0.20
langchain-openai>=0.0.5
langchain-chroma>=0.1.0
langchain-huggingface>=0.0.1
# HuggingFace transformers
transformers>=4.36.0
sentence-transformers>=2.3.1
torch>=2.1.0
# Vector database
chromadb>=0.4.22
# LLM APIs
openai>=1.12.0
anthropic>=0.18.0
google-generativeai>=0.3.0
# Data processing and visualization
pandas>=2.0.0
numpy>=1.24.0
matplotlib>=3.7.0
plotly>=5.18.0
scikit-learn>=1.3.0
# Web interface
gradio>=4.19.0
# Jupyter
# Jupyter notebook
jupyter>=1.0.0
ipykernel>=6.25.0
ipywidgets>=8.1.0
selenium>=4.15.0
requests-html>=0.10.0
webdriver-manager>=4.0.0
playwright>=1.42.0
ipykernel>=6.0.0
# Optional: For advanced features
# langchain>=0.1.0
# chromadb>=0.4.0
# openai>=1.0.0
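
The notebook output above reports which API keys were found; a minimal sketch of that environment check, assuming keys are stored in a local .env file (loaded via python-dotenv above) under the conventional variable names — the names actually used by the project are not shown in this diff.

import os
from dotenv import load_dotenv  # provided by python-dotenv listed above

load_dotenv()  # read API keys from a local .env file

# Conventional variable names; adjust to whatever the project's .env actually uses.
keys = {
    "OpenAI": os.getenv("OPENAI_API_KEY"),
    "Gemini": os.getenv("GOOGLE_API_KEY"),
    "Claude": os.getenv("ANTHROPIC_API_KEY"),
}
print("✓ API Keys:", ", ".join(f"{name}={bool(value)}" for name, value in keys.items()))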

View File

@@ -1,463 +0,0 @@
"""
scraper_utils.py
Web scraping utilities for NTSA knowledge base
"""
import requests
from bs4 import BeautifulSoup
import os
import json
import time
import re
from urllib.parse import urljoin, urlparse
from pathlib import Path
from datetime import datetime
import hashlib
import ssl
import urllib3
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class NTSAKnowledgeBaseScraper:
def __init__(self, base_url="https://ntsa.go.ke", output_dir="ntsa_knowledge_base"):
self.base_url = base_url
self.output_dir = Path(output_dir)
self.visited_urls = set()
self.scraped_data = []
# Category mapping based on URL patterns and content
self.categories = {
'driving_licenses': ['driving', 'license', 'dl', 'learner', 'provisional'],
'vehicle_registration': ['registration', 'vehicle', 'logbook', 'number plate', 'transfer'],
'road_safety': ['safety', 'inspection', 'accident', 'compliance'],
'services': ['service', 'application', 'fee', 'payment', 'online'],
'requirements': ['requirement', 'document', 'eligibility', 'criteria'],
'procedures': ['procedure', 'process', 'step', 'how to', 'guide'],
'about': ['about', 'contact', 'mission', 'vision', 'staff'],
'news': ['news', 'announcement', 'press', 'notice'],
'downloads': ['download', 'form', 'pdf', 'document'],
}
self.setup_directories()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
# Create session with SSL handling
self.session = requests.Session()
# Configure retry strategy
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
# Disable SSL verification for problematic sites
self.session.verify = False
def setup_directories(self):
"""Create folder structure for knowledge base"""
self.output_dir.mkdir(exist_ok=True)
for category in self.categories.keys():
(self.output_dir / category).mkdir(exist_ok=True)
(self.output_dir / 'metadata').mkdir(exist_ok=True)
print(f"✓ Created directory structure in {self.output_dir}")
def get_page(self, url, retries=3):
"""Fetch page content with retry logic and SSL handling"""
for attempt in range(retries):
try:
# Try with session first (with SSL disabled)
response = self.session.get(
url,
headers=self.headers,
timeout=15,
verify=False,
allow_redirects=True
)
response.raise_for_status()
return response
except requests.exceptions.SSLError as e:
if attempt == retries - 1:
print(f"✗ SSL Error for {url}: {e}")
# Try with HTTP instead of HTTPS
http_url = url.replace('https://', 'http://')
try:
response = self.session.get(
http_url,
headers=self.headers,
timeout=15,
verify=False
)
response.raise_for_status()
print(f"✓ Successfully accessed via HTTP: {http_url}")
return response
except Exception as http_e:
print(f"✗ HTTP fallback failed for {http_url}: {http_e}")
return None
else:
print(f"⚠️ SSL Error (attempt {attempt + 1}/{retries}): {e}")
time.sleep(2 ** attempt)
except requests.RequestException as e:
if attempt == retries - 1:
print(f"✗ Failed to fetch {url}: {e}")
return None
print(f"⚠️ Request failed (attempt {attempt + 1}/{retries}): {e}")
time.sleep(2 ** attempt)
return None
def test_connection(self, url):
"""Test connection to a URL with various methods"""
print(f"🔍 Testing connection to {url}...")
# Test 1: HTTPS with SSL disabled
try:
response = self.session.get(url, timeout=10, verify=False)
if response.status_code == 200:
print(f"✓ HTTPS connection successful (SSL disabled)")
return True
except Exception as e:
print(f"✗ HTTPS failed: {e}")
# Test 2: HTTP fallback
http_url = url.replace('https://', 'http://')
try:
response = self.session.get(http_url, timeout=10)
if response.status_code == 200:
print(f"✓ HTTP connection successful")
return True
except Exception as e:
print(f"✗ HTTP failed: {e}")
# Test 3: Try with different user agent
try:
old_headers = self.session.headers.copy()
self.session.headers.update({
'User-Agent': 'curl/7.68.0'
})
response = self.session.get(url, timeout=10, verify=False)
if response.status_code == 200:
print(f"✓ Connection successful with curl user agent")
self.session.headers.update(old_headers)
return True
self.session.headers.update(old_headers)
except Exception as e:
print(f"✗ Curl user agent failed: {e}")
print(f"✗ All connection methods failed for {url}")
return False
def get_alternative_urls(self, base_url):
"""Get alternative URLs to try if the main URL fails"""
alternatives = [
base_url,
base_url.replace('https://', 'http://'),
f"{base_url}/index.php",
f"{base_url}/index.html",
f"{base_url}/home",
f"{base_url}/main"
]
return list(set(alternatives)) # Remove duplicates
def clean_text(self, text):
"""Clean and normalize text"""
if not text:
return ""
text = re.sub(r'\s+', ' ', text)
text = re.sub(r'[^\w\s\-.,;:!?()\[\]"\'/]', '', text)
return text.strip()
def categorize_content(self, url, title, content):
"""Determine category based on URL and content"""
url_lower = url.lower()
title_lower = title.lower()
content_lower = content.lower()
category_scores = {}
for category, keywords in self.categories.items():
score = 0
for keyword in keywords:
if keyword in url_lower:
score += 5
if keyword in title_lower:
score += 3
if keyword in content_lower:
score += 1
category_scores[category] = score
best_category = max(category_scores, key=category_scores.get)
return best_category if category_scores[best_category] > 0 else 'services'
def extract_links(self, soup, current_url):
"""Extract all relevant links from page"""
links = []
for link in soup.find_all('a', href=True):
href = link['href']
full_url = urljoin(current_url, href)
if urlparse(full_url).netloc == urlparse(self.base_url).netloc:
if not any(full_url.endswith(ext) for ext in ['.pdf', '.doc', '.docx', '.jpg', '.png']):
if '#' in full_url:
full_url = full_url.split('#')[0]
links.append(full_url)
return list(set(links))
def extract_content(self, soup, url):
"""Extract main content from page with improved logic"""
# Remove unwanted elements
for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
element.decompose()
main_content = None
content_selectors = [
'main', 'article', '.content', '#content',
'.main-content', '#main-content', '.post-content',
'.entry-content', 'div[role="main"]',
'.container', '.wrapper', '#main', '.main',
'body' # Fallback to body if no specific content area found
]
for selector in content_selectors:
main_content = soup.select_one(selector)
if main_content:
break
if not main_content:
main_content = soup.body
if not main_content:
return ""
content_parts = []
# Look for more element types
for element in main_content.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'li', 'td', 'div', 'span']):
text = self.clean_text(element.get_text())
if text and len(text) > 5: # Reduced minimum length
content_parts.append(text)
# If no content found with specific elements, try getting all text
if not content_parts:
all_text = self.clean_text(main_content.get_text())
if all_text and len(all_text) > 10:
content_parts.append(all_text)
return ' '.join(content_parts)
def create_markdown(self, title, url, content, category, metadata):
"""Create markdown document"""
filename_base = re.sub(r'[^\w\s-]', '', title.lower())
filename_base = re.sub(r'[-\s]+', '_', filename_base)[:50]
url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
filename = f"{filename_base}_{url_hash}.md"
md_content = f"""# {title}
**Source:** [{url}]({url})
**Category:** {category}
**Scraped:** {metadata['scraped_date']}
---
## Content
{content}
---
## Metadata
- **Word Count:** {metadata['word_count']}
- **URL:** {url}
- **Category:** {category}
"""
filepath = self.output_dir / category / filename
with open(filepath, 'w', encoding='utf-8') as f:
f.write(md_content)
return filepath
def scrape_page(self, url, depth=0, max_depth=3):
"""Scrape a single page and follow links"""
if depth > max_depth or url in self.visited_urls:
return
self.visited_urls.add(url)
print(f"{' ' * depth}📄 Scraping: {url}")
response = self.get_page(url)
if not response:
return
soup = BeautifulSoup(response.content, 'html.parser')
title = soup.title.string if soup.title else url.split('/')[-1]
title = self.clean_text(title)
content = self.extract_content(soup, url)
if len(content) < 50:
print(f"{' ' * depth} ⊘ Skipped (insufficient content: {len(content)} chars)")
print(f"{' ' * depth} 📝 Content preview: {content[:100]}...")
return
category = self.categorize_content(url, title, content)
metadata = {
'url': url,
'title': title,
'category': category,
'scraped_date': datetime.now().isoformat(),
'word_count': len(content.split()),
'depth': depth
}
filepath = self.create_markdown(title, url, content, category, metadata)
print(f"{' ' * depth} ✓ Saved to {category}/{filepath.name}")
self.scraped_data.append(metadata)
time.sleep(1)
if depth < max_depth:
links = self.extract_links(soup, url)
for link in links[:10]:
if link not in self.visited_urls:
self.scrape_page(link, depth + 1, max_depth)
def save_metadata(self):
"""Save scraping metadata to JSON"""
metadata_file = self.output_dir / 'metadata' / 'scraping_metadata.json'
summary = {
'scraping_date': datetime.now().isoformat(),
'total_pages': len(self.scraped_data),
'categories': {},
'pages': self.scraped_data
}
for page in self.scraped_data:
category = page['category']
summary['categories'][category] = summary['categories'].get(category, 0) + 1
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(summary, f, indent=2)
print(f"\n✓ Metadata saved to {metadata_file}")
return summary
def create_index(self):
"""Create an index markdown file"""
index_content = f"""# NTSA Knowledge Base Index
**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Total Documents:** {len(self.scraped_data)}
---
## Categories
"""
by_category = {}
for page in self.scraped_data:
category = page['category']
if category not in by_category:
by_category[category] = []
by_category[category].append(page)
for category, pages in sorted(by_category.items()):
index_content += f"\n### {category.replace('_', ' ').title()} ({len(pages)} documents)\n\n"
for page in sorted(pages, key=lambda x: x['title']):
filename_base = re.sub(r'[^\w\s-]', '', page['title'].lower())
filename_base = re.sub(r'[-\s]+', '_', filename_base)[:50]
url_hash = hashlib.md5(page['url'].encode()).hexdigest()[:8]
filename = f"{filename_base}_{url_hash}.md"
index_content += f"- [{page['title']}](./{category}/{filename})\n"
index_file = self.output_dir / 'INDEX.md'
with open(index_file, 'w', encoding='utf-8') as f:
f.write(index_content)
print(f"✓ Index created at {index_file}")
def run(self, start_urls=None, max_depth=2):
"""Run the complete scraping process"""
print("="*60)
print("NTSA Knowledge Base Scraper")
print("="*60)
if start_urls is None:
start_urls = [self.base_url]
print(f"\nStarting scraping from {len(start_urls)} URL(s)...")
print(f"Max depth: {max_depth}\n")
# Test connections first and try alternatives
working_urls = []
for url in start_urls:
if self.test_connection(url):
working_urls.append(url)
else:
print(f"⚠️ Main URL failed, trying alternatives...")
alternatives = self.get_alternative_urls(url)
found_working = False
for alt_url in alternatives:
if alt_url != url and self.test_connection(alt_url):
working_urls.append(alt_url)
found_working = True
print(f"✅ Found working alternative: {alt_url}")
break
if not found_working:
print(f"❌ All alternatives failed for {url}")
if not working_urls:
print("❌ No working URLs found. Please check your internet connection and the website availability.")
return None
print(f"\n✅ Found {len(working_urls)} working URL(s). Starting scraping...\n")
for url in working_urls:
self.scrape_page(url, depth=0, max_depth=max_depth)
print("\n" + "="*60)
print("Finalizing knowledge base...")
print("="*60)
summary = self.save_metadata()
self.create_index()
print("\n" + "="*60)
print("SCRAPING COMPLETE!")
print("="*60)
print(f"\nTotal pages scraped: {len(self.scraped_data)}")
print(f"Output directory: {self.output_dir.absolute()}")
print("\nPages by category:")
for category, count in sorted(summary['categories'].items()):
print(f" - {category.replace('_', ' ').title()}: {count}")
return summary
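
For reference, a minimal usage sketch for the class above, assuming the module is importable as scraper_utils (per its docstring) and network access to the site is available:

from scraper_utils import NTSAKnowledgeBaseScraper

# Crawl the NTSA site two levels deep and build the markdown knowledge base.
scraper = NTSAKnowledgeBaseScraper(base_url="https://ntsa.go.ke",
                                   output_dir="ntsa_knowledge_base")
summary = scraper.run(start_urls=["https://ntsa.go.ke",
                                  "https://ntsa.go.ke/services"], max_depth=2)
if summary:
    print(summary["categories"])  # page counts per category, as saved to scraping_metadata.json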

View File

@@ -1,450 +0,0 @@
#!/usr/bin/env python3
"""
Simple Comprehensive Selenium Scraper for NTSA Website
A simplified, working version of the comprehensive scraper
"""

import os
import json
import time
import hashlib
from pathlib import Path
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Set, Optional
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup


class SimpleComprehensiveScraper:
    """Simple comprehensive scraper for NTSA website"""

    def __init__(self, base_url: str = "https://ntsa.go.ke", output_dir: str = "ntsa_comprehensive_knowledge_base",
                 wait_time: int = 10, page_load_sleep: int = 3, link_follow_limit: int = 10,
                 min_content_length: int = 50):
        self.base_url = base_url
        self.output_dir = Path(output_dir)
        self.wait_time = wait_time
        self.page_load_sleep = page_load_sleep
        self.link_follow_limit = link_follow_limit
        self.min_content_length = min_content_length
        # Create output directory structure
        self._create_directory_structure()
        # Initialize tracking
        self.scraped_urls: Set[str] = set()
        self.failed_urls: Set[str] = set()
        self.scraped_data: List[Dict] = []
        # Initialize driver
        self.driver = None

    def _create_directory_structure(self):
        """Create the output directory structure"""
        directories = [
            'about', 'services', 'news', 'tenders', 'careers', 'downloads',
            'driving_licenses', 'vehicle_registration', 'road_safety',
            'procedures', 'requirements', 'raw_html', 'screenshots', 'metadata'
        ]
        for directory in directories:
            (self.output_dir / directory).mkdir(parents=True, exist_ok=True)
        print(f"✅ Created directory structure in {self.output_dir}")

    def _setup_driver(self):
        """Setup Chrome driver with options"""
        try:
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--window-size=1920,1080")
            chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
            service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=chrome_options)
            self.driver.set_page_load_timeout(30)
            print("✅ Chrome driver initialized successfully")
            return True
        except Exception as e:
            print(f"❌ Failed to initialize Chrome driver: {e}")
            return False

    def _get_page_content(self, url: str) -> Optional[Dict]:
        """Get page content using Selenium"""
        try:
            print(f"🌐 Loading: {url}")
            self.driver.get(url)
            # Wait for page to load
            time.sleep(self.page_load_sleep)
            # Wait for content to be present
            WebDriverWait(self.driver, self.wait_time).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            # Get page source and parse with BeautifulSoup
            page_source = self.driver.page_source
            soup = BeautifulSoup(page_source, 'html.parser')
            # Extract title
            title = soup.find('title')
            title_text = title.get_text().strip() if title else "NTSA Page"
            # Extract main content
            content_selectors = [
                'main', 'article', '.content', '#content', '.main-content',
                '.page-content', '.post-content', '.entry-content'
            ]
            content = ""
            for selector in content_selectors:
                elements = soup.select(selector)
                if elements:
                    content = " ".join([elem.get_text().strip() for elem in elements])
                    break
            # If no specific content found, get all text
            if not content or len(content) < self.min_content_length:
                # Remove script and style elements
                for script in soup(["script", "style", "nav", "footer", "header"]):
                    script.decompose()
                content = soup.get_text()
            # Clean content
            content = content.strip()
            if len(content) < self.min_content_length:
                print(f"⚠️ Content too short ({len(content)} chars): {url}")
                return None
            return {
                'url': url,
                'title': title_text,
                'content': content,
                'html': page_source,
                'timestamp': datetime.now().isoformat(),
                'content_length': len(content)
            }
        except TimeoutException:
            print(f"⏰ Timeout loading: {url}")
            return None
        except WebDriverException as e:
            print(f"🚫 WebDriver error for {url}: {e}")
            return None
        except Exception as e:
            print(f"❌ Error processing {url}: {e}")
            return None

    def _extract_links_from_page(self, url: str) -> List[str]:
        """Extract links from the current page"""
        try:
            # Wait for page to load
            WebDriverWait(self.driver, self.wait_time).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            # Find all links
            links = self.driver.find_elements(By.TAG_NAME, "a")
            extracted_links = []
            for link in links:
                try:
                    href = link.get_attribute("href")
                    if href:
                        # Convert relative URLs to absolute
                        absolute_url = urljoin(url, href)
                        parsed_url = urlparse(absolute_url)
                        # Only include links from the same domain
                        if parsed_url.netloc == urlparse(self.base_url).netloc:
                            extracted_links.append(absolute_url)
                except Exception as e:
                    continue
            return list(set(extracted_links))  # Remove duplicates
        except Exception as e:
            print(f"❌ Error extracting links from {url}: {e}")
            return []

    def _save_content(self, content_data: Dict) -> str:
        """Save content to file and return file path"""
        try:
            # Generate filename from URL
            url_hash = hashlib.md5(content_data['url'].encode()).hexdigest()[:8]
            safe_title = "".join(c for c in content_data['title'] if c.isalnum() or c in (' ', '-', '_')).rstrip()
            safe_title = safe_title.replace(' ', '_')[:50]
            filename = f"ntsa_{safe_title}_{url_hash}.md"
            # Determine category based on URL
            category = self._categorize_url(content_data['url'])
            category_dir = self.output_dir / category
            category_dir.mkdir(exist_ok=True)
            # Save markdown content
            md_file = category_dir / filename
            with open(md_file, 'w', encoding='utf-8') as f:
                f.write(f"# {content_data['title']}\n\n")
                f.write(f"**URL:** {content_data['url']}\n")
                f.write(f"**Scraped:** {content_data['timestamp']}\n")
                f.write(f"**Content Length:** {content_data['content_length']} characters\n\n")
                f.write("---\n\n")
                f.write(content_data['content'])
            # Save raw HTML
            html_file = self.output_dir / 'raw_html' / f"{safe_title}_{url_hash}.html"
            with open(html_file, 'w', encoding='utf-8') as f:
                f.write(content_data['html'])
            return str(md_file)
        except Exception as e:
            print(f"❌ Error saving content: {e}")
            return ""

    def _categorize_url(self, url: str) -> str:
        """Categorize URL based on path"""
        url_lower = url.lower()
        if '/about' in url_lower:
            return 'about'
        elif '/services' in url_lower:
            return 'services'
        elif '/news' in url_lower or '/media' in url_lower:
            return 'news'
        elif '/tenders' in url_lower:
            return 'tenders'
        elif '/careers' in url_lower or '/jobs' in url_lower:
            return 'careers'
        elif '/downloads' in url_lower:
            return 'downloads'
        elif '/driving' in url_lower or '/license' in url_lower:
            return 'driving_licenses'
        elif '/vehicle' in url_lower or '/registration' in url_lower:
            return 'vehicle_registration'
        elif '/safety' in url_lower or '/road' in url_lower:
            return 'road_safety'
        elif '/procedures' in url_lower:
            return 'procedures'
        elif '/requirements' in url_lower:
            return 'requirements'
        else:
            return 'services'  # Default category

    def scrape_comprehensive(self, start_urls: List[str], max_pages: int = 50, max_depth: int = 3) -> List[Dict]:
        """Comprehensive scraping of NTSA website"""
        print("🚀 Starting comprehensive NTSA scraping...")
        print(f"📋 Starting URLs: {len(start_urls)}")
        print(f"📄 Max pages: {max_pages}")
        print(f"🔍 Max depth: {max_depth}")
        if not self._setup_driver():
            print("❌ Failed to initialize driver. Cannot proceed.")
            return []
        try:
            # Initialize queue with start URLs
            url_queue = [(url, 0) for url in start_urls]  # (url, depth)
            processed_count = 0
            while url_queue and processed_count < max_pages:
                current_url, depth = url_queue.pop(0)
                # Skip if already processed or too deep
                if current_url in self.scraped_urls or depth > max_depth:
                    continue
                print(f"\n📄 Processing ({processed_count + 1}/{max_pages}): {current_url}")
                print(f"🔍 Depth: {depth}")
                # Get page content
                content_data = self._get_page_content(current_url)
                if content_data:
                    # Save content
                    file_path = self._save_content(content_data)
                    if file_path:
                        self.scraped_urls.add(current_url)
                        self.scraped_data.append({
                            'url': current_url,
                            'title': content_data['title'],
                            'file_path': file_path,
                            'category': self._categorize_url(current_url),
                            'content_length': content_data['content_length'],
                            'depth': depth
                        })
                        print(f"✅ Saved: {file_path}")
                        print(f"📊 Content: {content_data['content_length']} chars")
                        # Extract links for further crawling (if not at max depth)
                        if depth < max_depth:
                            links = self._extract_links_from_page(current_url)
                            new_links = [link for link in links if link not in self.scraped_urls and link not in self.failed_urls]
                            # Limit new links to avoid infinite crawling
                            new_links = new_links[:self.link_follow_limit]
                            if new_links:
                                print(f"🔗 Found {len(new_links)} new links")
                                for link in new_links:
                                    url_queue.append((link, depth + 1))
                            else:
                                print("🔗 No new links found")
                    else:
                        print(f"❌ Failed to save content for: {current_url}")
                        self.failed_urls.add(current_url)
                else:
                    print(f"❌ Failed to get content for: {current_url}")
                    self.failed_urls.add(current_url)
                processed_count += 1
                # Small delay between requests
                time.sleep(1)
            # Save metadata
            self._save_metadata()
            print(f"\n🎉 Comprehensive scraping completed!")
            print(f"📊 Total pages scraped: {len(self.scraped_data)}")
            print(f"❌ Failed pages: {len(self.failed_urls)}")
            print(f"📁 Output directory: {self.output_dir.absolute()}")
            return self.scraped_data
        except Exception as e:
            print(f"❌ Error during comprehensive scraping: {e}")
            return []
        finally:
            if self.driver:
                self.driver.quit()
                print("🔚 Driver closed")

    def _save_metadata(self):
        """Save scraping metadata"""
        try:
            metadata = {
                'scraping_info': {
                    'base_url': self.base_url,
                    'total_pages_scraped': len(self.scraped_data),
                    'failed_pages': len(self.failed_urls),
                    'scraping_timestamp': datetime.now().isoformat(),
                    'output_directory': str(self.output_dir)
                },
                'scraped_pages': self.scraped_data,
                'failed_urls': list(self.failed_urls)
            }
            metadata_file = self.output_dir / 'metadata' / 'comprehensive_metadata.json'
            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)
            # Create index file
            self._create_index_file()
            print(f"✅ Metadata saved to {metadata_file}")
        except Exception as e:
            print(f"❌ Error saving metadata: {e}")

    def _create_index_file(self):
        """Create an index file of all scraped content"""
        try:
            index_file = self.output_dir / 'INDEX.md'
            with open(index_file, 'w', encoding='utf-8') as f:
                f.write("# NTSA Knowledge Base Index\n\n")
                f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"**Total Pages:** {len(self.scraped_data)}\n\n")
                # Group by category
                categories = {}
                for item in self.scraped_data:
                    category = item['category']
                    if category not in categories:
                        categories[category] = []
                    categories[category].append(item)
                for category, items in categories.items():
                    f.write(f"## {category.title()}\n\n")
                    for item in items:
                        f.write(f"- [{item['title']}]({item['file_path']})\n")
                        f.write(f" - URL: {item['url']}\n")
                        f.write(f" - Content: {item['content_length']} chars\n")
                        f.write(f" - Depth: {item['depth']}\n\n")
            print(f"✅ Index file created: {index_file}")
        except Exception as e:
            print(f"❌ Error creating index file: {e}")


def main():
    """Main function to run the scraper"""
    print("🚀 NTSA Comprehensive Scraper")
    print("=" * 50)
    # Configuration
    config = {
        'base_url': 'https://ntsa.go.ke',
        'start_urls': [
            'https://ntsa.go.ke',
            'https://ntsa.go.ke/about',
            'https://ntsa.go.ke/services',
            'https://ntsa.go.ke/contact',
            'https://ntsa.go.ke/news',
            'https://ntsa.go.ke/tenders'
        ],
        'output_dir': 'ntsa_comprehensive_knowledge_base',
        'max_pages': 100,
        'max_depth': 3,
        'wait_time': 10,
        'page_load_sleep': 3,
        'link_follow_limit': 10,
        'min_content_length': 50
    }
    # Initialize scraper
    scraper = SimpleComprehensiveScraper(
        base_url=config['base_url'],
        output_dir=config['output_dir'],
        wait_time=config['wait_time'],
        page_load_sleep=config['page_load_sleep'],
        link_follow_limit=config['link_follow_limit'],
        min_content_length=config['min_content_length']
    )
    # Run scraping
    result = scraper.scrape_comprehensive(
        start_urls=config['start_urls'],
        max_pages=config['max_pages'],
        max_depth=config['max_depth']
    )
    if result:
        print(f"\n✅ Scraping completed successfully!")
        print(f"📊 Total pages scraped: {len(result)}")
    else:
        print("\n❌ Scraping failed or no pages were scraped.")


if __name__ == "__main__":
    main()
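
After a run, the saved metadata can be summarised without re-scraping. A minimal sketch, assuming the default output directory from the config above and the JSON structure written by _save_metadata():

import json
from pathlib import Path
from collections import Counter

# Path written by _save_metadata() under the default output_dir used above.
metadata_path = Path("ntsa_comprehensive_knowledge_base") / "metadata" / "comprehensive_metadata.json"
metadata = json.loads(metadata_path.read_text(encoding="utf-8"))

print("Pages scraped:", metadata["scraping_info"]["total_pages_scraped"])
counts = Counter(page["category"] for page in metadata["scraped_pages"])
for category, count in sorted(counts.items()):
    print(f" - {category.title()}: {count}")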

View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""
Simple NTSA Web Scraper with Selenium
A minimal scraper that handles JavaScript-rendered content
"""

import time
import json
from pathlib import Path
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup


def scrape_ntsa_page(url: str) -> dict:
    """Scrape a single NTSA page using Selenium"""
    driver = None
    try:
        # Setup Chrome driver
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=chrome_options)
        # Load page
        driver.get(url)
        time.sleep(3)  # Wait for JavaScript to load
        # Wait for content
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        # Get page source and parse
        page_source = driver.page_source
        soup = BeautifulSoup(page_source, 'html.parser')
        # Extract title
        title = soup.find('title')
        title_text = title.get_text().strip() if title else "NTSA Page"
        # Extract main content
        content = soup.get_text().strip()
        return {
            'url': url,
            'title': title_text,
            'content': content,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        print(f"Error scraping {url}: {e}")
        return None
    finally:
        if driver:
            driver.quit()


def main():
    """Main scraping function"""
    print("🕷️ Simple NTSA Scraper")
    # Sample URLs to scrape
    urls = [
        "https://ntsa.go.ke",
        "https://ntsa.go.ke/about",
        "https://ntsa.go.ke/services"
    ]
    results = []
    output_dir = Path("sample_ntsa_data")
    output_dir.mkdir(exist_ok=True)
    for url in urls:
        print(f"Scraping: {url}")
        data = scrape_ntsa_page(url)
        if data:
            results.append(data)
            # Save to file
            safe_title = "".join(c for c in data['title'] if c.isalnum() or c in (' ', '-', '_')).strip()
            safe_title = safe_title.replace(' ', '_')[:30]
            filename = f"ntsa_{safe_title}.md"
            filepath = output_dir / filename
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(f"# {data['title']}\n\n")
                f.write(f"**URL:** {data['url']}\n")
                f.write(f"**Scraped:** {data['timestamp']}\n\n")
                f.write(data['content'][:1000] + "...")
    # Save metadata
    metadata = {
        'scraping_date': datetime.now().isoformat(),
        'total_pages': len(results),
        'pages': results
    }
    with open(output_dir / 'metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)
    print(f"✅ Scraped {len(results)} pages to {output_dir}")


if __name__ == "__main__":
    main()