Merge pull request #506 from website-deployer/main

Adding my project to repo
Ed Donner
2025-07-10 08:08:36 -04:00
committed by GitHub
5 changed files with 2476 additions and 0 deletions

View File: README.md

@@ -0,0 +1,159 @@
# Web Scraper & Data Analyzer
A modern Python application with a sleek PyQt5 GUI for web scraping, data analysis, visualization, and AI-powered website insights. Features a clean, minimalistic design with real-time progress tracking, comprehensive data filtering, and an integrated AI chat assistant for advanced analysis.
## Features
- **Modern UI**: Clean, minimalistic design with dark theme and smooth animations
- **Web Scraping**: Multi-threaded scraping with configurable depth (max 100 levels)
- **Data Visualization**: Interactive table with sorting and filtering capabilities
- **Content Preview**: Dual preview system with both text and visual HTML rendering
- **Data Analysis**: Comprehensive statistics and domain breakdown
- **AI-Powered Analysis**: Chat-based assistant for website insights, SEO suggestions, and content analysis
- **Export Functionality**: JSON export with full metadata
- **URL Normalization**: Handles www/non-www domains intelligently
- **Real-time Progress**: Live progress updates during scraping operations
- **Loop Prevention**: Advanced duplicate detection to prevent infinite loops
- **Smart Safeguards**: Depth bounds and empty-level detection to prevent runaway scraping
## AI Analysis Tab
The application features an advanced **AI Analysis** tab:
- **Conversational Chat UI**: Ask questions about your scraped websites in a modern chat interface (like ChatGPT)
- **Quick Actions**: One-click questions for structure, SEO, content themes, and performance
- **Markdown Responses**: AI replies are formatted for clarity and readability
- **Context Awareness**: AI uses your scraped data for tailored insights
- **Requirements**: Internet connection and the `openai` Python package (see Installation)
- **Fallback**: If `openai` is not installed, a placeholder response is shown
## Loop Prevention & Duplicate Detection
The scraper includes robust protection against infinite loops and circular references:
### 🔄 URL Normalization
- Removes `www.` prefixes for consistent domain handling
- Strips URL fragments (`#section`) to prevent duplicate content
- Removes trailing slashes for consistency
- Leaves query strings intact (see the example below)
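
For reference, this is how `normalize_url` in `module.py` (included later in this PR) treats a couple of URLs:

```python
from module import WebScraper

scraper = WebScraper()

scraper.normalize_url("https://www.example.com/docs/#intro")
# -> "https://example.com/docs"  (www. stripped, fragment and trailing slash removed)

scraper.normalize_url("http://example.com/a?page=2")
# -> "http://example.com/a?page=2"  (query string passed through unchanged)
```
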
### 🚫 Duplicate Detection
- **Visited URL Tracking**: Maintains a set of all visited URLs
- **Unlimited Crawling**: No page limits per domain or total pages
- **Per-Page Duplicate Filtering**: Removes duplicate links within the same page
### 🛡️ Smart Restrictions
- **Depth Bound Only**: Crawls only as deep as the user-specified max_depth
- **Content Type Filtering**: Only scrapes HTML content
- **File Type Filtering**: Skips non-content files (PDFs, images, etc.)
- **Consecutive Empty Level Detection**: Stops if 3 consecutive levels have no new content
### 📊 Enhanced Tracking
- **Domain Page Counts**: Tracks pages scraped per domain (for statistics)
- **URL Check Counts**: Shows total URLs checked vs. pages scraped
- **Detailed Statistics**: Comprehensive reporting on scraping efficiency (see the sample output below)
- **Unlimited Processing**: No artificial limits on crawling scope
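
The tracking data is exposed through `WebScraper.get_statistics()` in `module.py`; the dictionary it returns has the shape below (the values here are invented for illustration):

```python
{
    'total_pages': 42,
    'total_links': 618,
    'total_words': 15730,
    'avg_load_time': 0.84,          # seconds, averaged over pages with a recorded load time
    'max_depth_reached': 3,
    'domains': {'example.com': 40, 'blog.example.com': 2},
    'visited_urls_count': 57,       # URLs checked, including skipped duplicates
    'domain_page_counts': {'example.com': 40, 'blog.example.com': 2},
    'start_domain': 'example.com',
}
```
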
## Installation
1. **Clone or download the project files**
2. **Install dependencies**:
```bash
pip install -r requirements.txt
```
- This will install all required packages, including `PyQt5`, `PyQtWebEngine` (for visual preview), and `openai` (for AI features).
3. **Run the application**:
```bash
python web_scraper_app.py
```
## Usage
### 1. Scraping Configuration
- Enter a starting URL (with or without http/https)
- Set maximum crawl depth (1-100)
- Click "Start Scraping" to begin
### 2. Data View & Filtering
- View scraped data in an interactive table
- Filter by search terms or specific domains
- Double-click any row to preview content
- Export data to JSON format (an illustrative export sketch follows below)
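
The JSON export itself lives in `web_scraper_app.py` (not shown in this diff), so the exact field layout may differ; a minimal equivalent built on the `Website` objects from `module.py` would look roughly like this:

```python
import json

def export_to_json(websites, path):
    """Dump scraped Website objects to a JSON file (field choice is illustrative)."""
    records = [
        {
            "title": w.title,
            "url": w.url,
            "depth": w.depth,
            "links": w.links,
            "load_time": w.load_time,
            "word_count": w.get_word_count(),
            "scraped_at": w.timestamp.isoformat(),
        }
        for w in websites
    ]
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(records, fh, indent=2)
```
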
### 3. Analysis & Statistics
- View comprehensive scraping statistics
- See domain breakdown and word counts
- Preview content in both text and visual formats
- Analyze load times and link counts
- Monitor duplicate detection efficiency
### 4. AI Analysis (New!)
- Switch to the **AI Analysis** tab
- Type your question or use quick action buttons (e.g., "Analyze the website structure", "Suggest SEO improvements")
- The AI will analyze your scraped data and provide actionable insights
- Requires an internet connection and the `openai` package (a sketch of such a call is shown below)
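
The chat code lives in `web_scraper_app.py`, which is not shown in this diff, so the snippet below is only a sketch of how such a call could be wired up with the `openai` package pointed at OpenRouter; the helper name, model id, and key handling are illustrative assumptions:

```python
from openai import OpenAI

# OpenRouter exposes an OpenAI-compatible endpoint, so the standard client can be reused.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key="YOUR_OPENROUTER_API_KEY",  # hypothetical placeholder; keep real keys in a .env file
)

def ask_about_scrape(question, stats):
    """Send a user question plus a short summary of the scraped data; return the reply text."""
    context = f"Scraped {stats['total_pages']} pages across {len(stats['domains'])} domains."
    response = client.chat.completions.create(
        model="openai/gpt-4o-mini",  # any OpenRouter model id would work here
        messages=[
            {"role": "system", "content": "You analyze scraped website data. " + context},
            {"role": "user", "content": question},
        ],
    )
    return response.choices[0].message.content
```
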
## Visual Preview Feature
The application includes a visual HTML preview feature that renders scraped web pages in a browser-like view:
- **Requirements**: PyQtWebEngine (automatically installed with requirements.txt)
- **Functionality**: Displays HTML content with proper styling and formatting
- **Fallback**: If PyQtWebEngine is not available, shows a text-only preview (sketched below)
- **Error Handling**: Graceful error messages for invalid HTML content
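
Since the GUI file is not included in this diff, the following is only a minimal sketch of the try/fallback pattern described above, with hypothetical widget names:

```python
from PyQt5.QtCore import QUrl
from PyQt5.QtWidgets import QTextEdit

try:
    from PyQt5.QtWebEngineWidgets import QWebEngineView
    HAS_WEBENGINE = True
except ImportError:
    HAS_WEBENGINE = False

def make_preview(html, base_url):
    """Return a widget that renders the scraped HTML visually if possible, as text otherwise."""
    if HAS_WEBENGINE:
        view = QWebEngineView()
        view.setHtml(html, QUrl(base_url))  # base URL lets relative links and assets resolve
        return view
    preview = QTextEdit()
    preview.setReadOnly(True)
    preview.setPlainText(html)  # text-only fallback when PyQtWebEngine is unavailable
    return preview
```
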
## Technical Details
- **Backend**: Pure Python with urllib and html.parser (no compilation required)
- **Frontend**: PyQt5 with custom modern styling
- **Threading**: Multi-threaded scraping via `ThreadPoolExecutor` for better performance (see the sketch after this list)
- **Data Storage**: Website objects with full metadata
- **URL Handling**: Intelligent normalization and domain filtering
- **Loop Prevention**: Multi-layered duplicate detection system
- **AI Integration**: Uses the OpenAI API (via OpenRouter) for chat-based analysis
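
The scraping backend can also be driven without the GUI; here is a minimal sketch using the `module.py` API from this PR (the URL and depth are placeholders):

```python
from module import WebScraper

def on_page(site):
    # Called as each page finishes scraping
    print(f"[depth {site.depth}] {site.title} - {site.url}")

scraper = WebScraper()
pages = scraper.crawl_website("https://example.com", max_depth=2, progress_callback=on_page)

stats = scraper.get_statistics()
print(f"Scraped {stats['total_pages']} pages, checked {stats['visited_urls_count']} URLs, "
      f"average load time {stats['avg_load_time']:.2f}s")
```
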
## File Structure
```
Testing/
├── web_scraper_app.py # Main application (with AI and GUI)
├── module.py # Core scraping logic
├── test.py # Basic functionality tests
├── requirements.txt # Dependencies
└── README.md # This file
```
## Troubleshooting
### Visual Preview Not Working
1. Ensure PyQtWebEngine is installed: `pip install PyQtWebEngine`
2. Check console output for import errors
### AI Analysis Not Working
1. Ensure the `openai` package is installed: `pip install openai`
2. Check your internet connection (AI requires online access)
3. If not installed, the AI tab will show a placeholder response
### Scraping Issues
1. Verify internet connection
2. Check URL format (add https:// if needed)
3. Try with a lower depth setting
4. Check console for error messages
### Loop Prevention
1. The scraper automatically prevents infinite loops
2. Check the analysis tab for detailed statistics
3. Monitor "Total URLs Checked" vs "Total Pages" for efficiency
4. Use lower depth settings for sites with many internal links
### Performance
- Use lower depth settings for faster scraping
- Filter data to focus on specific domains
- Close other applications to free up resources
- Monitor domain page counts to see which domains dominate the crawl
## License
This project is open source and available under the MIT License.

View File: module.py

@@ -0,0 +1,473 @@
import urllib.request
import urllib.parse
import urllib.error
import html.parser
import re
from datetime import datetime
import time
import ssl
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from functools import partial
class HTMLParser(html.parser.HTMLParser):
"""Custom HTML parser to extract title, links, and text content"""
def __init__(self):
super().__init__()
self.title = ""
self.links = []
self.text_content = []
self.in_title = False
self.in_body = False
self.current_tag = ""
def handle_starttag(self, tag, attrs):
self.current_tag = tag.lower()
if tag.lower() == 'title':
self.in_title = True
elif tag.lower() == 'body':
self.in_body = True
elif tag.lower() == 'a':
# Extract href attribute
for attr, value in attrs:
if attr.lower() == 'href' and value:
self.links.append(value)
def handle_endtag(self, tag):
if tag.lower() == 'title':
self.in_title = False
elif tag.lower() == 'body':
self.in_body = False
def handle_data(self, data):
if self.in_title:
self.title += data
elif self.in_body and self.current_tag in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'div', 'span', 'li']:
# Clean the text data
cleaned_data = re.sub(r'\s+', ' ', data.strip())
if cleaned_data:
self.text_content.append(cleaned_data)
def get_text(self):
"""Return all extracted text content as a single string"""
return ' '.join(self.text_content)
def get_clean_text(self, max_length=500):
"""Return cleaned text content with length limit"""
text = self.get_text()
# Remove extra whitespace and limit length
text = re.sub(r'\s+', ' ', text.strip())
if len(text) > max_length:
text = text[:max_length] + "..."
return text
class Website:
"""Class to store website data"""
def __init__(self, title, url, content, depth, links=None, load_time=None):
self.title = title or "No Title"
self.url = url
self.content = content
self.depth = depth
self.links = links or []
self.load_time = load_time
self.timestamp = datetime.now()
def get_word_count(self):
"""Get word count from content"""
if not self.content:
return 0
# Extract text content and count words
text_content = re.sub(r'<[^>]+>', '', self.content)
words = text_content.split()
return len(words)
def get_domain(self):
"""Extract domain from URL"""
try:
parsed = urlparse(self.url)
return parsed.netloc
        except Exception:
            return ""
def get_normalized_domain(self):
"""Get domain without www prefix for consistent filtering"""
domain = self.get_domain()
if domain.startswith('www.'):
return domain[4:]
return domain
def search_content(self, query):
"""Search for query in content"""
if not self.content or not query:
return False
return query.lower() in self.content.lower()
def get_text_preview(self, max_length=200):
"""Get a text preview of the content"""
if not self.content:
return "No content available"
# Extract text content
text_content = re.sub(r'<[^>]+>', '', self.content)
text_content = re.sub(r'\s+', ' ', text_content.strip())
if len(text_content) > max_length:
return text_content[:max_length] + "..."
return text_content
class WebScraper:
"""Web scraper with multithreading support and robust duplicate detection"""
def __init__(self):
self.websites = []
self.visited_urls = set()
self.visited_domains = set() # Track visited domains
self.start_domain = None # Store the starting domain
self.lock = threading.Lock()
self.max_workers = 10 # Number of concurrent threads
# Removed all page limits - unlimited crawling
self.domain_page_counts = {} # Track page count per domain (for statistics only)
self._stop_requested = False # Flag to stop scraping
def normalize_url(self, url):
"""Normalize URL to handle www prefixes and remove fragments"""
if not url:
return url
# Remove fragments (#) to prevent duplicate content
if '#' in url:
url = url.split('#')[0]
# Remove trailing slashes for consistency
url = url.rstrip('/')
# Remove www prefix for consistent domain handling
if url.startswith('https://www.'):
return url.replace('https://www.', 'https://', 1)
elif url.startswith('http://www.'):
return url.replace('http://www.', 'http://', 1)
return url
def get_domain_from_url(self, url):
"""Extract and normalize domain from URL"""
try:
parsed = urlparse(url)
domain = parsed.netloc
if domain.startswith('www.'):
return domain[4:]
return domain
        except Exception:
            return ""
def should_skip_url(self, url, current_depth):
"""Check if URL should be skipped based on various criteria"""
normalized_url = self.normalize_url(url)
# Skip if already visited
if normalized_url in self.visited_urls:
return True, "Already visited"
# Skip if not a valid HTTP/HTTPS URL
if not normalized_url.startswith(('http://', 'https://')):
return True, "Not HTTP/HTTPS URL"
# Get domain
domain = self.get_domain_from_url(normalized_url)
if not domain:
return True, "Invalid domain"
# Removed all domain page limits - unlimited crawling
# Removed external domain depth limits - crawl as deep as needed
return False, "OK"
def scrape_url(self, url, depth):
"""Scrape a single URL with error handling and rate limiting"""
try:
# Check if stop was requested
if self._stop_requested:
return None
# Check if URL should be skipped
should_skip, reason = self.should_skip_url(url, depth)
if should_skip:
print(f"Skipping {url}: {reason}")
return None
# Normalize URL
normalized_url = self.normalize_url(url)
# Mark as visited and update domain count (for statistics only)
with self.lock:
self.visited_urls.add(normalized_url)
domain = self.get_domain_from_url(normalized_url)
if domain:
self.domain_page_counts[domain] = self.domain_page_counts.get(domain, 0) + 1
# Add small delay to prevent overwhelming servers
time.sleep(0.1)
start_time = time.time()
# Create request with headers
req = urllib.request.Request(
normalized_url,
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
                    'Accept-Encoding': 'identity',  # urllib does not transparently decompress gzip, so request plain text
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
)
# Fetch the page with timeout
with urllib.request.urlopen(req, timeout=15) as response:
# Check content type
content_type = response.headers.get('content-type', '').lower()
if 'text/html' not in content_type and 'application/xhtml' not in content_type:
print(f"Skipping {url}: Not HTML content ({content_type})")
return None
html_content = response.read().decode('utf-8', errors='ignore')
load_time = time.time() - start_time
# Skip if content is too small (likely error page)
if len(html_content) < 100:
print(f"Skipping {url}: Content too small ({len(html_content)} chars)")
return None
# Parse HTML
parser = HTMLParser()
parser.feed(html_content)
# Extract links and normalize them with duplicate detection
links = []
base_url = normalized_url
seen_links = set() # Track links within this page to avoid duplicates
for link in parser.links:
try:
absolute_url = urljoin(base_url, link)
normalized_link = self.normalize_url(absolute_url)
# Skip if already seen in this page or should be skipped
if normalized_link in seen_links:
continue
seen_links.add(normalized_link)
should_skip, reason = self.should_skip_url(normalized_link, depth + 1)
if should_skip:
continue
# Only include http/https links and filter out common non-content URLs
if (normalized_link.startswith(('http://', 'https://')) and
not any(skip in normalized_link.lower() for skip in [
'mailto:', 'tel:', 'javascript:', 'data:', 'file:',
'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.zip', '.rar',
'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.ico',
'.css', '.js', '.xml', '.json', '.txt', '.log'
])):
links.append(normalized_link)
except:
continue
# Create Website object
website = Website(
title=parser.title,
url=normalized_url,
content=html_content,
depth=depth,
links=links,
load_time=load_time
)
return website
except urllib.error.HTTPError as e:
print(f"HTTP Error scraping {url}: {e.code} - {e.reason}")
return None
except urllib.error.URLError as e:
print(f"URL Error scraping {url}: {e.reason}")
return None
except Exception as e:
print(f"Error scraping {url}: {str(e)}")
return None
def crawl_website(self, start_url, max_depth=3, progress_callback=None):
"""Crawl website with multithreading support and no page limits"""
if not start_url.startswith(('http://', 'https://')):
start_url = 'https://' + start_url
# Initialize tracking
self.websites = []
self.visited_urls = set()
self.visited_domains = set()
self.domain_page_counts = {}
self.start_domain = self.get_domain_from_url(start_url)
self._stop_requested = False # Reset stop flag
print(f"Starting crawl from: {start_url}")
print(f"Starting domain: {self.start_domain}")
print(f"Max depth: {max_depth}")
print(f"Unlimited crawling - no page limits")
# Start with the initial URL
urls_to_scrape = [(start_url, 0)]
max_depth_reached = 0
consecutive_empty_levels = 0
max_consecutive_empty = 3 # Stop if 3 consecutive levels have no new URLs
total_pages_scraped = 0
# Removed all page limits - unlimited crawling
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for current_depth in range(max_depth + 1):
# Check if stop was requested
if self._stop_requested:
print("Scraping stopped by user request")
break
if not urls_to_scrape:
print(f"Stopping at depth {current_depth}: No more URLs to scrape")
break
# Check if we've reached too many consecutive empty levels
if consecutive_empty_levels >= max_consecutive_empty:
print(f"Stopping at depth {current_depth}: {max_consecutive_empty} consecutive empty levels")
break
# Removed absolute page limit check - unlimited pages
print(f"Scraping depth {current_depth} with {len(urls_to_scrape)} URLs")
# Submit all URLs at current depth for concurrent scraping
future_to_url = {
executor.submit(self.scrape_url, url, depth): url
for url, depth in urls_to_scrape
}
# Collect results and prepare next level
urls_to_scrape = []
level_results = 0
for future in as_completed(future_to_url):
# Check if stop was requested
if self._stop_requested:
print("Stopping processing of current level")
break
website = future.result()
if website:
with self.lock:
self.websites.append(website)
level_results += 1
total_pages_scraped += 1
# Emit progress if callback provided
if progress_callback:
progress_callback(website)
# Add links for next depth level (no limits)
if current_depth < max_depth:
for link in website.links:
# Removed URL limit per level - process all URLs
should_skip, reason = self.should_skip_url(link, current_depth + 1)
if not should_skip:
urls_to_scrape.append((link, current_depth + 1))
# Check if stop was requested after processing level
if self._stop_requested:
break
# Update depth tracking
if level_results > 0:
max_depth_reached = current_depth
consecutive_empty_levels = 0
else:
consecutive_empty_levels += 1
# Only stop if we've reached the actual max depth
if current_depth >= max_depth:
print(f"Reached maximum depth: {max_depth}")
break
# Print progress summary
print(f"Depth {current_depth} completed: {level_results} pages, Total: {len(self.websites)}")
if self.domain_page_counts:
print(f"Domain breakdown: {dict(self.domain_page_counts)}")
print(f"Crawling completed. Max depth reached: {max_depth_reached}, Total pages: {len(self.websites)}")
print(f"Visited URLs: {len(self.visited_urls)}")
print(f"Domain breakdown: {dict(self.domain_page_counts)}")
return self.websites
def reset(self):
"""Reset the scraper state for a new crawl"""
self.websites = []
self.visited_urls = set()
self.visited_domains = set()
self.domain_page_counts = {}
self.start_domain = None
self._stop_requested = False # Reset stop flag
def get_statistics(self):
"""Get scraping statistics with enhanced tracking information"""
if not self.websites:
return {
'total_pages': 0,
'total_links': 0,
'total_words': 0,
'avg_load_time': 0,
'max_depth_reached': 0,
'domains': {},
'visited_urls_count': 0,
'domain_page_counts': {},
'start_domain': self.start_domain
}
total_pages = len(self.websites)
total_links = sum(len(w.links) for w in self.websites)
total_words = sum(w.get_word_count() for w in self.websites)
load_times = [w.load_time for w in self.websites if w.load_time]
avg_load_time = sum(load_times) / len(load_times) if load_times else 0
max_depth_reached = max(w.depth for w in self.websites)
# Count domains
domains = {}
for website in self.websites:
domain = website.get_normalized_domain()
domains[domain] = domains.get(domain, 0) + 1
return {
'total_pages': total_pages,
'total_links': total_links,
'total_words': total_words,
'avg_load_time': avg_load_time,
'max_depth_reached': max_depth_reached,
'domains': domains,
'visited_urls_count': len(self.visited_urls),
'domain_page_counts': dict(self.domain_page_counts),
'start_domain': self.start_domain
}
    def filter_by_domain(self, domain):
        """Filter websites by domain (the www. prefix is ignored)"""
        normalized_domain = domain[4:] if domain.startswith('www.') else domain
        return [w for w in self.websites if w.get_normalized_domain() == normalized_domain]
def search_websites(self, query):
"""Search websites by query"""
return [w for w in self.websites if w.search_content(query)]
def stop_scraping(self):
"""Request graceful stop of the scraping process"""
self._stop_requested = True

View File: requirements.txt

@@ -0,0 +1,5 @@
PyQt5>=5.15.0
PyQtWebEngine>=5.15.0
urllib3==2.0.7
openai>=1.0.0
python-dotenv>=1.0.0

View File: test.py

@@ -0,0 +1,161 @@
#!/usr/bin/env python3
"""
Simple test script to verify the web scraping functionality
"""
import module
def test_basic_scraping():
"""Test basic scraping functionality"""
print("Testing basic web scraping...")
# Create a scraper instance
scraper = module.WebScraper()
# Test with a simple website (httpbin.org is a safe test site)
test_url = "https://httpbin.org/html"
print(f"Scraping {test_url} with depth 1...")
try:
# Scrape with depth 1 to keep it fast
websites = scraper.crawl_website(test_url, max_depth=1)
print(f"Successfully scraped {len(websites)} websites")
if websites:
# Show first website details
first_site = websites[0]
print(f"\nFirst website:")
print(f" Title: {first_site.title}")
print(f" URL: {first_site.url}")
print(f" Depth: {first_site.depth}")
print(f" Links found: {len(first_site.links)}")
print(f" Word count: {first_site.get_word_count()}")
# Show statistics
stats = scraper.get_statistics()
print(f"\nStatistics:")
print(f" Total pages: {stats['total_pages']}")
print(f" Total links: {stats['total_links']}")
print(f" Total words: {stats['total_words']}")
print(f" Average load time: {stats['avg_load_time']:.2f}s")
return True
else:
print("No websites were scraped")
return False
except Exception as e:
print(f"Error during scraping: {e}")
return False
def test_website_class():
"""Test the Website class functionality"""
print("\nTesting Website class...")
# Create a test website
website = module.Website(
title="Test Website",
url="https://example.com",
content="<html><body><h1>Test Content</h1><p>This is a test paragraph.</p></body></html>",
depth=0,
links=["https://example.com/page1", "https://example.com/page2"]
)
# Test methods
print(f"Website title: {website.title}")
print(f"Website URL: {website.url}")
print(f"Word count: {website.get_word_count()}")
print(f"Domain: {website.get_domain()}")
print(f"Normalized domain: {website.get_normalized_domain()}")
print(f"Search for 'test': {website.search_content('test')}")
print(f"Search for 'nonexistent': {website.search_content('nonexistent')}")
return True
def test_html_parser():
"""Test the HTML parser functionality"""
print("\nTesting HTML Parser...")
parser = module.HTMLParser()
test_html = """
<html>
<head><title>Test Page</title></head>
<body>
<h1>Welcome</h1>
<p>This is a <a href="https://example.com">link</a> to example.com</p>
<p>Here's another <a href="/relative-link">relative link</a></p>
</body>
</html>
"""
parser.feed(test_html)
print(f"Title extracted: {parser.title}")
print(f"Links found: {parser.links}")
print(f"Text content length: {len(parser.get_text())}")
return True
def test_url_normalization():
"""Test URL normalization to handle www. prefixes"""
print("\nTesting URL Normalization...")
scraper = module.WebScraper()
# Test URLs with and without www.
test_urls = [
"https://www.example.com/page",
"https://example.com/page",
"http://www.test.com/path?param=value#fragment",
"http://test.com/path?param=value#fragment"
]
print("URL Normalization Results:")
for url in test_urls:
normalized = scraper.normalize_url(url)
print(f" Original: {url}")
print(f" Normalized: {normalized}")
print()
# Test domain filtering
print("Domain Filtering Test:")
test_websites = [
module.Website("Site 1", "https://www.example.com", "content", 0),
module.Website("Site 2", "https://example.com", "content", 0),
module.Website("Site 3", "https://www.test.com", "content", 0)
]
scraper.websites = test_websites
# Test filtering by domain with and without www.
domains_to_test = ["example.com", "www.example.com", "test.com", "www.test.com"]
for domain in domains_to_test:
filtered = scraper.filter_by_domain(domain)
print(f" Filter '{domain}': {len(filtered)} results")
for site in filtered:
print(f" - {site.title} ({site.url})")
return True
if __name__ == "__main__":
print("Web Scraper Test Suite")
print("=" * 50)
# Test HTML parser
test_html_parser()
# Test Website class
test_website_class()
# Test URL normalization
test_url_normalization()
# Test basic scraping (uncomment to test actual scraping)
# Note: This requires internet connection
# test_basic_scraping()
print("\nTest completed!")
print("\nTo run the full application:")
print("python web_scraper_app.py")

web_scraper_app.py: file diff suppressed because it is too large