diff --git a/.gitignore b/.gitignore
index ae3e438..b8c3ff8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -70,3 +70,4 @@ htmlcov/
.coverage.*
coverage.xml
*.cover
+/example-site/*
diff --git a/README-test-environment.md b/README-test-environment.md
new file mode 100644
index 0000000..a481bcb
--- /dev/null
+++ b/README-test-environment.md
@@ -0,0 +1,72 @@
+# Web Scraper Test Environment
+
+This directory contains a complete local test environment for testing the web scraper against a controlled website with a known structure.
+
+## Generated Test Site
+
+A test website with the following characteristics has been generated:
+- 400+ HTML pages in a hierarchical structure
+- Maximum depth of 5 levels
+- Navigation links between pages at different levels
+- Proper `robots.txt` and `sitemap.xml` files
+- Random metadata on pages for testing extraction
+
+## Directory Structure
+
+- `example-site/` - Contains all the generated HTML files and resources
+ - `index.html` - Homepage
+ - `page*.html` - Top-level pages
+ - `section*/` - Section directories with their own pages
+ - `robots.txt` - Contains crawler directives with some intentionally disallowed pages
+ - `sitemap.xml` - XML sitemap with all publicly available pages
+
+- `nginx/` - Contains Nginx configuration
+ - `nginx.conf` - Server configuration with directory listing enabled
+
+- `docker-compose.yml` - Docker Compose configuration for running Nginx
+
+- `generate_test_site.py` - Script that generated the test site
+
+## Running the Test Environment
+
+1. Make sure Docker and Docker Compose are installed and running
+2. Start the Nginx server:
+ ```
+ docker-compose up -d
+ ```
+3. The test site will be available at http://localhost:8080 (a quick smoke check is sketched below)
+
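+Once the container is up, a quick smoke check (a minimal sketch using the `requests` package already listed in `requirements.txt`) confirms that the homepage, `robots.txt`, and `sitemap.xml` are being served:
+
+```python
+import requests
+
+# Paths served by the Nginx container defined in docker-compose.yml
+for path in ("/", "/robots.txt", "/sitemap.xml"):
+    response = requests.get(f"http://localhost:8080{path}", timeout=5)
+    print(path, response.status_code, len(response.text), "bytes")
+```
+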
+## Testing the Scraper
+
+You can test your scraper against this environment with:
+
+```
+python main.py http://localhost:8080 --depth 3
+```
+
+Additional test commands:
+
+- Test with sitemap parsing:
+ ```
+ python main.py http://localhost:8080 --use-sitemap
+ ```
+
+- Test with robots.txt consideration:
+ ```
+ python main.py http://localhost:8080 --respect-robots-txt
+ ```
+
+## Site Characteristics for Testing
+
+- The site contains a mix of top-level pages, section pages, and recursively generated subpages
+- Some deeper pages (depth >= 3) are disallowed in robots.txt (a spot-check sketch follows this list)
+- Pages have consistent navigation but varying depth
+- The sitemap includes all non-disallowed pages with metadata
+
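+A minimal sketch (using only the Python standard library, not the scraper itself) for spot-checking the robots.txt rules above while the site is running:
+
+```python
+from urllib.robotparser import RobotFileParser
+
+# Fetch and parse the generated robots.txt
+parser = RobotFileParser("http://localhost:8080/robots.txt")
+parser.read()
+
+# The homepage is never disallowed; substitute any path listed under
+# "Disallow:" in example-site/robots.txt (randomly chosen deep pages,
+# different on each generation) to see False instead.
+print(parser.can_fetch("*", "http://localhost:8080/index.html"))
+```
+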
+## Regenerating the Test Site
+
+If you need to regenerate the test site with different characteristics, modify the configuration variables at the top of the `generate_test_site.py` file and run:
+
+```
+./venv/bin/python generate_test_site.py
+```
\ No newline at end of file
diff --git a/README.md b/README.md
index f621c60..6f4ce92 100644
--- a/README.md
+++ b/README.md
@@ -70,3 +70,160 @@ Additional considerations:
For storing the crawled data:
- Define a clear structure for storing URLs and their associated content
- Consider what metadata to keep (status code, headers, timestamps)
+
+## User Guide
+
+### Installation
+
+1. Clone the repository:
+```bash
+git clone https://github.com/your-username/scraper.git
+cd scraper
+```
+
+2. Create and activate a virtual environment:
+```bash
+python -m venv venv
+source venv/bin/activate # On Windows: venv\Scripts\activate
+```
+
+3. Install dependencies:
+```bash
+pip install -r requirements.txt
+```
+
+### Basic Usage
+
+To start crawling a website:
+
+```bash
+python main.py https://example.com
+```
+
+This will crawl the website with default settings (depth of 3, respecting robots.txt, not following external links).
+
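+The crawler can also be used programmatically. A minimal sketch based on the `Crawler` class and the callbacks in the `scraper` package (the start URL and parameter values are illustrative):
+
+```python
+from scraper.crawler import Crawler
+from scraper.callbacks import console_printer
+
+# Crawl two levels deep and print each page as it is scraped
+crawler = Crawler(max_depth=2, request_delay=0.5, on_page_crawled=console_printer)
+try:
+    stats = crawler.crawl("https://example.com")
+    print(f"Crawled {stats['pages_crawled']} pages in {stats['duration']:.2f}s")
+finally:
+    crawler.close()
+```
+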
+### Command Line Options
+
+The scraper supports the following command-line arguments:
+
+| Option | Description |
+|--------|-------------|
+| `url` | The URL to start crawling from (required) |
+| `-h, --help` | Show help message and exit |
+| `-d, --depth DEPTH` | Maximum recursion depth (default: 3) |
+| `--allow-external` | Allow crawling external domains |
+| `--no-subdomains` | Disallow crawling subdomains |
+| `-c, --concurrency CONCURRENCY` | Maximum concurrent requests (default: 10) |
+| `--no-cache` | Disable caching |
+| `--cache-dir CACHE_DIR` | Directory for cache storage |
+| `--delay DELAY` | Delay between requests in seconds (default: 0.1) |
+| `-v, --verbose` | Enable verbose logging |
+| `--output-dir OUTPUT_DIR` | Directory to save results as JSON files |
+| `--print-pages` | Print scraped pages to console |
+| `--ignore-robots` | Ignore robots.txt rules |
+| `--use-sitemap` | Use sitemap.xml for URL discovery |
+| `--max-subsitemaps MAX_SUBSITEMAPS` | Maximum number of sub-sitemaps to process (default: 5) |
+| `--sitemap-timeout SITEMAP_TIMEOUT` | Timeout in seconds for sitemap processing (default: 30) |
+
+### Examples
+
+#### Crawl with a specific depth limit:
+```bash
+python main.py https://example.com --depth 5
+```
+
+#### Allow crawling external domains:
+```bash
+python main.py https://example.com --allow-external
+```
+
+#### Save crawled pages to a specific directory:
+```bash
+python main.py https://example.com --output-dir results
+```
+
+#### Use sitemap for discovery with a longer timeout:
+```bash
+python main.py https://example.com --use-sitemap --sitemap-timeout 60
+```
+
+#### Maximum performance for a large site:
+```bash
+python main.py https://example.com --depth 4 --concurrency 20 --ignore-robots
+```
+
+#### Crawl site slowly to avoid rate limiting:
+```bash
+python main.py https://example.com --delay 1.0
+```
+
+## Testing
+
+The project includes a Docker-based local testing environment that serves a generated website with a controlled, known structure for development and testing purposes.
+
+### Test Environment Features
+
+- 400+ HTML pages in a hierarchical structure
+- Maximum depth of 5 levels
+- Navigation links between pages at different levels
+- Proper `robots.txt` and `sitemap.xml` files
+- Random metadata on pages for testing extraction
+
+### Setting Up the Test Environment
+
+1. Make sure Docker and Docker Compose are installed and running.
+
+2. Generate the test site (if not already done):
+```bash
+./venv/bin/python generate_test_site.py
+```
+
+3. Start the Nginx server:
+```bash
+docker-compose up -d
+```
+
+4. The test site will be available at http://localhost:8080
+
+### Running Tests Against the Test Environment
+
+#### Basic crawl:
+```bash
+python main.py http://localhost:8080 --depth 2
+```
+
+#### Test with sitemap parsing:
+```bash
+python main.py http://localhost:8080 --use-sitemap
+```
+
+#### Test robots.txt handling:
+```bash
+# Default behavior respects robots.txt
+python main.py http://localhost:8080 --depth 4
+
+# Ignore robots.txt to crawl all pages
+python main.py http://localhost:8080 --depth 4 --ignore-robots
+```
+
+#### Save the crawled results:
+```bash
+python main.py http://localhost:8080 --output-dir test_results
+```
+
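+The JSON callback writes one file per crawled URL. A short sketch for inspecting the output (field names follow the page data emitted by the crawler; `test_results` is the directory chosen above):
+
+```python
+import json
+from pathlib import Path
+
+# Summarize every result file produced by --output-dir test_results
+for result_file in sorted(Path("test_results").glob("*.json")):
+    with open(result_file) as f:
+        page = json.load(f)
+    print(f"{page['url']} -> status {page['status_code']}, depth {page['depth']}, {len(page['links'])} links")
+```
+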
+### Stopping the Test Environment
+
+To stop the Docker container:
+```bash
+docker-compose down
+```
+
+### Regenerating the Test Site
+
+If you need to regenerate the test site with different characteristics, modify the configuration variables at the top of the `generate_test_site.py` file and run:
+
+```bash
+./venv/bin/python generate_test_site.py
+```
+
+For more details on the test environment, see the [README-test-environment.md](README-test-environment.md) file.
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..62be3b1
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,11 @@
+version: '3'
+
+services:
+ nginx:
+ image: nginx:alpine
+ ports:
+ - "8080:80"
+ volumes:
+ - ./example-site:/usr/share/nginx/html
+ - ./nginx/nginx.conf:/etc/nginx/conf.d/default.conf
+ restart: always
\ No newline at end of file
diff --git a/generate_test_site.py b/generate_test_site.py
new file mode 100644
index 0000000..60b9eb5
--- /dev/null
+++ b/generate_test_site.py
@@ -0,0 +1,251 @@
+#!/usr/bin/env python
+import os
+import random
+import xml.dom.minidom
+from datetime import datetime, timedelta
+
+# Configuration
+NUM_TOP_LEVEL_PAGES = 8
+NUM_SECTIONS = 6 # Number of section directories
+PAGES_PER_SECTION = 7 # Pages per section
+MAX_DEPTH = 5 # Maximum depth of page hierarchy
+SITE_DOMAIN = "http://localhost:8080" # Domain for sitemap
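+# Note: besides the pages configured above, each generated page recursively
+# creates 1-3 child pages down to MAX_DEPTH, so the total page count is
+# randomized (typically several hundred pages per run).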
+
+# Create directories
+os.makedirs("example-site", exist_ok=True)
+for i in range(1, NUM_SECTIONS + 1):
+ os.makedirs(f"example-site/section{i}", exist_ok=True)
+
+# Track all pages for sitemap and robots
+all_pages = []
+disallowed_pages = []
+
+def create_navigation(current_page, depth=0):
+ """Create navigation links for a page."""
+ nav_links = [
+        f'<a href="/index.html">Home</a>'
+ ]
+
+ # Add links to top-level pages
+ for i in range(1, NUM_TOP_LEVEL_PAGES + 1):
+ page_name = f"page{i}.html"
+ if current_page != page_name:
+            nav_links.append(f'<a href="/{page_name}">Page {i}</a>')
+
+ # Add links to sections
+ for i in range(1, NUM_SECTIONS + 1):
+ section = f"section{i}"
+        nav_links.append(f'<a href="/{section}/">Section {i}</a>')
+
+ return f"""
+
+
+
+ """
+
+def create_content(page_name, depth=0, section=None):
+ """Create content with links based on depth and section."""
+ links = []
+
+ # Add links based on depth
+ if depth < MAX_DEPTH:
+ # Create "child" pages (deeper hierarchy)
+ child_pages = random.randint(1, 3) # Random number of child pages
+ for i in range(1, child_pages + 1):
+ child_name = f"subpage{i}.html"
+ path_prefix = f"{section}/" if section else ""
+
+ # Build the correct path based on current page's location
+ if page_name == "index.html":
+ link_path = f"{path_prefix}{child_name}"
+ elif page_name.startswith("subpage"):
+ # For subpages, append depth information to distinguish them
+ dirname = os.path.dirname(f"depth{depth+1}_{child_name}")
+ if dirname:
+ os.makedirs(f"example-site/{path_prefix}{dirname}", exist_ok=True)
+ link_path = f"{path_prefix}depth{depth+1}_{child_name}"
+ else:
+ # For regular pages, create subpages in their "directory"
+ dir_name = page_name.replace(".html", "")
+ os.makedirs(f"example-site/{path_prefix}{dir_name}", exist_ok=True)
+ link_path = f"{path_prefix}{dir_name}/{child_name}"
+
+            links.append(f'<a href="/{link_path}">Child page {i} (depth {depth+1})</a>')
+
+ # Add the child page to all pages list
+ all_pages.append(f"/{link_path}")
+
+ # Create the child page recursively
+ create_page(link_path, depth + 1, section)
+
+ # Add some cross-section links on higher level pages
+ if depth <= 1 and random.random() < 0.7:
+ other_section = random.randint(1, NUM_SECTIONS)
+ section_page = random.randint(1, PAGES_PER_SECTION)
+        links.append(f'<a href="/section{other_section}/page{section_page}.html">Random link to Section {other_section}</a>')
+
+ # Create content with links
+ content = f"""
+ {section if section else "Main"} - {'Index' if page_name == 'index.html' else page_name.replace('.html', '')}
+ This is a test page at depth {depth}.
+
+
+
+ {'Subpages ' if links else 'No subpages available.
'}
+
+ Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam auctor,
+ nisl eget ultricies tincidunt, nisl nisl aliquet nisl, eget aliquet nisl
+ nisl eget nisl. Nullam auctor, nisl eget ultricies tincidunt.
+ """
+ return content
+
+def create_page(page_path, depth=0, section=None):
+ """Create an HTML page at the given path."""
+ is_section_index = page_path.endswith("/")
+
+ if is_section_index:
+ page_path = page_path + "index.html"
+
+ # Determine actual filesystem path
+ file_path = os.path.join("example-site", page_path)
+
+ # Create parent directory if needed
+ os.makedirs(os.path.dirname(file_path), exist_ok=True)
+
+ # Extract the page name for navigation
+ page_name = os.path.basename(page_path)
+
+ # Randomly disallow some deep pages from robots.txt
+ if depth >= 3 and random.random() < 0.3:
+ disallowed_pages.append(f"/{page_path}")
+
+ # Create HTML content
+ navigation = create_navigation(page_name, depth)
+ content = create_content(page_name, depth, section)
+
+ html = f"""
+
+
+
+
+ {'Section ' + section.replace('section', '') if section else 'Main'} - {page_name.replace('.html', '')}
+
+
+
+
+
+
+ {content}
+
+
+
+"""
+
+ # Write the HTML file
+ with open(file_path, "w") as f:
+ f.write(html)
+
+ return f"/{page_path}"
+
+# Create homepage
+print("Generating homepage...")
+homepage_path = create_page("index.html")
+all_pages.append(homepage_path)
+
+# Create top-level pages
+print("Generating top-level pages...")
+for i in range(1, NUM_TOP_LEVEL_PAGES + 1):
+ page_path = create_page(f"page{i}.html")
+ all_pages.append(page_path)
+
+# Create sections with pages
+print("Generating section pages...")
+for section_num in range(1, NUM_SECTIONS + 1):
+ section = f"section{section_num}"
+
+ # Create section index
+ section_index_path = create_page(f"{section}/", 0, section)
+ all_pages.append(section_index_path)
+
+ # Create section pages
+ for page_num in range(1, PAGES_PER_SECTION + 1):
+ page_path = create_page(f"{section}/page{page_num}.html", 1, section)
+ all_pages.append(page_path)
+
+# Create robots.txt
+print("Generating robots.txt...")
+robots_content = """User-agent: *
+Crawl-delay: 0.1
+
+"""
+for disallowed in disallowed_pages:
+ robots_content += f"Disallow: {disallowed}\n"
+
+with open("example-site/robots.txt", "w") as f:
+ f.write(robots_content)
+
+# Create sitemap.xml
+print("Generating sitemap.xml...")
+doc = xml.dom.minidom.getDOMImplementation().createDocument(None, "urlset", None)
+root = doc.documentElement
+root.setAttribute("xmlns", "http://www.sitemaps.org/schemas/sitemap/0.9")
+
+for page in all_pages:
+ if page not in disallowed_pages: # Don't include disallowed pages in sitemap
+ url_elem = doc.createElement("url")
+
+ loc = doc.createElement("loc")
+ loc_text = doc.createTextNode(f"{SITE_DOMAIN}{page}")
+ loc.appendChild(loc_text)
+ url_elem.appendChild(loc)
+
+ # Add lastmod with random date
+ lastmod = doc.createElement("lastmod")
+ date = (datetime.now() - timedelta(days=random.randint(0, 30))).strftime('%Y-%m-%d')
+ lastmod_text = doc.createTextNode(date)
+ lastmod.appendChild(lastmod_text)
+ url_elem.appendChild(lastmod)
+
+ # Add changefreq
+ changefreq = doc.createElement("changefreq")
+ freq_options = ["daily", "weekly", "monthly"]
+ freq = random.choice(freq_options)
+ changefreq_text = doc.createTextNode(freq)
+ changefreq.appendChild(changefreq_text)
+ url_elem.appendChild(changefreq)
+
+ # Add priority
+ priority = doc.createElement("priority")
+ # Higher level pages get higher priority
+ if page.count('/') <= 2:
+ pri = 0.8
+ else:
+ pri = 0.5
+ priority_text = doc.createTextNode(f"{pri:.1f}")
+ priority.appendChild(priority_text)
+ url_elem.appendChild(priority)
+
+ root.appendChild(url_elem)
+
+with open("example-site/sitemap.xml", "w") as f:
+ f.write(doc.toprettyxml())
+
+# Print summary
+print(f"Generated {len(all_pages)} pages")
+print(f"Disallowed {len(disallowed_pages)} pages from robots.txt")
+print("Done!")
\ No newline at end of file
diff --git a/main.py b/main.py
old mode 100644
new mode 100755
index e69de29..3ecc169
--- a/main.py
+++ b/main.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+import argparse
+import logging
+import sys
+import os
+from typing import Dict, Any
+
+from scraper.crawler import Crawler
+from scraper.callbacks import console_printer, json_file_writer, link_collector
+
+
+def configure_logging(verbose: bool) -> None:
+ """
+ Configure logging based on verbosity level.
+
+ Args:
+ verbose: Whether to enable verbose logging
+ """
+ log_level = logging.DEBUG if verbose else logging.INFO
+ logging.basicConfig(
+ level=log_level,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ handlers=[logging.StreamHandler()]
+ )
+
+
+def print_stats(stats: Dict[str, Any]) -> None:
+ """
+ Print crawling statistics in a pretty format.
+
+ Args:
+ stats: Dictionary of stats from the crawler
+ """
+ print("\n===== Crawling Statistics =====")
+ print(f"Pages Crawled: {stats['pages_crawled']}")
+ print(f"Pages Skipped (from cache): {stats['pages_skipped']}")
+ print(f"Total URLs Visited: {stats['total_urls']}")
+
+ # Print sitemap stats if available
+ if "sitemap_urls_found" in stats:
+ print(f"Sitemap URLs Found: {stats['sitemap_urls_found']}")
+ print(f"Sitemap URLs Used: {stats['sitemap_urls_used']}")
+
+ print(f"Duration: {stats['duration']:.2f} seconds")
+ print("==============================\n")
+
+
+def main() -> int:
+ """
+ Main entry point for the scraper.
+
+ Returns:
+ Exit code (0 for success, non-zero for errors)
+ """
+ parser = argparse.ArgumentParser(description="Web crawler that recursively follows links from a starting URL")
+
+ parser.add_argument("url", help="The URL to start crawling from")
+ parser.add_argument("-d", "--depth", type=int, default=3, help="Maximum recursion depth (default: 3)")
+ parser.add_argument("--allow-external", action="store_true", help="Allow crawling external domains")
+ parser.add_argument("--no-subdomains", action="store_true", help="Disallow crawling subdomains")
+ parser.add_argument("-c", "--concurrency", type=int, default=10, help="Maximum concurrent requests (default: 10)")
+ parser.add_argument("--no-cache", action="store_true", help="Disable caching")
+ parser.add_argument("--cache-dir", help="Directory for cache storage")
+ parser.add_argument("--delay", type=float, default=0.1, help="Delay between requests in seconds (default: 0.1)")
+ parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose logging")
+ parser.add_argument("--output-dir", help="Directory to save results as JSON files")
+ parser.add_argument("--print-pages", action="store_true", help="Print scraped pages to console")
+ parser.add_argument("--ignore-robots", action="store_true", help="Ignore robots.txt rules")
+ parser.add_argument("--use-sitemap", action="store_true", help="Use sitemap.xml for URL discovery")
+ parser.add_argument("--max-subsitemaps", type=int, default=5, help="Maximum number of sub-sitemaps to process (default: 5)")
+ parser.add_argument("--sitemap-timeout", type=int, default=30, help="Timeout in seconds for sitemap processing (default: 30)")
+
+ args = parser.parse_args()
+
+ # Configure logging
+ configure_logging(args.verbose)
+
+ # Set up callbacks
+ callback = None
+
+ if args.print_pages and args.output_dir:
+ # Both console printing and JSON output
+ all_links = set()
+ json_cb = json_file_writer(args.output_dir)
+ link_cb = link_collector(all_links)
+
+ def combined_callback(url, data):
+ console_printer(url, data)
+ json_cb(url, data)
+ link_cb(url, data)
+
+ callback = combined_callback
+ elif args.print_pages:
+ # Just console printing
+ callback = console_printer
+ elif args.output_dir:
+ # Just JSON output
+ callback = json_file_writer(args.output_dir)
+
+ # Create crawler instance
+ crawler = Crawler(
+ max_depth=args.depth,
+ allow_external_domains=args.allow_external,
+ allow_subdomains=not args.no_subdomains,
+ concurrency_limit=args.concurrency,
+ use_cache=not args.no_cache,
+ cache_dir=args.cache_dir,
+ request_delay=args.delay,
+ on_page_crawled=callback,
+ respect_robots_txt=not args.ignore_robots,
+ use_sitemap=args.use_sitemap,
+ max_subsitemaps=args.max_subsitemaps,
+ sitemap_timeout=args.sitemap_timeout
+ )
+
+ try:
+ # Start crawling
+ print(f"Starting crawl from {args.url} with max depth {args.depth}")
+ stats = crawler.crawl(args.url)
+
+ # Print stats
+ print_stats(stats)
+
+ return 0
+ except KeyboardInterrupt:
+ print("\nCrawling interrupted by user.")
+ return 130
+ except Exception as e:
+ logging.error(f"Error during crawling: {str(e)}")
+ return 1
+ finally:
+ crawler.close()
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/nginx/nginx.conf b/nginx/nginx.conf
new file mode 100644
index 0000000..8baf555
--- /dev/null
+++ b/nginx/nginx.conf
@@ -0,0 +1,27 @@
+server {
+ listen 80;
+ server_name localhost;
+
+ # Enable directory listing for testing purposes
+ autoindex on;
+
+ # Custom header for testing
+ add_header X-Test-Server "Example Site";
+
+ location / {
+ root /usr/share/nginx/html;
+ index index.html index.htm;
+ try_files $uri $uri/ =404;
+ }
+
+ # Add robots.txt and sitemap.xml handling
+ location = /robots.txt {
+ root /usr/share/nginx/html;
+ try_files $uri =404;
+ }
+
+ location = /sitemap.xml {
+ root /usr/share/nginx/html;
+ try_files $uri =404;
+ }
+}
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 740db03..370412f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -42,6 +42,7 @@ jupyter_server_terminals==0.5.3
jupyterlab==4.3.6
jupyterlab_pygments==0.3.0
jupyterlab_server==2.27.3
+lxml==5.1.0
MarkupSafe==3.0.2
matplotlib-inline==0.1.7
mistune==3.1.2
@@ -70,6 +71,7 @@ PyYAML==6.0.2
pyzmq==26.3.0
referencing==0.36.2
requests==2.32.3
+robotexclusionrulesparser==1.7.1
rfc3339-validator==0.1.4
rfc3986-validator==0.1.1
rpds-py==0.23.1
diff --git a/scraper/cache_manager.py b/scraper/cache_manager.py
index 1e8976f..c6cef04 100644
--- a/scraper/cache_manager.py
+++ b/scraper/cache_manager.py
@@ -213,7 +213,7 @@ def clear_expired(self) -> int:
Returns:
Number of entries cleared
"""
- cleared_count = 0
+ cleared_urls = set() # Track unique URLs cleared
current_time = time.time()
# Clear expired entries from memory cache
@@ -221,7 +221,7 @@ def clear_expired(self) -> int:
if current_time - entry['timestamp'] >= self.expiry_time]
for url in expired_urls:
del self.memory_cache[url]
- cleared_count += 1
+ cleared_urls.add(url)
# Clear expired entries from persistent cache if enabled
if self.use_persistent and self.conn:
@@ -229,26 +229,26 @@ def clear_expired(self) -> int:
cursor = self.conn.cursor()
expire_time = int(current_time - self.expiry_time)
- # First, get the count of entries to be deleted
+ # Get URLs of entries to be deleted
cursor.execute(
- "SELECT COUNT(*) FROM cache WHERE timestamp < ?",
+ "SELECT url FROM cache WHERE timestamp < ?",
(expire_time,)
)
- db_cleared_count = cursor.fetchone()[0]
+ db_expired_urls = {row[0] for row in cursor.fetchall()}
- # Then perform the delete
+ # Perform the delete
cursor.execute(
"DELETE FROM cache WHERE timestamp < ?",
(expire_time,)
)
- cleared_count = cleared_count + db_cleared_count
+ cleared_urls.update(db_expired_urls)
self.conn.commit()
- self.logger.info(f"Cleared {cleared_count} expired cache entries")
+ self.logger.info(f"Cleared {len(cleared_urls)} expired cache entries")
except Exception as e:
self.logger.error(f"Error clearing expired cache entries: {str(e)}")
- return cleared_count
+ return len(cleared_urls)
def close(self) -> None:
"""Close the cache and release resources."""
diff --git a/scraper/callbacks.py b/scraper/callbacks.py
new file mode 100644
index 0000000..d96888c
--- /dev/null
+++ b/scraper/callbacks.py
@@ -0,0 +1,86 @@
+"""
+Callback functions that can be used with the Crawler.
+
+This module provides example callback functions that can be passed to
+the Crawler's on_page_crawled parameter to customize crawling behavior.
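+
+Example (a minimal sketch; the start URL and settings are illustrative):
+
+    from scraper.crawler import Crawler
+    from scraper.callbacks import json_file_writer
+
+    # json_file_writer is a factory: it returns the callback to register
+    crawler = Crawler(max_depth=2, on_page_crawled=json_file_writer("output"))
+    crawler.crawl("https://example.com")
+    crawler.close()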
+"""
+
+import json
+import os
+from typing import Dict, Any
+
+
+def console_printer(url: str, page_data: Dict[str, Any]) -> None:
+ """
+ Print page information to the console.
+
+ Args:
+ url: The URL that was crawled
+ page_data: Data about the crawled page
+ """
+ print(f"\n--- Page Crawled: {url} ---")
+ print(f"Title: {page_data.get('title', 'No title')}")
+ print(f"Status: {page_data.get('status_code', 0)}")
+ print(f"Depth: {page_data.get('depth', 0)}")
+ print(f"Links found: {len(page_data.get('links', []))}")
+ print("-" * 50)
+
+
+def json_file_writer(output_dir: str) -> callable:
+ """
+ Create a callback function that writes page data to JSON files.
+
+ Args:
+ output_dir: Directory where JSON files will be saved
+
+ Returns:
+ Callback function that can be passed to Crawler
+ """
+ # Create the output directory if it doesn't exist
+ os.makedirs(output_dir, exist_ok=True)
+
+ def callback(url: str, page_data: Dict[str, Any]) -> None:
+ """
+ Write page data to a JSON file.
+
+ Args:
+ url: The URL that was crawled
+ page_data: Data about the crawled page
+ """
+ # Create a safe filename from URL
+ safe_filename = url.replace("://", "_").replace("/", "_").replace(".", "_")
+ if len(safe_filename) > 100:
+ safe_filename = safe_filename[:100] # Truncate long filenames
+
+ # Create full path
+ file_path = os.path.join(output_dir, f"{safe_filename}.json")
+
+ # Write data to file
+ with open(file_path, 'w') as f:
+ json.dump(page_data, f, indent=2)
+
+ return callback
+
+
+def link_collector(collected_links: set) -> callable:
+ """
+ Create a callback function that collects links into a provided set.
+
+ Args:
+ collected_links: Set where links will be stored
+
+ Returns:
+ Callback function that can be passed to Crawler
+ """
+ def callback(url: str, page_data: Dict[str, Any]) -> None:
+ """
+ Add links from the page to the collected_links set.
+
+ Args:
+ url: The URL that was crawled
+ page_data: Data about the crawled page
+ """
+ links = page_data.get('links', [])
+ collected_links.update(links)
+
+ return callback
\ No newline at end of file
diff --git a/scraper/crawler.py b/scraper/crawler.py
new file mode 100644
index 0000000..fd23c32
--- /dev/null
+++ b/scraper/crawler.py
@@ -0,0 +1,313 @@
+import asyncio
+import logging
+from typing import Set, Dict, Any, Optional, Callable, List
+from urllib.parse import urlparse
+import time
+
+from scraper.cache_manager import Cache
+from scraper.request_handler import RequestHandler
+from scraper.response_parser import ResponseParser
+from scraper.robots_parser import RobotsParser
+from scraper.sitemap_parser import SitemapParser
+
+
+class Crawler:
+ """
+ Main component that orchestrates the web crawling process.
+
+ This class coordinates the RequestHandler, ResponseParser, and Cache
+ to recursively crawl web pages, extract links, and store results.
+ """
+
+ def __init__(
+ self,
+ max_depth: int = 3,
+ allow_external_domains: bool = False,
+ allow_subdomains: bool = True,
+ concurrency_limit: int = 10,
+ use_cache: bool = True,
+ cache_dir: Optional[str] = None,
+ request_delay: float = 0.1,
+ user_agent: str = "ScraperBot (https://github.com/johnburbridge/scraper)",
+ on_page_crawled: Optional[Callable[[str, dict], None]] = None,
+ respect_robots_txt: bool = True,
+ use_sitemap: bool = False,
+ max_subsitemaps: int = 5,
+ sitemap_timeout: int = 30
+ ):
+ """
+ Initialize the Crawler with configurable parameters.
+
+ Args:
+ max_depth: Maximum recursion depth for crawling (default: 3)
+ allow_external_domains: Whether to follow links to other domains (default: False)
+ allow_subdomains: Whether to follow links to subdomains (default: True)
+ concurrency_limit: Maximum number of concurrent requests (default: 10)
+ use_cache: Whether to use caching (default: True)
+ cache_dir: Directory for the cache database (if None, uses default)
+ request_delay: Delay between requests in seconds (default: 0.1)
+ user_agent: User-agent string to identify the crawler
+ on_page_crawled: Optional callback function called when a page is crawled
+ respect_robots_txt: Whether to respect robots.txt rules (default: True)
+ use_sitemap: Whether to use sitemap.xml for URL discovery (default: False)
+ max_subsitemaps: Maximum number of sub-sitemaps to process (default: 5)
+ sitemap_timeout: Timeout in seconds for sitemap processing (default: 30)
+ """
+ self.max_depth = max_depth
+ self.allow_external_domains = allow_external_domains
+ self.allow_subdomains = allow_subdomains
+ self.concurrency_limit = concurrency_limit
+ self.request_delay = request_delay
+ self.user_agent = user_agent
+ self.on_page_crawled = on_page_crawled
+ self.respect_robots_txt = respect_robots_txt
+ self.use_sitemap = use_sitemap
+ self.max_subsitemaps = max_subsitemaps
+ self.sitemap_timeout = sitemap_timeout
+
+ self.logger = logging.getLogger(__name__)
+ self.cache = Cache(use_persistent=use_cache, cache_dir=cache_dir)
+ self.request_handler = RequestHandler(user_agent=user_agent)
+
+ # Initialize robots.txt parser if needed
+ self.robots_parser = RobotsParser(user_agent) if respect_robots_txt else None
+
+ # Initialize sitemap parser if needed
+ self.sitemap_parser = SitemapParser(
+ user_agent,
+ max_subsitemaps=max_subsitemaps,
+ overall_timeout=sitemap_timeout
+ ) if use_sitemap else None
+
+ # Stats tracking
+ self.stats = {
+ "pages_crawled": 0,
+ "pages_skipped": 0,
+ "start_time": 0,
+ "end_time": 0
+ }
+
+ # Sets to track URLs
+ self.visited_urls: Set[str] = set()
+ self.queue: Set[str] = set()
+
+ # Semaphore for controlling concurrency
+ self.semaphore = asyncio.Semaphore(concurrency_limit)
+
+ def _is_allowed_domain(self, url: str, base_domain: str) -> bool:
+ """
+ Check if a URL's domain is allowed based on configuration.
+
+ Args:
+ url: The URL to check
+ base_domain: The base domain of the initial URL
+
+ Returns:
+ True if the domain is allowed, False otherwise
+ """
+ parsed_url = urlparse(url)
+ url_domain = parsed_url.netloc.lower()
+
+ # Always allow the exact same domain
+ if url_domain == base_domain:
+ return True
+
+ # Check for subdomains if allowed
+ if self.allow_subdomains and url_domain.endswith(f".{base_domain}"):
+ return True
+
+ # Check for external domains if allowed
+ if self.allow_external_domains:
+ return True
+
+ return False
+
+ async def _crawl_url(self, url: str, depth: int, base_domain: str) -> Set[str]:
+ """
+ Crawl a single URL and extract links.
+
+ Args:
+ url: The URL to crawl
+ depth: Current recursion depth
+ base_domain: The base domain of the initial URL
+
+ Returns:
+ Set of discovered URLs
+ """
+ # Skip if already visited
+ if url in self.visited_urls:
+ return set()
+
+ self.visited_urls.add(url)
+
+ # Check robots.txt rules if enabled
+ if self.respect_robots_txt and self.robots_parser:
+ if not self.robots_parser.is_allowed(url):
+ self.logger.info(f"Skipping {url} (disallowed by robots.txt)")
+ return set()
+
+ # Adjust request delay based on crawl-delay directive
+ robots_delay = self.robots_parser.get_crawl_delay(url)
+ delay = max(self.request_delay, robots_delay)
+ else:
+ delay = self.request_delay
+
+ # Check cache first
+ cached_response = self.cache.get(url)
+
+ if cached_response:
+ content, status_code, headers = cached_response
+ self.logger.info(f"Using cached response for {url}")
+ self.stats["pages_skipped"] += 1
+ else:
+ # Respect request delay
+ await asyncio.sleep(delay)
+
+ # Make request
+ async with self.semaphore:
+ content, status_code, headers = self.request_handler.get(url)
+
+ if content and status_code == 200:
+ # Cache successful response
+ self.cache.set(url, content, status_code, headers)
+ else:
+ self.logger.warning(f"Failed to fetch {url}, status: {status_code}")
+ return set()
+
+ # Update stats
+ self.stats["pages_crawled"] += 1
+
+ # Parse response
+ parser = ResponseParser(base_url=url)
+ extracted_links = parser.extract_links(content)
+
+ # Get metadata
+ title = parser.extract_page_title(content)
+ metadata = parser.extract_metadata(content)
+
+ # Create result object
+ page_data = {
+ "url": url,
+ "status_code": status_code,
+ "title": title,
+ "depth": depth,
+ "metadata": metadata,
+ "links": list(extracted_links)
+ }
+
+ # Call the callback if provided
+ if self.on_page_crawled:
+ self.on_page_crawled(url, page_data)
+
+ # Filter links by domain
+ allowed_links = {
+ link for link in extracted_links
+ if self._is_allowed_domain(link, base_domain)
+ }
+
+ return allowed_links
+
+ async def _crawl_recursive(self, url: str, depth: int, base_domain: str) -> None:
+ """
+ Recursively crawl URLs up to the maximum depth.
+
+ Args:
+ url: The URL to start crawling from
+ depth: Current recursion depth
+ base_domain: The base domain of the initial URL
+ """
+ if depth > self.max_depth:
+ return
+
+ discovered_links = await self._crawl_url(url, depth, base_domain)
+
+ # Filter out already visited or queued links
+ new_links = discovered_links - self.visited_urls - self.queue
+ self.queue.update(new_links)
+
+ # Create tasks for each new link
+ tasks = []
+ for link in new_links:
+ task = asyncio.create_task(self._crawl_recursive(link, depth + 1, base_domain))
+ tasks.append(task)
+
+ if tasks:
+ await asyncio.gather(*tasks)
+
+ async def crawl_async(self, start_url: str) -> Dict[str, Any]:
+ """
+ Start an asynchronous crawl from the given URL.
+
+ Args:
+ start_url: The URL to start crawling from
+
+ Returns:
+ Dictionary with crawling statistics
+ """
+ self.logger.info(f"Starting crawl from {start_url}")
+
+ # Reset state
+ self.visited_urls.clear()
+ self.queue.clear()
+ self.stats["pages_crawled"] = 0
+ self.stats["pages_skipped"] = 0
+ self.stats["start_time"] = time.time()
+
+ # Parse base domain from start URL
+ parsed_start_url = urlparse(start_url)
+ base_domain = parsed_start_url.netloc.lower()
+
+ # Use sitemap for URL discovery if enabled
+ initial_urls = set([start_url])
+ sitemap_urls = set()
+
+ if self.use_sitemap and self.sitemap_parser:
+ self.logger.info(f"Fetching sitemap for {start_url}")
+ sitemap_urls = self.sitemap_parser.get_urls_from_domain(start_url)
+
+ # Filter URLs by domain restrictions
+ filtered_sitemap_urls = {
+ url for url in sitemap_urls
+ if self._is_allowed_domain(url, base_domain)
+ }
+
+ if filtered_sitemap_urls:
+ self.logger.info(f"Found {len(filtered_sitemap_urls)} URLs from sitemap")
+ initial_urls.update(filtered_sitemap_urls)
+ self.stats["sitemap_urls_found"] = len(sitemap_urls)
+ self.stats["sitemap_urls_used"] = len(filtered_sitemap_urls)
+
+ # Start crawling from all initial URLs
+ tasks = []
+ for url in initial_urls:
+ task = asyncio.create_task(self._crawl_recursive(url, 1, base_domain))
+ tasks.append(task)
+
+ if tasks:
+ await asyncio.gather(*tasks)
+
+ # Update stats
+ self.stats["end_time"] = time.time()
+ self.stats["duration"] = self.stats["end_time"] - self.stats["start_time"]
+ self.stats["total_urls"] = len(self.visited_urls)
+
+ self.logger.info(f"Crawl completed. Visited {self.stats['total_urls']} URLs in {self.stats['duration']:.2f} seconds")
+
+ return self.stats
+
+ def crawl(self, start_url: str) -> Dict[str, Any]:
+ """
+ Start a synchronous crawl from the given URL.
+
+ Args:
+ start_url: The URL to start crawling from
+
+ Returns:
+ Dictionary with crawling statistics
+ """
+ return asyncio.run(self.crawl_async(start_url))
+
+ def close(self) -> None:
+ """Clean up resources used by the crawler."""
+ self.request_handler.close()
+ self.cache.close()
\ No newline at end of file
diff --git a/scraper/robots_parser.py b/scraper/robots_parser.py
new file mode 100644
index 0000000..bd70f03
--- /dev/null
+++ b/scraper/robots_parser.py
@@ -0,0 +1,134 @@
+import logging
+from urllib.parse import urlparse
+import requests
+from robotexclusionrulesparser import RobotExclusionRulesParser
+
+
+class RobotsParser:
+ """
+ Parser for robots.txt files to check if a URL can be crawled.
+
+ This class fetches and parses robots.txt files for domains, and provides
+ methods to check if a given URL is allowed to be crawled based on the
+ rules defined in the robots.txt file.
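+
+    Example (sketch; the user agent and URL are illustrative):
+
+        parser = RobotsParser("ScraperBot")
+        if parser.is_allowed("https://example.com/some/page.html"):
+            # Honour any Crawl-delay directive before fetching the page
+            delay = parser.get_crawl_delay("https://example.com/some/page.html")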
+ """
+
+ def __init__(self, user_agent: str):
+ """
+ Initialize the RobotsParser.
+
+ Args:
+ user_agent: The user agent string to use for fetching robots.txt
+ and for checking permissions
+ """
+ self.user_agent = user_agent
+ self.logger = logging.getLogger(__name__)
+ self.parsers = {} # Cache of parsed robots.txt files keyed by domain
+ self.fetched_domains = set() # Set of domains for which robots.txt has been fetched
+ self.default_crawl_delay = 0 # Default crawl delay (seconds)
+
+ def get_robots_url(self, url: str) -> str:
+ """
+ Get the URL of the robots.txt file for a given URL.
+
+ Args:
+ url: The URL to get the robots.txt URL for
+
+ Returns:
+ URL to the robots.txt file
+ """
+ parsed_url = urlparse(url)
+ return f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
+
+ def fetch_robots_txt(self, domain_url: str) -> bool:
+ """
+ Fetch and parse the robots.txt file for a domain.
+
+ Args:
+ domain_url: URL of the website (not the robots.txt file)
+
+ Returns:
+ True if robots.txt was successfully fetched and parsed, False otherwise
+ """
+ parsed_url = urlparse(domain_url)
+ domain = parsed_url.netloc
+
+ # Skip if already fetched
+ if domain in self.fetched_domains:
+ return domain in self.parsers
+
+ self.fetched_domains.add(domain)
+ robots_url = self.get_robots_url(domain_url)
+
+ try:
+ response = requests.get(robots_url, timeout=10)
+
+ if response.status_code == 200:
+ parser = RobotExclusionRulesParser()
+ parser.parse(response.text)
+ self.parsers[domain] = parser
+ self.logger.info(f"Successfully parsed robots.txt for {domain}")
+ return True
+ elif response.status_code == 404:
+ # No robots.txt file, assume everything is allowed
+ self.logger.info(f"No robots.txt found for {domain} (404)")
+ parser = RobotExclusionRulesParser()
+ parser.parse("") # Empty robots.txt means everything is allowed
+ self.parsers[domain] = parser
+ return True
+ else:
+ self.logger.warning(f"Failed to fetch robots.txt for {domain}: HTTP {response.status_code}")
+ return False
+ except Exception as e:
+ self.logger.error(f"Error fetching robots.txt for {domain}: {str(e)}")
+ return False
+
+ def is_allowed(self, url: str) -> bool:
+ """
+ Check if a URL is allowed to be crawled.
+
+ Args:
+ url: The URL to check
+
+ Returns:
+ True if the URL is allowed to be crawled, False otherwise
+ """
+ parsed_url = urlparse(url)
+ domain = parsed_url.netloc
+
+ # Fetch robots.txt if not already fetched
+ if domain not in self.parsers and not self.fetch_robots_txt(url):
+ # If fetch fails, assume allowed (permissive default)
+ self.logger.warning(f"Assuming URL is allowed due to robots.txt fetch failure: {url}")
+ return True
+
+ # Get the parser for this domain
+ if domain in self.parsers:
+ return self.parsers[domain].is_allowed(self.user_agent, url)
+
+ # Default permissive case
+ return True
+
+ def get_crawl_delay(self, url: str) -> float:
+ """
+ Get the crawl delay specified in robots.txt.
+
+ Args:
+ url: The URL to check
+
+ Returns:
+ Crawl delay in seconds, or the default if not specified
+ """
+ parsed_url = urlparse(url)
+ domain = parsed_url.netloc
+
+ # Fetch robots.txt if not already fetched
+ if domain not in self.parsers and not self.fetch_robots_txt(url):
+ return self.default_crawl_delay
+
+ # Get the parser for this domain
+ if domain in self.parsers:
+ delay = self.parsers[domain].get_crawl_delay(self.user_agent)
+ return delay if delay is not None else self.default_crawl_delay
+
+ return self.default_crawl_delay
\ No newline at end of file
diff --git a/scraper/sitemap_parser.py b/scraper/sitemap_parser.py
new file mode 100644
index 0000000..85b4af8
--- /dev/null
+++ b/scraper/sitemap_parser.py
@@ -0,0 +1,308 @@
+import logging
+import asyncio
+import time
+from typing import List, Dict, Any, Optional, Set
+from urllib.parse import urlparse, urljoin
+import requests
+from bs4 import BeautifulSoup
+
+
+class SitemapParser:
+ """
+ Parser for XML sitemaps to extract URLs for crawling.
+
+ This class fetches and parses XML sitemaps, including sitemap indexes,
+ and provides methods to extract URLs and their metadata for crawling.
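+
+    Example (sketch; the domain is illustrative):
+
+        parser = SitemapParser("ScraperBot", max_subsitemaps=3)
+        urls = parser.get_urls_from_domain("https://example.com")
+        print(f"Discovered {len(urls)} URLs via the sitemap")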
+ """
+
+ def __init__(self, user_agent: str, max_subsitemaps: int = 5, overall_timeout: int = 30):
+ """
+ Initialize the SitemapParser.
+
+ Args:
+ user_agent: The user agent string to use for fetching sitemaps
+ max_subsitemaps: Maximum number of sub-sitemaps to process from an index (default: 5)
+ overall_timeout: Maximum time in seconds for the entire sitemap processing (default: 30)
+ """
+ self.user_agent = user_agent
+ self.logger = logging.getLogger(__name__)
+ self.headers = {'User-Agent': user_agent}
+ self.max_subsitemaps = max_subsitemaps
+ self.overall_timeout = overall_timeout
+
+ def get_sitemap_url(self, url: str) -> str:
+ """
+ Get the URL of the sitemap.xml file for a given URL.
+
+ Args:
+ url: The URL to get the sitemap URL for
+
+ Returns:
+ URL to the sitemap.xml file
+ """
+ parsed_url = urlparse(url)
+ return f"{parsed_url.scheme}://{parsed_url.netloc}/sitemap.xml"
+
+ def fetch_sitemap(self, sitemap_url: str) -> Optional[str]:
+ """
+ Fetch a sitemap from the given URL.
+
+ Args:
+ sitemap_url: URL of the sitemap
+
+ Returns:
+ The content of the sitemap, or None if it couldn't be fetched
+ """
+ try:
+ response = requests.get(sitemap_url, headers=self.headers, timeout=10)
+
+ if response.status_code == 200:
+ self.logger.info(f"Successfully fetched sitemap from {sitemap_url}")
+ return response.text
+ else:
+ self.logger.warning(f"Failed to fetch sitemap from {sitemap_url}: HTTP {response.status_code}")
+ return None
+ except Exception as e:
+ self.logger.error(f"Error fetching sitemap from {sitemap_url}: {str(e)}")
+ return None
+
+ async def fetch_sitemap_async(self, sitemap_url: str) -> Optional[str]:
+ """
+ Fetch a sitemap asynchronously from the given URL.
+
+ Args:
+ sitemap_url: URL of the sitemap
+
+ Returns:
+ The content of the sitemap, or None if it couldn't be fetched
+ """
+ try:
+ # Use synchronous requests library with a separate thread
+ # to avoid adding aiohttp as a dependency
+ loop = asyncio.get_event_loop()
+ content = await loop.run_in_executor(
+ None, lambda: self.fetch_sitemap(sitemap_url)
+ )
+ return content
+ except Exception as e:
+ self.logger.error(f"Error fetching sitemap asynchronously from {sitemap_url}: {str(e)}")
+ return None
+
+ def is_sitemap_index(self, content: str) -> bool:
+ """
+ Check if the given content is a sitemap index.
+
+ Args:
+ content: The content of the sitemap
+
+ Returns:
+ True if the content is a sitemap index, False otherwise
+ """
+ try:
+ soup = BeautifulSoup(content, 'lxml-xml')
+ return soup.find('sitemapindex') is not None
+ except Exception as e:
+ self.logger.error(f"Error checking if content is sitemap index: {str(e)}")
+ return False
+
+ def parse_sitemap_index(self, content: str, base_url: str) -> List[str]:
+ """
+ Parse a sitemap index and return the URLs of the sitemaps it contains.
+
+ Args:
+ content: The content of the sitemap index
+ base_url: The base URL to resolve relative URLs
+
+ Returns:
+ List of sitemap URLs
+ """
+ try:
+ soup = BeautifulSoup(content, 'lxml-xml')
+ sitemap_tags = soup.find_all('sitemap')
+ sitemap_urls = []
+
+ for sitemap in sitemap_tags:
+ loc = sitemap.find('loc')
+ if loc and loc.text:
+ # Make sure the URL is absolute
+ url = urljoin(base_url, loc.text.strip())
+ sitemap_urls.append(url)
+
+ self.logger.info(f"Found {len(sitemap_urls)} sitemaps in sitemap index")
+ # Limit the number of sub-sitemaps to process
+ limited_urls = sitemap_urls[:self.max_subsitemaps]
+ if len(sitemap_urls) > self.max_subsitemaps:
+ self.logger.info(f"Limiting to {self.max_subsitemaps} sub-sitemaps out of {len(sitemap_urls)}")
+
+ return limited_urls
+ except Exception as e:
+ self.logger.error(f"Error parsing sitemap index: {str(e)}")
+ return []
+
+ def parse_sitemap(self, content: str, base_url: str) -> List[Dict[str, Any]]:
+ """
+ Parse a sitemap and return the URLs it contains with metadata.
+
+ Args:
+ content: The content of the sitemap
+ base_url: The base URL to resolve relative URLs
+
+ Returns:
+ List of dictionaries containing URL and metadata
+ """
+ try:
+ soup = BeautifulSoup(content, 'lxml-xml')
+ url_tags = soup.find_all('url')
+ urls = []
+
+ for url in url_tags:
+ loc = url.find('loc')
+ if loc and loc.text:
+ # Make sure the URL is absolute
+ url_str = urljoin(base_url, loc.text.strip())
+
+ # Extract metadata
+ lastmod = url.find('lastmod')
+ changefreq = url.find('changefreq')
+ priority = url.find('priority')
+
+ url_data = {
+ 'url': url_str,
+ 'lastmod': lastmod.text.strip() if lastmod else None,
+ 'changefreq': changefreq.text.strip() if changefreq else None,
+ 'priority': float(priority.text.strip()) if priority else None
+ }
+
+ urls.append(url_data)
+
+ self.logger.info(f"Found {len(urls)} URLs in sitemap")
+ return urls
+ except Exception as e:
+ self.logger.error(f"Error parsing sitemap: {str(e)}")
+ return []
+
+ async def process_sitemap(self, sitemap_url: str, base_url: str) -> Set[str]:
+ """
+ Process a single sitemap and extract URLs.
+
+ Args:
+ sitemap_url: URL of the sitemap
+ base_url: Base URL for resolving relative URLs
+
+ Returns:
+ Set of URLs found in the sitemap
+ """
+ urls = set()
+ content = await self.fetch_sitemap_async(sitemap_url)
+ if content:
+ url_data_list = self.parse_sitemap(content, base_url)
+ for url_data in url_data_list:
+ urls.add(url_data['url'])
+ return urls
+
+ async def extract_urls_from_sitemap_async(self, sitemap_url: str) -> Set[str]:
+ """
+ Extract all URLs from a sitemap or sitemap index asynchronously.
+
+ Args:
+ sitemap_url: The URL of the sitemap or sitemap index
+
+ Returns:
+ Set of URLs found in the sitemap(s)
+ """
+ start_time = time.time()
+ urls = set()
+ base_url = f"{urlparse(sitemap_url).scheme}://{urlparse(sitemap_url).netloc}"
+
+ # Fetch the initial sitemap
+ content = await self.fetch_sitemap_async(sitemap_url)
+ if not content:
+ return urls
+
+ # If we've exceeded the timeout, return what we have
+ if time.time() - start_time > self.overall_timeout:
+ self.logger.warning(f"Timeout exceeded while processing sitemap: {sitemap_url}")
+ return urls
+
+ # Check if it's a sitemap index
+ if self.is_sitemap_index(content):
+ # Parse the sitemap index to get the URLs of the sitemaps
+ sitemap_urls = self.parse_sitemap_index(content, base_url)
+
+ # Process each sitemap concurrently
+ tasks = []
+ for url in sitemap_urls:
+ # Check timeout before starting a new task
+ if time.time() - start_time > self.overall_timeout:
+ self.logger.warning(f"Timeout exceeded while processing sub-sitemaps")
+ break
+ tasks.append(self.process_sitemap(url, base_url))
+
+ if tasks:
+ # Wait for all tasks to complete or timeout
+ try:
+ results = await asyncio.gather(*tasks)
+ for result in results:
+ urls.update(result)
+ except asyncio.TimeoutError:
+ self.logger.warning("Timeout while processing sub-sitemaps")
+ else:
+ # It's a regular sitemap, parse it directly
+ url_data_list = self.parse_sitemap(content, base_url)
+ for url_data in url_data_list:
+ urls.add(url_data['url'])
+
+ self.logger.info(f"Extracted {len(urls)} URLs from sitemap(s) in {time.time() - start_time:.2f} seconds")
+ return urls
+
+ def extract_urls_from_sitemap(self, sitemap_url: str) -> Set[str]:
+ """
+ Extract all URLs from a sitemap or sitemap index.
+
+ Args:
+ sitemap_url: The URL of the sitemap or sitemap index
+
+ Returns:
+ Set of URLs found in the sitemap(s)
+ """
+ try:
+ # Run the async method in an event loop with a timeout
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+
+ task = self.extract_urls_from_sitemap_async(sitemap_url)
+ future = asyncio.ensure_future(task, loop=loop)
+
+ # Add overall timeout
+ try:
+ urls = loop.run_until_complete(
+ asyncio.wait_for(future, timeout=self.overall_timeout)
+ )
+ except asyncio.TimeoutError:
+ self.logger.warning(f"Global timeout reached while processing sitemap: {sitemap_url}")
+ # Return any URLs we collected before timeout
+ urls = set()
+ if future.done():
+ urls = future.result()
+ else:
+ future.cancel()
+ finally:
+ loop.close()
+
+ return urls
+ except Exception as e:
+ self.logger.error(f"Error extracting URLs from sitemap: {str(e)}")
+ return set()
+
+ def get_urls_from_domain(self, domain_url: str) -> Set[str]:
+ """
+ Get all URLs from a domain's sitemap.
+
+ Args:
+ domain_url: The URL of the domain (not the sitemap)
+
+ Returns:
+ Set of URLs found in the domain's sitemap(s)
+ """
+ sitemap_url = self.get_sitemap_url(domain_url)
+ return self.extract_urls_from_sitemap(sitemap_url)
\ No newline at end of file
diff --git a/tests/test_cache.py b/tests/test_cache.py
index a7076db..074a0b8 100644
--- a/tests/test_cache.py
+++ b/tests/test_cache.py
@@ -106,40 +106,60 @@ def test_clear_expired(self):
separate_temp_dir = tempfile.mkdtemp(prefix="isolated_cache_test_")
try:
- # Create an isolated cache with its own directory
- isolated_cache = Cache(use_persistent=True, cache_dir=separate_temp_dir, expiry_time=1)
+ # Create an isolated cache with a 10 second expiry time
+ isolated_cache = Cache(use_persistent=True, cache_dir=separate_temp_dir, expiry_time=10)
# Ensure we start with a clean slate
isolated_cache.clear()
- # Verify we're starting with a clean state by trying to get a known URL
- self.assertIsNone(isolated_cache.get("https://will-expire.com"))
- self.assertIsNone(isolated_cache.get("https://wont-expire.com"))
+ # Get current time
+ current_time = time.time()
- # Add a single entry that will expire
+ # Add an expired entry (20 seconds old)
isolated_cache.set("https://will-expire.com", "old content", 200, {})
-
- # Verify it was added
- self.assertTrue(isolated_cache.has("https://will-expire.com"))
-
- # Wait for it to expire
- time.sleep(1.5)
+ # Manually update the timestamp to make it expired
+ isolated_cache.memory_cache["https://will-expire.com"]["timestamp"] = current_time - 20
+ if isolated_cache.use_persistent and isolated_cache.conn:
+ cursor = isolated_cache.conn.cursor()
+ cursor.execute(
+ "UPDATE cache SET timestamp = ? WHERE url = ?",
+ (int(current_time - 20), "https://will-expire.com")
+ )
+ isolated_cache.conn.commit()
# Add a fresh entry
isolated_cache.set("https://wont-expire.com", "new content", 200, {})
+ # Verify entries exist in memory cache
+ self.assertIn("https://will-expire.com", isolated_cache.memory_cache)
+ self.assertIn("https://wont-expire.com", isolated_cache.memory_cache)
+
+ # Verify entries exist in persistent cache
+ if isolated_cache.use_persistent and isolated_cache.conn:
+ cursor = isolated_cache.conn.cursor()
+ cursor.execute("SELECT url FROM cache WHERE url = ?", ("https://will-expire.com",))
+ self.assertIsNotNone(cursor.fetchone())
+ cursor.execute("SELECT url FROM cache WHERE url = ?", ("https://wont-expire.com",))
+ self.assertIsNotNone(cursor.fetchone())
+
# Clear expired entries and check count
cleared = isolated_cache.clear_expired()
# Should only clear the expired entry
self.assertEqual(cleared, 1, f"Expected to clear 1 expired entry, but cleared {cleared}")
- self.assertFalse(isolated_cache.has("https://will-expire.com"))
- self.assertTrue(isolated_cache.has("https://wont-expire.com"))
+ # Verify the expired entry is gone and the fresh one remains
+ self.assertNotIn("https://will-expire.com", isolated_cache.memory_cache)
+ self.assertIn("https://wont-expire.com", isolated_cache.memory_cache)
+
+ # Verify persistent cache state
+ if isolated_cache.use_persistent and isolated_cache.conn:
+ cursor = isolated_cache.conn.cursor()
+ cursor.execute("SELECT url FROM cache WHERE url = ?", ("https://will-expire.com",))
+ self.assertIsNone(cursor.fetchone())
+ cursor.execute("SELECT url FROM cache WHERE url = ?", ("https://wont-expire.com",))
+ self.assertIsNotNone(cursor.fetchone())
finally:
-            # Ensure we clean up properly
-            if 'isolated_cache' in locals():
-                isolated_cache.close()
+            # Close the cache before removing its directory
+            isolated_cache.close()
# Clean up the temporary directory
shutil.rmtree(separate_temp_dir)
diff --git a/tests/test_crawler.py b/tests/test_crawler.py
new file mode 100644
index 0000000..6f0eb70
--- /dev/null
+++ b/tests/test_crawler.py
@@ -0,0 +1,304 @@
+import unittest
+from unittest.mock import Mock, patch, MagicMock, AsyncMock
+import asyncio
+from urllib.parse import urlparse
+
+from scraper.crawler import Crawler
+from scraper.request_handler import RequestHandler
+from scraper.response_parser import ResponseParser
+from scraper.cache_manager import Cache
+
+
+def async_run(coro):
+ """Helper function to run coroutines in tests with a fresh event loop."""
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ try:
+ return loop.run_until_complete(coro)
+ finally:
+ loop.close()
+ asyncio.set_event_loop(None)
+
+
+class TestCrawler(unittest.TestCase):
+ """Tests for the Crawler class."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ self.crawler = Crawler(
+ max_depth=2,
+ concurrency_limit=5,
+ use_cache=False,
+ request_delay=0
+ )
+
+ def tearDown(self):
+ """Clean up after tests."""
+ self.crawler.close()
+
+ def test_is_allowed_domain_same_domain(self):
+ """Test that same domain is always allowed."""
+ base_domain = "example.com"
+ url = "https://example.com/page"
+
+ result = self.crawler._is_allowed_domain(url, base_domain)
+
+ self.assertTrue(result)
+
+ def test_is_allowed_domain_subdomain_allowed(self):
+ """Test that subdomains are allowed when configured."""
+ base_domain = "example.com"
+ url = "https://sub.example.com/page"
+ self.crawler.allow_subdomains = True
+
+ result = self.crawler._is_allowed_domain(url, base_domain)
+
+ self.assertTrue(result)
+
+ def test_is_allowed_domain_subdomain_not_allowed(self):
+ """Test that subdomains are not allowed when configured."""
+ base_domain = "example.com"
+ url = "https://sub.example.com/page"
+ self.crawler.allow_subdomains = False
+
+ result = self.crawler._is_allowed_domain(url, base_domain)
+
+ self.assertFalse(result)
+
+ def test_is_allowed_domain_external_allowed(self):
+ """Test that external domains are allowed when configured."""
+ base_domain = "example.com"
+ url = "https://another-site.com/page"
+ self.crawler.allow_external_domains = True
+
+ result = self.crawler._is_allowed_domain(url, base_domain)
+
+ self.assertTrue(result)
+
+ def test_is_allowed_domain_external_not_allowed(self):
+ """Test that external domains are not allowed when configured."""
+ base_domain = "example.com"
+ url = "https://another-site.com/page"
+ self.crawler.allow_external_domains = False
+
+ result = self.crawler._is_allowed_domain(url, base_domain)
+
+ self.assertFalse(result)
+
+ @patch.object(Cache, 'get')
+ @patch.object(Cache, 'set')
+ @patch.object(RequestHandler, 'get')
+ @patch.object(ResponseParser, 'extract_links')
+ @patch.object(ResponseParser, 'extract_page_title')
+ @patch.object(ResponseParser, 'extract_metadata')
+ def test_crawl_url_uncached(self, mock_extract_metadata, mock_extract_title,
+ mock_extract_links, mock_request_get, mock_cache_set,
+ mock_cache_get):
+ """Test crawling a URL that's not in the cache."""
+ url = "https://example.com"
+ depth = 1
+ base_domain = "example.com"
+
+ # Configure mocks
+ mock_cache_get.return_value = None
+ mock_request_get.return_value = ("HTML content", 200, {})
+ mock_extract_links.return_value = {"https://example.com/page1", "https://example.com/page2"}
+ mock_extract_title.return_value = "Example Page"
+ mock_extract_metadata.return_value = {"description": "An example page"}
+
+ callback_mock = Mock()
+ self.crawler.on_page_crawled = callback_mock
+
+ # Call the method under test and await the result
+ result = async_run(self.crawler._crawl_url(url, depth, base_domain))
+
+ # Verify interactions
+ mock_cache_get.assert_called_once_with(url)
+ mock_request_get.assert_called_once_with(url)
+ mock_cache_set.assert_called_once_with(url, "HTML content", 200, {})
+ mock_extract_links.assert_called_once()
+ mock_extract_title.assert_called_once()
+ mock_extract_metadata.assert_called_once()
+
+ # Verify results
+ self.assertEqual(result, {"https://example.com/page1", "https://example.com/page2"})
+ self.assertEqual(self.crawler.stats["pages_crawled"], 1)
+ self.assertEqual(self.crawler.stats["pages_skipped"], 0)
+
+ # Verify callback
+ callback_mock.assert_called_once()
+ args, kwargs = callback_mock.call_args
+ self.assertEqual(args[0], url)
+ self.assertEqual(args[1]["url"], url)
+ self.assertEqual(args[1]["depth"], depth)
+
+ @patch.object(Cache, 'get')
+ @patch.object(Cache, 'set')
+ @patch.object(RequestHandler, 'get')
+ @patch.object(ResponseParser, 'extract_links')
+ @patch.object(ResponseParser, 'extract_page_title')
+ @patch.object(ResponseParser, 'extract_metadata')
+ def test_crawl_url_cached(self, mock_extract_metadata, mock_extract_title,
+ mock_extract_links, mock_request_get, mock_cache_set,
+ mock_cache_get):
+ """Test crawling a URL that's in the cache."""
+ url = "https://example.com"
+ depth = 1
+ base_domain = "example.com"
+
+ # Configure mocks
+ mock_cache_get.return_value = ("Cached HTML content", 200, {})
+ mock_extract_links.return_value = {"https://example.com/page1", "https://example.com/page2"}
+ mock_extract_title.return_value = "Example Page"
+ mock_extract_metadata.return_value = {"description": "An example page"}
+
+ # Call the method under test
+ result = async_run(self.crawler._crawl_url(url, depth, base_domain))
+
+ # Verify interactions
+ mock_cache_get.assert_called_once_with(url)
+ mock_request_get.assert_not_called()
+ mock_cache_set.assert_not_called()
+ mock_extract_links.assert_called_once()
+
+ # Verify results
+ self.assertEqual(result, {"https://example.com/page1", "https://example.com/page2"})
+ self.assertEqual(self.crawler.stats["pages_crawled"], 1)
+ self.assertEqual(self.crawler.stats["pages_skipped"], 1)
+
+ @patch.object(Cache, 'get')
+ @patch.object(RequestHandler, 'get')
+ def test_crawl_url_already_visited(self, mock_request_get, mock_cache_get):
+ """Test that already visited URLs are skipped."""
+ url = "https://example.com"
+ depth = 1
+ base_domain = "example.com"
+
+ # Mark URL as already visited
+ self.crawler.visited_urls.add(url)
+
+ # Call the method under test
+ result = async_run(self.crawler._crawl_url(url, depth, base_domain))
+
+ # Verify interactions
+ mock_cache_get.assert_not_called()
+ mock_request_get.assert_not_called()
+
+ # Verify results
+ self.assertEqual(result, set())
+
+ @patch.object(RequestHandler, 'get')
+ def test_crawl_url_request_failed(self, mock_request_get):
+ """Test handling of failed requests."""
+ url = "https://example.com"
+ depth = 1
+ base_domain = "example.com"
+
+ # Configure mock
+ mock_request_get.return_value = (None, 404, {})
+
+ # Call the method under test
+ result = async_run(self.crawler._crawl_url(url, depth, base_domain))
+
+ # Verify results
+ self.assertEqual(result, set())
+ self.assertEqual(self.crawler.stats["pages_crawled"], 0)
+
+ @patch.object(Crawler, '_crawl_url')
+ def test_crawl_recursive_max_depth(self, mock_crawl_url):
+ """Test that crawling stops at max_depth."""
+ url = "https://example.com"
+ depth = 3 # > max_depth (2)
+ base_domain = "example.com"
+
+ # Call the method under test
+ async_run(self.crawler._crawl_recursive(url, depth, base_domain))
+
+ # Verify that _crawl_url is not called
+ mock_crawl_url.assert_not_called()
+
+ def test_crawl_recursive_no_new_links(self):
+ """Test recursive crawling when no new links are found."""
+ url = "https://example.com"
+ depth = 1
+ base_domain = "example.com"
+
+ # Mock _crawl_url to return empty set
+ with patch.object(self.crawler, '_crawl_url') as mock_crawl_url:
+ mock_crawl_url.return_value = set()
+
+ # Call the method under test
+ async_run(self.crawler._crawl_recursive(url, depth, base_domain))
+
+ # Verify interactions
+ mock_crawl_url.assert_called_once_with(url, depth, base_domain)
+
+ def test_crawl_recursive_with_new_links(self):
+ """Test recursive crawling with new links."""
+ url = "https://example.com"
+ depth = 1
+ base_domain = "example.com"
+
+ # Create a new crawler instance for this test to avoid interference
+ crawler = Crawler(max_depth=2, concurrency_limit=5, use_cache=False, request_delay=0)
+
+ try:
+ # Mock _crawl_url directly on the instance
+ crawler._crawl_url = AsyncMock(return_value={"https://example.com/page1", "https://example.com/page2"})
+
+ # Also mock _crawl_recursive to prevent actual recursion
+ original_recursive = crawler._crawl_recursive
+ recursive_mock = AsyncMock()
+ crawler._crawl_recursive = recursive_mock
+
+ # Run the test
+ async_run(original_recursive(url, depth, base_domain))
+
+ # Verify _crawl_url was called
+ crawler._crawl_url.assert_called_once_with(url, depth, base_domain)
+
+ # Verify recursive calls
+ self.assertEqual(recursive_mock.call_count, 2)
+ recursive_mock.assert_any_call("https://example.com/page1", depth + 1, base_domain)
+ recursive_mock.assert_any_call("https://example.com/page2", depth + 1, base_domain)
+ finally:
+ crawler.close()
+
+ @patch.object(Crawler, '_crawl_recursive')
+ def test_crawl_async(self, mock_crawl_recursive):
+ """Test the asynchronous crawling entry point."""
+ start_url = "https://example.com"
+
+ # Configure mock
+ mock_crawl_recursive.return_value = None
+
+ # Call the method under test
+ result = async_run(self.crawler.crawl_async(start_url))
+
+ # Verify _crawl_recursive was called with correct parameters
+ mock_crawl_recursive.assert_called_once_with(start_url, 1, "example.com")
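+ # crawl_async starts at depth 1 and derives the base domain from the start URL.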
+
+ # Verify stats in result
+ self.assertIn("pages_crawled", result)
+ self.assertIn("pages_skipped", result)
+ self.assertIn("duration", result)
+ self.assertIn("total_urls", result)
+
+ @patch.object(Crawler, 'crawl_async')
+ def test_crawl(self, mock_crawl_async):
+ """Test the synchronous crawling entry point."""
+ start_url = "https://example.com"
+ expected_result = {"pages_crawled": 5}
+
+ # Configure mock
+ mock_crawl_async.return_value = expected_result
+
+ # Call the method under test
+ result = self.crawler.crawl(start_url)
+
+ # Verify result
+ self.assertEqual(result, expected_result)
+
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
diff --git a/tests/test_robots_parser.py b/tests/test_robots_parser.py
new file mode 100644
index 0000000..909a69d
--- /dev/null
+++ b/tests/test_robots_parser.py
@@ -0,0 +1,139 @@
+import unittest
+from unittest.mock import patch, MagicMock
+
+from scraper.robots_parser import RobotsParser
+
+
+class TestRobotsParser(unittest.TestCase):
+ """Test cases for the RobotsParser class."""
+
+ def setUp(self):
+ """Set up test environment."""
+ self.user_agent = "TestBot"
+ self.parser = RobotsParser(self.user_agent)
+
+ @patch('requests.get')
+ def test_fetch_robots_txt_success(self, mock_get):
+ """Test successful fetching of robots.txt."""
+ # Mock response
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.text = """
+ User-agent: *
+ Disallow: /private/
+ Allow: /
+
+ User-agent: TestBot
+ Disallow: /test-private/
+ Allow: /
+
+ Crawl-delay: 5
+ """
+ mock_get.return_value = mock_response
+
+ # Call the method
+ result = self.parser.fetch_robots_txt("https://example.com")
+
+ # Verify results
+ self.assertTrue(result)
+ mock_get.assert_called_once_with("https://example.com/robots.txt", timeout=10)
+
+ # Verify the parser was created and domain added to cache
+ self.assertIn("example.com", self.parser.parsers)
+ self.assertIn("example.com", self.parser.fetched_domains)
+
+ @patch('requests.get')
+ def test_fetch_robots_txt_404(self, mock_get):
+ """Test fetching when robots.txt doesn't exist."""
+ # Mock response
+ mock_response = MagicMock()
+ mock_response.status_code = 404
+ mock_get.return_value = mock_response
+
+ # Call the method
+ result = self.parser.fetch_robots_txt("https://example.com")
+
+ # Verify results
+ self.assertTrue(result) # Should still return True for successful operation
+ self.assertIn("example.com", self.parser.parsers)
+
+ # The empty parser should allow everything
+ self.assertTrue(self.parser.is_allowed("https://example.com/anything"))
+
+ @patch('requests.get')
+ def test_fetch_robots_txt_error(self, mock_get):
+ """Test error handling when fetching robots.txt."""
+ # Make the request raise an exception
+ mock_get.side_effect = Exception("Network error")
+
+ # Call the method
+ result = self.parser.fetch_robots_txt("https://example.com")
+
+ # Verify results
+ self.assertFalse(result)
+ self.assertNotIn("example.com", self.parser.parsers)
+ self.assertIn("example.com", self.parser.fetched_domains)
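+ # The domain is still recorded as fetched, so a failed lookup is assumed not to be retried on every URL check.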
+
+ @patch.object(RobotsParser, 'fetch_robots_txt')
+ def test_is_allowed(self, mock_fetch):
+ """Test checking if a URL is allowed."""
+ # Setup mock parser
+ mock_parser = MagicMock()
+ mock_parser.is_allowed.return_value = False
+ self.parser.parsers["example.com"] = mock_parser
+
+ # Call the method
+ result = self.parser.is_allowed("https://example.com/private")
+
+ # Verify results
+ self.assertFalse(result)
+ mock_fetch.assert_not_called() # Should not fetch since already in parsers
+ mock_parser.is_allowed.assert_called_once_with(self.user_agent, "https://example.com/private")
+
+ @patch.object(RobotsParser, 'fetch_robots_txt')
+ def test_is_allowed_fetch_failure(self, mock_fetch):
+ """Test that URLs are allowed when robots.txt fetch fails."""
+ # Setup mock to return False (fetch failure)
+ mock_fetch.return_value = False
+
+ # Call the method
+ result = self.parser.is_allowed("https://example.com/something")
+
+ # Verify results
+ self.assertTrue(result) # Should allow when fetch fails
+ mock_fetch.assert_called_once_with("https://example.com/something")
+
+ @patch.object(RobotsParser, 'fetch_robots_txt')
+ def test_get_crawl_delay(self, mock_fetch):
+ """Test getting crawl delay from robots.txt."""
+ # Setup mock parser
+ mock_parser = MagicMock()
+ mock_parser.get_crawl_delay.return_value = 3.5
+ self.parser.parsers["example.com"] = mock_parser
+
+ # Call the method
+ delay = self.parser.get_crawl_delay("https://example.com/page")
+
+ # Verify results
+ self.assertEqual(delay, 3.5)
+ mock_fetch.assert_not_called()
+ mock_parser.get_crawl_delay.assert_called_once_with(self.user_agent)
+
+ @patch.object(RobotsParser, 'fetch_robots_txt')
+ def test_get_crawl_delay_not_specified(self, mock_fetch):
+ """Test getting crawl delay when not specified in robots.txt."""
+ # Setup mock parser
+ mock_parser = MagicMock()
+ mock_parser.get_crawl_delay.return_value = None
+ self.parser.parsers["example.com"] = mock_parser
+
+ # Call the method
+ delay = self.parser.get_crawl_delay("https://example.com/page")
+
+ # Verify results
+ self.assertEqual(delay, self.parser.default_crawl_delay)
+ mock_fetch.assert_not_called()
+
+
+if __name__ == "__main__":
+ unittest.main()
\ No newline at end of file
diff --git a/tests/test_sitemap_parser.py b/tests/test_sitemap_parser.py
new file mode 100644
index 0000000..2364989
--- /dev/null
+++ b/tests/test_sitemap_parser.py
@@ -0,0 +1,203 @@
+import unittest
+from unittest.mock import patch, MagicMock
+import asyncio
+
+from scraper.sitemap_parser import SitemapParser
+
+
+class TestSitemapParser(unittest.TestCase):
+ """Test cases for the SitemapParser class."""
+
+ def setUp(self):
+ """Set up test environment."""
+ self.user_agent = "TestBot"
+ self.parser = SitemapParser(self.user_agent, max_subsitemaps=2, overall_timeout=5)
+
+ def test_get_sitemap_url(self):
+ """Test generating sitemap URL from a domain URL."""
+ test_cases = [
+ ("https://example.com", "https://example.com/sitemap.xml"),
+ ("https://example.com/page", "https://example.com/sitemap.xml"),
+ ("http://sub.example.com", "http://sub.example.com/sitemap.xml"),
+ ]
+
+ for input_url, expected_url in test_cases:
+ with self.subTest(url=input_url):
+ result = self.parser.get_sitemap_url(input_url)
+ self.assertEqual(result, expected_url)
+
+ @patch('requests.get')
+ def test_fetch_sitemap_success(self, mock_get):
+ """Test successful fetching of sitemap."""
+ # Mock response
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.text = "sitemap content"
+ mock_get.return_value = mock_response
+
+ # Call the method
+ result = self.parser.fetch_sitemap("https://example.com/sitemap.xml")
+
+ # Verify results
+ self.assertEqual(result, "sitemap content")
+ mock_get.assert_called_once_with(
+ "https://example.com/sitemap.xml",
+ headers={'User-Agent': self.user_agent},
+ timeout=10
+ )
+
+ @patch('requests.get')
+ def test_fetch_sitemap_failure(self, mock_get):
+ """Test handling of sitemap fetch failures."""
+ # Mock 404 response
+ mock_response = MagicMock()
+ mock_response.status_code = 404
+ mock_get.return_value = mock_response
+
+ # Call the method
+ result = self.parser.fetch_sitemap("https://example.com/sitemap.xml")
+
+ # Verify results
+ self.assertIsNone(result)
+
+ @patch('requests.get')
+ def test_fetch_sitemap_exception(self, mock_get):
+ """Test handling of exceptions during sitemap fetch."""
+ # Make the request raise an exception
+ mock_get.side_effect = Exception("Network error")
+
+ # Call the method
+ result = self.parser.fetch_sitemap("https://example.com/sitemap.xml")
+
+ # Verify results
+ self.assertIsNone(result)
+
+ def test_is_sitemap_index(self):
+ """Test detecting sitemap index vs regular sitemap."""
+ # Sitemap index
+ sitemap_index = """
+ <sitemapindex xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
+   <sitemap>
+     <loc>https://example.com/sitemap1.xml</loc>
+     <lastmod>2023-01-01</lastmod>
+   </sitemap>
+   <sitemap>
+     <loc>https://example.com/sitemap2.xml</loc>
+   </sitemap>
+   <sitemap>
+     <loc>/sitemap3.xml</loc>
+   </sitemap>
+ </sitemapindex>
+ """
+
+ # Regular sitemap
+ regular_sitemap = """
+ <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
+   <url>
+     <loc>https://example.com/page1</loc>
+   </url>
+   <url>
+     <loc>https://example.com/page2</loc>
+   </url>
+ </urlset>
+ """
+
+ # Non-XML content
+ non_xml = "This is not XML"
+
+ # Test cases
+ self.assertTrue(self.parser.is_sitemap_index(sitemap_index))
+ self.assertFalse(self.parser.is_sitemap_index(regular_sitemap))
+ self.assertFalse(self.parser.is_sitemap_index(non_xml))
+
+ def test_parse_sitemap_index(self):
+ """Test parsing a sitemap index."""
+ sitemap_index = """
+ <sitemapindex xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
+   <sitemap>
+     <loc>https://example.com/sitemap1.xml</loc>
+     <lastmod>2023-01-01</lastmod>
+   </sitemap>
+   <sitemap>
+     <loc>/sitemap2.xml</loc>
+   </sitemap>
+   <sitemap>
+     <loc>/sitemap3.xml</loc>
+   </sitemap>
+ </sitemapindex>
+ """
+
+ base_url = "https://example.com"
+ # Only 2 sub-sitemaps should be returned due to max_subsitemaps=2
+ expected_urls = [
+ "https://example.com/sitemap1.xml",
+ "https://example.com/sitemap2.xml"
+ ]
+
+ result = self.parser.parse_sitemap_index(sitemap_index, base_url)
+ self.assertEqual(sorted(result), sorted(expected_urls))
+
+ def test_parse_sitemap(self):
+ """Test parsing a regular sitemap."""
+ sitemap = """
+ <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
+   <url>
+     <loc>https://example.com/page1</loc>
+     <lastmod>2023-01-01</lastmod>
+     <changefreq>daily</changefreq>
+     <priority>0.8</priority>
+   </url>
+   <url>
+     <loc>/page2</loc>
+     <priority>0.5</priority>
+   </url>
+ </urlset>
+ """
+
+ base_url = "https://example.com"
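+ # Relative <loc> values such as /page2 are expected to be resolved against base_url.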
+ expected_data = [
+ {
+ 'url': 'https://example.com/page1',
+ 'lastmod': '2023-01-01',
+ 'changefreq': 'daily',
+ 'priority': 0.8
+ },
+ {
+ 'url': 'https://example.com/page2',
+ 'lastmod': None,
+ 'changefreq': None,
+ 'priority': 0.5
+ }
+ ]
+
+ result = self.parser.parse_sitemap(sitemap, base_url)
+
+ # Compare each URL data
+ for expected, actual in zip(sorted(expected_data, key=lambda x: x['url']),
+ sorted(result, key=lambda x: x['url'])):
+ self.assertEqual(expected['url'], actual['url'])
+ self.assertEqual(expected['lastmod'], actual['lastmod'])
+ self.assertEqual(expected['changefreq'], actual['changefreq'])
+ self.assertEqual(expected['priority'], actual['priority'])
+
+ @patch.object(SitemapParser, 'extract_urls_from_sitemap')
+ def test_get_urls_from_domain(self, mock_extract):
+ """Test getting URLs from a domain sitemap."""
+ # Setup mock
+ expected_urls = {'https://example.com/page1', 'https://example.com/page2'}
+ mock_extract.return_value = expected_urls
+
+ # Call the method
+ result = self.parser.get_urls_from_domain("https://example.com")
+
+ # Verify results
+ self.assertEqual(result, expected_urls)
+ mock_extract.assert_called_once_with("https://example.com/sitemap.xml")
+
+
+if __name__ == "__main__":
+ unittest.main()
\ No newline at end of file