Tutorials

Build a Multi-Site Data Aggregation System with CaptchaAI

Scrape structured data from multiple heterogeneous sources, each with its own page layout and CAPTCHA type, and merge the results into a unified dataset.


Architecture

Source Configs ──> Source Adapters ──> CAPTCHA Router ──> Data Normalizer ──> Unified Store
     │                                     │
     └─ Selectors, URL patterns      CaptchaAI API

Universal CAPTCHA Router

# captcha_router.py
import requests
import re
import time
import os


class CaptchaRouter:
    """Detect the CAPTCHA embedded in a page and solve it through CaptchaAI.

    The router inspects raw HTML, picks the matching solve method
    (Cloudflare Turnstile, reCAPTCHA v2 or reCAPTCHA v3), submits the task
    to the CaptchaAI API, polls for the token and finally re-posts the page
    URL with the token attached.
    """

    SUBMIT_URL = "https://ocr.captchaai.com/in.php"
    RESULT_URL = "https://ocr.captchaai.com/res.php"

    # Polling schedule: one long initial wait, then short fixed intervals.
    INITIAL_WAIT = 15
    POLL_INTERVAL = 5
    MAX_POLLS = 24

    # reCAPTCHA v3 pages load api.js with ``render=<sitekey>``. The value
    # ends at a quote (either style), ``&``, whitespace or ``>``.
    _V3_RENDER_RE = re.compile(r"""recaptcha/api\.js\?render=([^"'&\s>]+)""")
    # ``render=explicit`` / ``render=onload`` are loader options used by
    # reCAPTCHA v2, not site keys, so they must not be treated as v3.
    _NON_KEY_RENDER_VALUES = frozenset({"explicit", "onload"})

    def __init__(self):
        """Read the API key from the ``CAPTCHAAI_API_KEY`` environment variable.

        Raises:
            KeyError: if the variable is not set.
        """
        self.api_key = os.environ["CAPTCHAAI_API_KEY"]

    def solve_if_present(self, session, url, html):
        """Solve the CAPTCHA found in *html*, if any.

        Args:
            session: HTTP session used to re-post *url* with the token.
            url: Address of the page that produced *html*.
            html: Page markup to inspect.

        Returns:
            The body of the post-solve response, or *html* unchanged when
            no supported CAPTCHA is detected.
        """
        solvers = {
            "turnstile": self._solve_turnstile,
            "recaptcha_v2": self._solve_recaptcha,
            "recaptcha_v3": self._solve_recaptcha_v3,
        }
        solver = solvers.get(self._detect_type(html))
        if solver is None:
            return html  # No CAPTCHA
        return solver(session, url, html)

    def _detect_type(self, html):
        """Return ``"turnstile"``, ``"recaptcha_v3"``, ``"recaptcha_v2"`` or ``None``."""
        if "cf-turnstile" in html or "challenges.cloudflare.com/turnstile" in html:
            return "turnstile"
        # Only a real site key in ``render=`` marks v3; ``render=explicit``
        # falls through to the v2 widget check below.
        if self._extract_v3_sitekey(html):
            return "recaptcha_v3"
        if "data-sitekey" in html and "g-recaptcha" in html:
            return "recaptcha_v2"
        return None

    def _solve_turnstile(self, session, url, html):
        """Submit a Turnstile task for the site key found in *html*."""
        return self._submit_and_poll(session, url, {
            "method": "turnstile",
            "sitekey": self._extract_sitekey(html),
            "pageurl": url,
        })

    def _solve_recaptcha(self, session, url, html):
        """Submit a reCAPTCHA v2 task for the site key found in *html*."""
        return self._submit_and_poll(session, url, {
            "method": "userrecaptcha",
            "googlekey": self._extract_sitekey(html),
            "pageurl": url,
        })

    def _solve_recaptcha_v3(self, session, url, html):
        """Submit a reCAPTCHA v3 task, preferring the key from the api.js URL."""
        sitekey = self._extract_v3_sitekey(html) or self._extract_sitekey(html)

        return self._submit_and_poll(session, url, {
            "method": "userrecaptcha",
            "googlekey": sitekey,
            "pageurl": url,
            "version": "v3",
            "action": "submit",
            "min_score": "0.5",
        })

    def _extract_v3_sitekey(self, html):
        """Return the site key passed as ``api.js?render=<key>``, or ``""``."""
        for match in self._V3_RENDER_RE.finditer(html):
            value = match.group(1)
            if value.lower() not in self._NON_KEY_RENDER_VALUES:
                return value
        return ""

    def _extract_sitekey(self, html):
        """Return the first site key found in *html*, or ``""`` if none.

        Looks for a double-quoted ``data-sitekey`` attribute first, then for
        any ``sitekey: '...'`` / ``sitekey = "..."`` style assignment.
        """
        match = re.search(r'data-sitekey="([^"]+)"', html)
        if match:
            return match.group(1)
        match = re.search(r"sitekey['\"]?\s*[:=]\s*['\"]([^'\"]+)", html)
        return match.group(1) if match else ""

    def _submit_and_poll(self, session, url, params):
        """Create a solve task, wait for the token and re-post the page.

        Args:
            session: HTTP session used for the final POST to *url*.
            url: Page address the token is submitted to.
            params: Task parameters; the dict is not modified.

        Returns:
            Text of the response to the token POST.

        Raises:
            RuntimeError: if the API rejects the task or reports a solve error.
            TimeoutError: if no token arrives within the polling budget.
        """
        # Copy so the caller's dict is not polluted with the API key.
        payload = {**params, "key": self.api_key, "json": 1}

        resp = requests.post(self.SUBMIT_URL, data=payload, timeout=30)
        submitted = resp.json()
        # A rejected submission carries an error code in "request"; fail now
        # instead of polling with that code as if it were a task id.
        if submitted.get("status") != 1:
            raise RuntimeError(submitted.get("request", "CAPTCHA submission failed"))
        task_id = submitted["request"]

        time.sleep(self.INITIAL_WAIT)
        for _ in range(self.MAX_POLLS):
            resp = requests.get(self.RESULT_URL, params={
                "key": self.api_key, "action": "get",
                "id": task_id, "json": 1,
            }, timeout=15)
            data = resp.json()
            if data.get("status") == 1:
                token = data["request"]
                # Send the token under both field names so the same POST
                # works for reCAPTCHA and Turnstile pages.
                post_resp = session.post(url, data={
                    "g-recaptcha-response": token,
                    "cf-turnstile-response": token,
                }, timeout=30)
                return post_resp.text
            if data["request"] != "CAPCHA_NOT_READY":
                raise RuntimeError(data["request"])
            time.sleep(self.POLL_INTERVAL)
        raise TimeoutError("CAPTCHA timeout")

Source Adapter Base

# adapters.py
import requests
from bs4 import BeautifulSoup
from captcha_router import CaptchaRouter


class SourceAdapter:
    """Base adapter that scrapes one data source.

    An adapter owns an HTTP session and a CAPTCHA router. It builds paged
    search URLs from ``base_url``, fetches each page (solving a CAPTCHA if
    one is detected) and extracts one dict per result card.
    """

    def __init__(self, name, base_url, selectors):
        """Store the source configuration and open an HTTP session.

        Args:
            name: Label written into every item's ``"source"`` field.
            base_url: URL template with ``{query}`` and ``{page}`` placeholders.
            selectors: Mapping with a ``"card"`` CSS selector and a
                ``"fields"`` mapping of field name to CSS selector.
        """
        self.name = name
        self.base_url = base_url
        self.selectors = selectors
        self.router = CaptchaRouter()
        self.session = requests.Session()
        self.session.headers["User-Agent"] = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
        )

    def fetch_page(self, url):
        """Fetch *url* and return its HTML after any CAPTCHA has been solved."""
        resp = self.session.get(url, timeout=20)
        return self.router.solve_if_present(self.session, url, resp.text)

    def extract_items(self, html):
        """Parse *html* and return one dict per card matched by the card selector.

        Every configured field is present in each dict; a field whose
        selector matches nothing inside the card is set to ``""``.
        """
        soup = BeautifulSoup(html, "html.parser")
        items = []

        for card in soup.select(self.selectors["card"]):
            item = {}
            for field, selector in self.selectors["fields"].items():
                el = card.select_one(selector)
                item[field] = el.get_text(strip=True) if el else ""
            item["source"] = self.name
            items.append(item)

        return items

    def _build_url(self, query, page):
        """Return ``base_url`` filled in with the URL-encoded query and page number."""
        from urllib.parse import quote_plus

        # quote_plus turns spaces into "+" and also escapes characters such
        # as "&", "#" and "=" that would otherwise corrupt the query string.
        return self.base_url.format(query=quote_plus(query), page=page)

    def scrape(self, query, max_pages=3):
        """Collect items for *query* from up to *max_pages* result pages.

        Pagination stops early at the first page that yields no items.

        Returns:
            List of item dicts from all pages visited.
        """
        all_items = []

        for page in range(1, max_pages + 1):
            html = self.fetch_page(self._build_url(query, page))
            items = self.extract_items(html)

            if not items:
                break
            all_items.extend(items)

        return all_items

Data Normalizer

# normalizer.py
import re


class DataNormalizer:
    """Normalize data from different sources into a unified schema."""

    # Unified field name -> source field names accepted for it, in
    # priority order (the first non-empty alias wins).
    FIELD_MAP = {
        "product_name": ["title", "name", "product", "item_name"],
        "price": ["price", "cost", "amount"],
        "description": ["description", "desc", "summary", "details"],
        "category": ["category", "type", "group"],
        "url": ["url", "link", "href"],
    }

    # First run of digits with an optional decimal part; thousands
    # separators are stripped before matching.
    _PRICE_RE = re.compile(r"\d+\.?\d*")

    def normalize(self, items):
        """Map every raw item onto the unified schema.

        Args:
            items: Iterable of dicts as produced by the source adapters.

        Returns:
            List of dicts with the keys ``source``, every ``FIELD_MAP`` key
            (``""`` when no alias has a value) and ``price_numeric``.
        """
        normalized = []
        for item in items:
            norm = {"source": item.get("source", "")}
            for target, aliases in self.FIELD_MAP.items():
                norm[target] = next(
                    (item[alias] for alias in aliases if item.get(alias)), ""
                )
            norm["price_numeric"] = self._parse_price(norm["price"])
            normalized.append(norm)
        return normalized

    def _parse_price(self, text):
        """Return the first number in *text* as a float, or ``None`` if absent.

        Commas are treated as thousands separators and removed. ``None``
        yields ``None``; other non-string values are converted with ``str``.
        """
        if text is None:
            return None
        match = self._PRICE_RE.search(str(text).replace(",", ""))
        return float(match.group()) if match else None

    def deduplicate(self, items, key_fields=("product_name", "source")):
        """Drop items whose *key_fields* repeat an earlier item's.

        Comparison is case-insensitive. Missing or ``None`` values compare
        as ``""`` and non-string values are compared by their ``str`` form,
        so numeric fields such as ``price_numeric`` may be used as keys.

        Returns:
            List with the first occurrence of each key, in input order.
        """
        seen = set()
        unique = []
        for item in items:
            key = tuple(self._key_part(item.get(field)) for field in key_fields)
            if key not in seen:
                seen.add(key)
                unique.append(item)
        return unique

    @staticmethod
    def _key_part(value):
        """Return the case-folded text form of one deduplication key value."""
        return "" if value is None else str(value).lower()

Unified Store

# store.py
import csv
import json
from datetime import datetime


class UnifiedStore:
    """Write aggregated items to CSV and JSON files in one output directory."""

    def __init__(self, output_dir="output"):
        """Create *output_dir* if it does not exist and remember it."""
        import os
        os.makedirs(output_dir, exist_ok=True)
        self.output_dir = output_dir

    def save_csv(self, items, filename="aggregated.csv"):
        """Write *items* as CSV rows; do nothing when *items* is empty.

        The header is the union of all item keys in first-seen order, so
        items with differing keys are written without error and missing
        values are left blank.
        """
        if not items:
            return
        # Taking the header from the first item only would make DictWriter
        # raise ValueError as soon as a later item carries an extra key.
        fieldnames = list(dict.fromkeys(key for item in items for key in item))
        path = f"{self.output_dir}/{filename}"
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(items)
        print(f"Saved {len(items)} items to {path}")

    def save_json(self, items, filename="aggregated.json"):
        """Write *items* as JSON with an item count and generation timestamp."""
        path = f"{self.output_dir}/{filename}"
        document = {
            "items": items,
            "count": len(items),
            "generated_at": datetime.now().isoformat(),
        }
        with open(path, "w", encoding="utf-8") as f:
            json.dump(document, f, indent=2)
        print(f"Saved {len(items)} items to {path}")

Main Pipeline

# main.py
import time
from adapters import SourceAdapter
from normalizer import DataNormalizer
from store import UnifiedStore

# One entry per site. ``base_url`` is a template filled in with ``query``
# and ``page``; ``selectors`` holds the CSS selector for a result card and,
# under ``fields``, the selector for each value read from inside that card.
SOURCES = [
    dict(
        name="Store Alpha",
        base_url="https://store-alpha.example.com/search?q={query}&page={page}",
        selectors=dict(
            card=".product-card",
            fields=dict(
                title="h3.product-title",
                price=".price",
                category=".category-tag",
                description=".product-desc",
            ),
        ),
    ),
    dict(
        name="Store Beta",
        base_url="https://store-beta.example.com/find?term={query}&p={page}",
        selectors=dict(
            card=".item-listing",
            fields=dict(
                name=".item-name",
                cost=".item-price",
                type=".item-category",
                summary=".item-summary",
            ),
        ),
    ),
]

# Search terms run against every source.
QUERIES = [
    "wireless headphones",
    "bluetooth speaker",
]


def main():
    """Scrape every source for every query, then normalize, dedupe and export.

    Raw items from all sources are pooled first; normalization and
    deduplication run once over the pooled list before the CSV and JSON
    files are written.
    """
    cleaner = DataNormalizer()
    sink = UnifiedStore()
    collected = []

    for cfg in SOURCES:
        label = cfg["name"]
        adapter = SourceAdapter(label, cfg["base_url"], cfg["selectors"])

        for term in QUERIES:
            print(f"Scraping {label} for '{term}'...")
            batch = adapter.scrape(term)
            print(f"  Found {len(batch)} items")
            collected += batch
            # Pause between searches.
            time.sleep(5)

    # Normalize and deduplicate
    cleaned = cleaner.normalize(collected)
    distinct = cleaner.deduplicate(cleaned)
    print(f"\nTotal: {len(collected)} raw → {len(cleaned)} normalized → {len(distinct)} unique")

    # Export
    sink.save_csv(distinct)
    sink.save_json(distinct)


if __name__ == "__main__":
    main()

FAQ

How do I add a new data source?

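Add an entry to SOURCES with the site's URL template (containing the {query} and {page} placeholders), card CSS selector, and field selectors. The normalizer maps varied field names to the unified schema; if the new source uses a field name that is not yet listed in the normalizer's FIELD_MAP, add it there as an alias.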

What if sources use different CAPTCHA types?

The CaptchaRouter auto-detects reCAPTCHA v2, v3, and Turnstile. Add more detection patterns for GeeTest or BLS if needed.

How do I handle sources with JavaScript rendering?

Replace the self.session.get() call in the adapter's fetch_page method with Selenium or Playwright, and pass the rendered page HTML to the CAPTCHA router. The router detects the CAPTCHA type from the HTML in the same way.



Aggregate data from any source — start with CaptchaAI.

Full Working Code

Complete runnable examples for this article in Python, Node.js, PHP, Go, Java, C#, Ruby, Rust, Kotlin & Bash.

View on GitHub →

Discussions (0)

No comments yet.