Remove nested git and add folder

Abhinav M
2025-07-10 15:40:40 +05:30
parent 0fe9e4e7d7
commit 865bf2dd2c
5 changed files with 2476 additions and 0 deletions


@@ -0,0 +1,473 @@
import urllib.request
import urllib.parse
import urllib.error
import html.parser
import re
from datetime import datetime
import time
import ssl
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from functools import partial

class HTMLParser(html.parser.HTMLParser):
    """Custom HTML parser to extract title, links, and text content"""

    def __init__(self):
        super().__init__()
        self.title = ""
        self.links = []
        self.text_content = []
        self.in_title = False
        self.in_body = False
        self.current_tag = ""

    def handle_starttag(self, tag, attrs):
        self.current_tag = tag.lower()
        if tag.lower() == 'title':
            self.in_title = True
        elif tag.lower() == 'body':
            self.in_body = True
        elif tag.lower() == 'a':
            # Extract href attribute
            for attr, value in attrs:
                if attr.lower() == 'href' and value:
                    self.links.append(value)

    def handle_endtag(self, tag):
        if tag.lower() == 'title':
            self.in_title = False
        elif tag.lower() == 'body':
            self.in_body = False

    def handle_data(self, data):
        if self.in_title:
            self.title += data
        elif self.in_body and self.current_tag in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'div', 'span', 'li']:
            # Clean the text data
            cleaned_data = re.sub(r'\s+', ' ', data.strip())
            if cleaned_data:
                self.text_content.append(cleaned_data)

    def get_text(self):
        """Return all extracted text content as a single string"""
        return ' '.join(self.text_content)

    def get_clean_text(self, max_length=500):
        """Return cleaned text content with length limit"""
        text = self.get_text()
        # Remove extra whitespace and limit length
        text = re.sub(r'\s+', ' ', text.strip())
        if len(text) > max_length:
            text = text[:max_length] + "..."
        return text

class Website:
    """Class to store website data"""

    def __init__(self, title, url, content, depth, links=None, load_time=None):
        self.title = title or "No Title"
        self.url = url
        self.content = content
        self.depth = depth
        self.links = links or []
        self.load_time = load_time
        self.timestamp = datetime.now()

    def get_word_count(self):
        """Get word count from content"""
        if not self.content:
            return 0
        # Extract text content and count words
        text_content = re.sub(r'<[^>]+>', '', self.content)
        words = text_content.split()
        return len(words)

    def get_domain(self):
        """Extract domain from URL"""
        try:
            parsed = urlparse(self.url)
            return parsed.netloc
        except:
            return ""

    def get_normalized_domain(self):
        """Get domain without www prefix for consistent filtering"""
        domain = self.get_domain()
        if domain.startswith('www.'):
            return domain[4:]
        return domain

    def search_content(self, query):
        """Search for query in content"""
        if not self.content or not query:
            return False
        return query.lower() in self.content.lower()

    def get_text_preview(self, max_length=200):
        """Get a text preview of the content"""
        if not self.content:
            return "No content available"
        # Extract text content
        text_content = re.sub(r'<[^>]+>', '', self.content)
        text_content = re.sub(r'\s+', ' ', text_content.strip())
        if len(text_content) > max_length:
            return text_content[:max_length] + "..."
        return text_content

class WebScraper:
    """Web scraper with multithreading support and robust duplicate detection"""

    def __init__(self):
        self.websites = []
        self.visited_urls = set()
        self.visited_domains = set()  # Track visited domains
        self.start_domain = None  # Store the starting domain
        self.lock = threading.Lock()
        self.max_workers = 10  # Number of concurrent threads
        # Removed all page limits - unlimited crawling
        self.domain_page_counts = {}  # Track page count per domain (for statistics only)
        self._stop_requested = False  # Flag to stop scraping

    def normalize_url(self, url):
        """Normalize URL to handle www prefixes and remove fragments"""
        if not url:
            return url
        # Remove fragments (#) to prevent duplicate content
        if '#' in url:
            url = url.split('#')[0]
        # Remove trailing slashes for consistency
        url = url.rstrip('/')
        # Remove www prefix for consistent domain handling
        if url.startswith('https://www.'):
            return url.replace('https://www.', 'https://', 1)
        elif url.startswith('http://www.'):
            return url.replace('http://www.', 'http://', 1)
        return url

    def get_domain_from_url(self, url):
        """Extract and normalize domain from URL"""
        try:
            parsed = urlparse(url)
            domain = parsed.netloc
            if domain.startswith('www.'):
                return domain[4:]
            return domain
        except:
            return ""

    def should_skip_url(self, url, current_depth):
        """Check if URL should be skipped based on various criteria"""
        normalized_url = self.normalize_url(url)
        # Skip if already visited
        if normalized_url in self.visited_urls:
            return True, "Already visited"
        # Skip if not a valid HTTP/HTTPS URL
        if not normalized_url.startswith(('http://', 'https://')):
            return True, "Not HTTP/HTTPS URL"
        # Get domain
        domain = self.get_domain_from_url(normalized_url)
        if not domain:
            return True, "Invalid domain"
        # Removed all domain page limits - unlimited crawling
        # Removed external domain depth limits - crawl as deep as needed
        return False, "OK"

    def scrape_url(self, url, depth):
        """Scrape a single URL with error handling and rate limiting"""
        try:
            # Check if stop was requested
            if self._stop_requested:
                return None
            # Check if URL should be skipped
            should_skip, reason = self.should_skip_url(url, depth)
            if should_skip:
                print(f"Skipping {url}: {reason}")
                return None
            # Normalize URL
            normalized_url = self.normalize_url(url)
            # Mark as visited and update domain count (for statistics only)
            with self.lock:
                self.visited_urls.add(normalized_url)
                domain = self.get_domain_from_url(normalized_url)
                if domain:
                    self.domain_page_counts[domain] = self.domain_page_counts.get(domain, 0) + 1
            # Add small delay to prevent overwhelming servers
            time.sleep(0.1)
            start_time = time.time()
            # Create request with headers
            req = urllib.request.Request(
                normalized_url,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'en-US,en;q=0.5',
                    # Request uncompressed content: urllib does not transparently
                    # decode gzip/deflate, so advertising them would break the decode below
                    'Accept-Encoding': 'identity',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                }
            )
            # Fetch the page with timeout
            with urllib.request.urlopen(req, timeout=15) as response:
                # Check content type
                content_type = response.headers.get('content-type', '').lower()
                if 'text/html' not in content_type and 'application/xhtml' not in content_type:
                    print(f"Skipping {url}: Not HTML content ({content_type})")
                    return None
                html_content = response.read().decode('utf-8', errors='ignore')
                load_time = time.time() - start_time
            # Skip if content is too small (likely error page)
            if len(html_content) < 100:
                print(f"Skipping {url}: Content too small ({len(html_content)} chars)")
                return None
            # Parse HTML
            parser = HTMLParser()
            parser.feed(html_content)
            # Extract links and normalize them with duplicate detection
            links = []
            base_url = normalized_url
            seen_links = set()  # Track links within this page to avoid duplicates
            for link in parser.links:
                try:
                    absolute_url = urljoin(base_url, link)
                    normalized_link = self.normalize_url(absolute_url)
                    # Skip if already seen in this page or should be skipped
                    if normalized_link in seen_links:
                        continue
                    seen_links.add(normalized_link)
                    should_skip, reason = self.should_skip_url(normalized_link, depth + 1)
                    if should_skip:
                        continue
                    # Only include http/https links and filter out common non-content URLs
                    if (normalized_link.startswith(('http://', 'https://')) and
                            not any(skip in normalized_link.lower() for skip in [
                                'mailto:', 'tel:', 'javascript:', 'data:', 'file:',
                                '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.zip', '.rar',
                                '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.ico',
                                '.css', '.js', '.xml', '.json', '.txt', '.log'
                            ])):
                        links.append(normalized_link)
                except Exception:
                    continue
            # Create Website object
            website = Website(
                title=parser.title,
                url=normalized_url,
                content=html_content,
                depth=depth,
                links=links,
                load_time=load_time
            )
            return website
        except urllib.error.HTTPError as e:
            print(f"HTTP Error scraping {url}: {e.code} - {e.reason}")
            return None
        except urllib.error.URLError as e:
            print(f"URL Error scraping {url}: {e.reason}")
            return None
        except Exception as e:
            print(f"Error scraping {url}: {str(e)}")
            return None

    def crawl_website(self, start_url, max_depth=3, progress_callback=None):
        """Crawl website with multithreading support and no page limits"""
        if not start_url.startswith(('http://', 'https://')):
            start_url = 'https://' + start_url
        # Initialize tracking
        self.websites = []
        self.visited_urls = set()
        self.visited_domains = set()
        self.domain_page_counts = {}
        self.start_domain = self.get_domain_from_url(start_url)
        self._stop_requested = False  # Reset stop flag
        print(f"Starting crawl from: {start_url}")
        print(f"Starting domain: {self.start_domain}")
        print(f"Max depth: {max_depth}")
        print("Unlimited crawling - no page limits")
        # Start with the initial URL
        urls_to_scrape = [(start_url, 0)]
        max_depth_reached = 0
        consecutive_empty_levels = 0
        max_consecutive_empty = 3  # Stop if 3 consecutive levels have no new URLs
        total_pages_scraped = 0
        # Removed all page limits - unlimited crawling
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for current_depth in range(max_depth + 1):
                # Check if stop was requested
                if self._stop_requested:
                    print("Scraping stopped by user request")
                    break
                if not urls_to_scrape:
                    print(f"Stopping at depth {current_depth}: No more URLs to scrape")
                    break
                # Check if we've reached too many consecutive empty levels
                if consecutive_empty_levels >= max_consecutive_empty:
                    print(f"Stopping at depth {current_depth}: {max_consecutive_empty} consecutive empty levels")
                    break
                # Removed absolute page limit check - unlimited pages
                print(f"Scraping depth {current_depth} with {len(urls_to_scrape)} URLs")
                # Submit all URLs at current depth for concurrent scraping
                future_to_url = {
                    executor.submit(self.scrape_url, url, depth): url
                    for url, depth in urls_to_scrape
                }
                # Collect results and prepare next level
                urls_to_scrape = []
                level_results = 0
                for future in as_completed(future_to_url):
                    # Check if stop was requested
                    if self._stop_requested:
                        print("Stopping processing of current level")
                        break
                    website = future.result()
                    if website:
                        with self.lock:
                            self.websites.append(website)
                        level_results += 1
                        total_pages_scraped += 1
                        # Emit progress if callback provided
                        if progress_callback:
                            progress_callback(website)
                        # Add links for next depth level (no limits)
                        if current_depth < max_depth:
                            for link in website.links:
                                # Removed URL limit per level - process all URLs
                                should_skip, reason = self.should_skip_url(link, current_depth + 1)
                                if not should_skip:
                                    urls_to_scrape.append((link, current_depth + 1))
                # Check if stop was requested after processing level
                if self._stop_requested:
                    break
                # Update depth tracking
                if level_results > 0:
                    max_depth_reached = current_depth
                    consecutive_empty_levels = 0
                else:
                    consecutive_empty_levels += 1
                # Only stop if we've reached the actual max depth
                if current_depth >= max_depth:
                    print(f"Reached maximum depth: {max_depth}")
                    break
                # Print progress summary
                print(f"Depth {current_depth} completed: {level_results} pages, Total: {len(self.websites)}")
                if self.domain_page_counts:
                    print(f"Domain breakdown: {dict(self.domain_page_counts)}")
        print(f"Crawling completed. Max depth reached: {max_depth_reached}, Total pages: {len(self.websites)}")
        print(f"Visited URLs: {len(self.visited_urls)}")
        print(f"Domain breakdown: {dict(self.domain_page_counts)}")
        return self.websites

    def reset(self):
        """Reset the scraper state for a new crawl"""
        self.websites = []
        self.visited_urls = set()
        self.visited_domains = set()
        self.domain_page_counts = {}
        self.start_domain = None
        self._stop_requested = False  # Reset stop flag

    def get_statistics(self):
        """Get scraping statistics with enhanced tracking information"""
        if not self.websites:
            return {
                'total_pages': 0,
                'total_links': 0,
                'total_words': 0,
                'avg_load_time': 0,
                'max_depth_reached': 0,
                'domains': {},
                'visited_urls_count': 0,
                'domain_page_counts': {},
                'start_domain': self.start_domain
            }
        total_pages = len(self.websites)
        total_links = sum(len(w.links) for w in self.websites)
        total_words = sum(w.get_word_count() for w in self.websites)
        load_times = [w.load_time for w in self.websites if w.load_time]
        avg_load_time = sum(load_times) / len(load_times) if load_times else 0
        max_depth_reached = max(w.depth for w in self.websites)
        # Count domains
        domains = {}
        for website in self.websites:
            domain = website.get_normalized_domain()
            domains[domain] = domains.get(domain, 0) + 1
        return {
            'total_pages': total_pages,
            'total_links': total_links,
            'total_words': total_words,
            'avg_load_time': avg_load_time,
            'max_depth_reached': max_depth_reached,
            'domains': domains,
            'visited_urls_count': len(self.visited_urls),
            'domain_page_counts': dict(self.domain_page_counts),
            'start_domain': self.start_domain
        }

    def filter_by_domain(self, domain):
        """Filter websites by domain (www prefix is ignored)"""
        # Normalize the bare domain directly rather than running it through
        # normalize_url(), which only strips www after a scheme
        normalized_domain = domain.strip()
        if normalized_domain.startswith('www.'):
            normalized_domain = normalized_domain[4:]
        return [w for w in self.websites if w.get_normalized_domain() == normalized_domain]

    def search_websites(self, query):
        """Search websites by query"""
        return [w for w in self.websites if w.search_content(query)]

    def stop_scraping(self):
        """Request graceful stop of the scraping process"""
        self._stop_requested = True
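

# Minimal usage sketch (illustrative assumption, not part of the committed file):
# shows how the classes above fit together; the URL and max_depth are placeholders.
if __name__ == "__main__":
    scraper = WebScraper()
    pages = scraper.crawl_website("https://example.com", max_depth=1)
    stats = scraper.get_statistics()
    print(f"Pages: {stats['total_pages']}, links: {stats['total_links']}, "
          f"avg load time: {stats['avg_load_time']:.2f}s")
    for site in pages[:5]:
        print(f"- {site.title} ({site.url}) depth={site.depth}")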