Advanced Multi-URL Crawling with Dispatchers

Heads Up: Crawl4AI supports advanced dispatchers for parallel or throttled crawling, providing dynamic rate limiting and memory usage checks. The built-in arun_many() function uses these dispatchers to handle concurrency efficiently.

1. Introduction

When crawling many URLs:
- Basic: Use arun() in a loop (simple but less efficient)
- Better: Use arun_many(), which efficiently handles multiple URLs with proper concurrency control
- Best: Customize dispatcher behavior for your specific needs (memory management, rate limits, etc.)

Why Dispatchers?
- Adaptive: Memory-based dispatchers can pause or slow down based on system resources
- Rate-limiting: Built-in rate limiting with exponential backoff for 429/503 responses
- Real-time Monitoring: Live dashboard of ongoing tasks, memory usage, and performance
- Flexibility: Choose between memory-adaptive or semaphore-based concurrency

2. Core Components

2.1 Rate Limiter

class RateLimiter:
    def __init__(
        self,
        base_delay: Tuple[float, float] = (1.0, 3.0),  # Random delay range between requests
        max_delay: float = 60.0,                       # Maximum backoff delay
        max_retries: int = 3,                          # Retries before giving up
        rate_limit_codes: List[int] = [429, 503],      # Status codes triggering backoff
    ): ...

The RateLimiter provides:
- Random delays between requests
- Exponential backoff on rate limit responses
- Domain-specific rate limiting
- Automatic retry handling
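To make the backoff behavior concrete, here is a minimal standalone sketch of per-domain delays with exponential backoff. It is not Crawl4AI's implementation: the class name BackoffSketch, its two methods, and the "double the delay with up to 25% jitter" policy are assumptions chosen for illustration. Only the constructor parameter names mirror the signature above.

```python
import random
from typing import Dict, Tuple
from urllib.parse import urlparse


class BackoffSketch:
    """Illustrative per-domain delay tracker (hypothetical, not the library class)."""

    def __init__(
        self,
        base_delay: Tuple[float, float] = (1.0, 3.0),
        max_delay: float = 60.0,
        max_retries: int = 3,
        rate_limit_codes: Tuple[int, ...] = (429, 503),
    ):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.max_retries = max_retries
        self.rate_limit_codes = rate_limit_codes
        self.current_delay: Dict[str, float] = {}  # domain -> backoff delay in seconds
        self.fail_count: Dict[str, int] = {}       # domain -> consecutive rate-limit hits

    def next_delay(self, url: str) -> float:
        """Seconds to wait before the next request to this URL's domain."""
        domain = urlparse(url).netloc
        if domain in self.current_delay:
            return self.current_delay[domain]
        return random.uniform(*self.base_delay)

    def update(self, url: str, status_code: int) -> bool:
        """Record a response; return False once retries are exhausted."""
        domain = urlparse(url).netloc
        if status_code in self.rate_limit_codes:
            self.fail_count[domain] = self.fail_count.get(domain, 0) + 1
            if self.fail_count[domain] > self.max_retries:
                return False
            previous = self.current_delay.get(domain, self.base_delay[0])
            # Assumed policy: double the delay, add up to 25% jitter, cap at max_delay.
            jittered = previous * 2 * random.uniform(1.0, 1.25)
            self.current_delay[domain] = min(jittered, self.max_delay)
        else:
            # Any other status resets this domain to the normal random delay range.
            self.fail_count.pop(domain, None)
            self.current_delay.pop(domain, None)
        return True
```

Keying the state by domain means one slow or rate-limiting host backs off without slowing requests to other hosts.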

2.2 Crawler Monitor

The CrawlerMonitor provides real-time visibility into crawling operations:

monitor = CrawlerMonitor(
    max_visible_rows=15,           # Maximum rows in live display
    display_mode=DisplayMode.DETAILED  # DETAILED or AGGREGATED view
)

Display Modes:
1. DETAILED: Shows individual task status, memory usage, and timing
2. AGGREGATED: Displays summary statistics and overall progress

3. Available Dispatchers

3.1 MemoryAdaptiveDispatcher (Default)

Automatically manages concurrency based on system memory usage:

dispatcher = MemoryAdaptiveDispatcher(
    memory_threshold_percent=90.0,  # Pause if memory exceeds this
    check_interval=1.0,             # How often to check memory
    max_session_permit=10,          # Maximum concurrent tasks
    rate_limiter=RateLimiter(       # Optional rate limiting
        base_delay=(1.0, 2.0),
        max_delay=30.0,
        max_retries=2
    ),
    monitor=CrawlerMonitor(         # Optional monitoring
        max_visible_rows=15,
        display_mode=DisplayMode.DETAILED
    )
)
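The idea behind memory-based admission can be sketched with the standard library alone. The function below is an illustration, not the dispatcher's actual code: run_memory_gated and the injected read_memory_percent callable are hypothetical names, and the keyword arguments only borrow their names from the constructor above.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


async def run_memory_gated(
    jobs: Iterable[Callable[[], Awaitable[str]]],
    read_memory_percent: Callable[[], float],
    memory_threshold_percent: float = 90.0,
    check_interval: float = 1.0,
    max_session_permit: int = 10,
) -> List[str]:
    """Start jobs only while memory is below the threshold (illustrative sketch)."""
    permits = asyncio.Semaphore(max_session_permit)

    async def run_one(job: Callable[[], Awaitable[str]]) -> str:
        try:
            return await job()
        finally:
            permits.release()  # free the slot whether the job succeeded or failed

    tasks = []
    for job in jobs:
        # Hold new work back while the system is under memory pressure.
        while read_memory_percent() >= memory_threshold_percent:
            await asyncio.sleep(check_interval)
        await permits.acquire()  # also respect the concurrency cap
        tasks.append(asyncio.create_task(run_one(job)))
    return list(await asyncio.gather(*tasks))
```

In a real program the reader could be something like lambda: psutil.virtual_memory().percent, assuming the third-party psutil package is installed. It is injected here so the sketch stays dependency-free and testable.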

3.2 SemaphoreDispatcher

Provides simple concurrency control with a fixed limit:

dispatcher = SemaphoreDispatcher(
    max_session_permit=5,             # Fixed concurrent tasks
    rate_limiter=RateLimiter(      # Optional rate limiting
        base_delay=(0.5, 1.0),
        max_delay=10.0
    ),
    monitor=CrawlerMonitor(        # Optional monitoring
        max_visible_rows=15,
        display_mode=DisplayMode.DETAILED
    )
)
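Fixed-limit concurrency is the classic asyncio.Semaphore pattern. The sketch below shows that pattern in isolation and makes no claim about how SemaphoreDispatcher is implemented internally. fetch_stub and crawl_with_fixed_limit are hypothetical helpers, and the stub sleeps instead of performing network I/O.

```python
import asyncio
from typing import List, Tuple


async def fetch_stub(url: str) -> str:
    """Stand-in for a real crawl; sleeps briefly instead of fetching anything."""
    await asyncio.sleep(0.01)
    return f"done:{url}"


async def crawl_with_fixed_limit(urls: List[str], limit: int = 5) -> Tuple[List[str], int]:
    """Run all URLs with at most `limit` in flight; also report the observed peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def worker(url: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:  # blocks here once `limit` workers are active
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fetch_stub(url)
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(worker(u) for u in urls))
    return list(results), peak
```

Unlike the memory-adaptive approach, the cap never changes at runtime, which makes behavior easy to predict but leaves it to you to pick a limit the machine can sustain.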

4. Usage Examples

4.1 Batch Processing (Default)

async def crawl_batch(urls):
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        stream=False  # Default: get all results at once
    )

    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=70.0,
        check_interval=1.0,
        max_session_permit=10,
        monitor=CrawlerMonitor(
            display_mode=DisplayMode.DETAILED
        )
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Get all results at once
        results = await crawler.arun_many(
            urls=urls,
            config=run_config,
            dispatcher=dispatcher
        )

        # Process all results after completion
        for result in results:
            if result.success:
                await process_result(result)
            else:
                print(f"Failed to crawl {result.url}: {result.error_message}")

4.2 Streaming Mode

async def crawl_streaming(urls):
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        stream=True  # Enable streaming mode
    )

    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=70.0,
        check_interval=1.0,
        max_session_permit=10,
        monitor=CrawlerMonitor(
            display_mode=DisplayMode.DETAILED
        )
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Process results as they become available
        async for result in await crawler.arun_many(
            urls=urls,
            config=run_config,
            dispatcher=dispatcher
        ):
            if result.success:
                # Process each result immediately
                await process_result(result)
            else:
                print(f"Failed to crawl {result.url}: {result.error_message}")
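The difference between the two modes can be reproduced with plain asyncio, independent of Crawl4AI. In this sketch, fake_crawl, crawl_all, and crawl_stream are hypothetical stand-ins: the batch helper returns everything together in input order, while the streaming helper yields each result as soon as it finishes.

```python
import asyncio
from typing import AsyncIterator, List, Sequence, Tuple

Job = Tuple[str, float]  # (url label, simulated crawl time in seconds)


async def fake_crawl(url: str, seconds: float) -> str:
    """Stand-in for a crawl that takes `seconds` to complete."""
    await asyncio.sleep(seconds)
    return url


async def crawl_all(jobs: Sequence[Job]) -> List[str]:
    """Batch style: wait for every job, then return results in input order."""
    return list(await asyncio.gather(*(fake_crawl(u, s) for u, s in jobs)))


async def crawl_stream(jobs: Sequence[Job]) -> AsyncIterator[str]:
    """Streaming style: yield each result in completion order."""
    tasks = [asyncio.ensure_future(fake_crawl(u, s)) for u, s in jobs]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def demo() -> None:
    jobs = [("slow", 0.1), ("fast", 0.01), ("medium", 0.05)]
    print("batch: ", await crawl_all(jobs))
    async for url in crawl_stream(jobs):
        print("stream:", url)


if __name__ == "__main__":
    asyncio.run(demo())
```

Streaming suits long crawls where results can be processed or persisted incrementally. Batch mode is simpler when downstream code needs the complete set.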

4.3 Semaphore-based Crawling

async def crawl_with_semaphore(urls):
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    dispatcher = SemaphoreDispatcher(
        semaphore_count=5,
        rate_limiter=RateLimiter(
            base_delay=(0.5, 1.0),
            max_delay=10.0
        ),
        monitor=CrawlerMonitor(
            max_visible_rows=15,
            display_mode=DisplayMode.DETAILED
        )
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        results = await crawler.arun_many(
            urls, 
            config=run_config,
            dispatcher=dispatcher
        )
        return results

4.4 Robots.txt Consideration

import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode

async def main():
    urls = [
        "https://example1.com",
        "https://example2.com",
        "https://example3.com"
    ]

    config = CrawlerRunConfig(
        cache_mode=CacheMode.ENABLED,
        check_robots_txt=True,  # Will respect robots.txt for each URL
        semaphore_count=3      # Max concurrent requests
    )

    async with AsyncWebCrawler() as crawler:
        # stream is not enabled here, so arun_many() returns a list once awaited
        for result in await crawler.arun_many(urls, config=config):
            if result.success:
                print(f"Successfully crawled {result.url}")
            elif result.status_code == 403 and "robots.txt" in result.error_message:
                print(f"Skipped {result.url} - blocked by robots.txt")
            else:
                print(f"Failed to crawl {result.url}: {result.error_message}")

if __name__ == "__main__":
    asyncio.run(main())

Key Points:
- When check_robots_txt=True, each URL's robots.txt is checked before crawling
- Robots.txt files are cached for efficiency
- Failed robots.txt checks return 403 status code
- Dispatcher handles robots.txt checks automatically for each URL
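The check-then-cache behavior described in the key points can be illustrated with Python's standard urllib.robotparser. This is a sketch under assumptions, not the library's internals: RobotsCacheSketch and the injected fetch_lines callable are hypothetical, and the rules are supplied in memory rather than downloaded.

```python
from typing import Callable, Dict, List
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser


class RobotsCacheSketch:
    """Parses each origin's robots.txt once and reuses it (illustrative only)."""

    def __init__(self, fetch_lines: Callable[[str], List[str]]):
        self._fetch_lines = fetch_lines  # origin -> lines of that origin's robots.txt
        self._parsers: Dict[str, RobotFileParser] = {}
        self.fetch_count = 0             # how many times rules were actually loaded

    def can_fetch(self, url: str, user_agent: str = "*") -> bool:
        parts = urlparse(url)
        origin = f"{parts.scheme}://{parts.netloc}"
        parser = self._parsers.get(origin)
        if parser is None:
            # First URL for this origin: load and parse its rules, then cache them.
            parser = RobotFileParser()
            parser.parse(self._fetch_lines(origin))
            self._parsers[origin] = parser
            self.fetch_count += 1
        return parser.can_fetch(user_agent, url)


# Usage with in-memory rules; an origin with no rules allows everything.
rules = {"https://example.com": ["User-agent: *", "Disallow: /private/"]}
cache = RobotsCacheSketch(lambda origin: rules.get(origin, []))
for candidate in ("https://example.com/public", "https://example.com/private/page"):
    verdict = "allowed" if cache.can_fetch(candidate) else "blocked by robots.txt"
    print(f"{candidate}: {verdict}")
```

Caching per origin matters in multi-URL crawls because many URLs typically share a host, so the rules need to be loaded only once per host.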

5. Dispatch Results

Each crawl result includes dispatch information:

@dataclass
class DispatchResult:
    task_id: str
    memory_usage: float
    peak_memory: float
    start_time: datetime
    end_time: datetime
    error_message: str = ""

Access via result.dispatch_result:

for result in results:
    if result.success:
        dr = result.dispatch_result
        print(f"URL: {result.url}")
        print(f"Memory: {dr.memory_usage:.1f}MB")
        print(f"Duration: {dr.end_time - dr.start_time}")
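Per-task dispatch data is also handy for run-level summaries. The sketch below redeclares a local dataclass with the fields listed above so it runs without Crawl4AI installed. The summarize helper is hypothetical, and treating the memory figures as megabytes follows the "MB" label in the loop above rather than a documented guarantee.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Union


@dataclass
class DispatchResult:  # local mirror of the fields shown above, for illustration
    task_id: str
    memory_usage: float
    peak_memory: float
    start_time: datetime
    end_time: datetime
    error_message: str = ""


def summarize(items: List[DispatchResult]) -> Dict[str, Union[int, float, str]]:
    """Aggregate duration and memory figures across a non-empty list of tasks."""
    durations = [(dr.end_time - dr.start_time).total_seconds() for dr in items]
    slowest = max(items, key=lambda dr: dr.end_time - dr.start_time)
    return {
        "tasks": len(items),
        "total_seconds": sum(durations),
        "slowest_task": slowest.task_id,
        "peak_memory": max(dr.peak_memory for dr in items),
        "errors": sum(1 for dr in items if dr.error_message),
    }


# Usage with hand-built records.
t0 = datetime(2025, 1, 1, 12, 0, 0)
records = [
    DispatchResult("a", 40.0, 55.0, t0, t0 + timedelta(seconds=2)),
    DispatchResult("b", 60.0, 80.5, t0, t0 + timedelta(seconds=5), "timeout"),
]
print(summarize(records))
```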

6. Summary

1. Two Dispatcher Types:
   - MemoryAdaptiveDispatcher (default): Dynamic concurrency based on memory
   - SemaphoreDispatcher: Fixed concurrency limit

2. Optional Components:
   - RateLimiter: Smart request pacing and backoff
   - CrawlerMonitor: Real-time progress visualization

3. Key Benefits:
   - Automatic memory management
   - Built-in rate limiting
   - Live progress monitoring
   - Flexible concurrency control

Choose the dispatcher that best fits your needs:
- MemoryAdaptiveDispatcher: For large crawls or limited resources
- SemaphoreDispatcher: For simple, fixed-concurrency scenarios