Reference

CAPTCHA Solving Architecture Patterns for High Volume

Architecture patterns for reliably solving anywhere from a few hundred to 100,000+ CAPTCHAs per hour.


Pattern 1: Simple Worker Pool

Best for: 100–1,000 solves/hour.

┌──────────┐     ┌──────────────┐     ┌────────────┐
│  Scraper  │────▶│  Thread Pool │────▶│ CaptchaAI  │
│  Tasks    │     │  (5-20       │     │   API      │
│           │◀────│   workers)   │◀────│            │
└──────────┘     └──────────────┘     └────────────┘
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import requests


class SimpleWorkerPool:
    """Solve CAPTCHAs concurrently with a fixed-size thread pool.

    Every task is submitted to ``in.php`` and then polled on ``res.php``
    until the API returns a final answer or the poll budget runs out.

    Args:
        api_key: API key sent with every request.
        max_workers: Number of threads used by ``solve_batch``.
        request_timeout: Seconds to wait for each individual HTTP call
            before it raises, so a stalled connection cannot pin a worker.
    """

    def __init__(self, api_key, max_workers=10, request_timeout=30):
        self.api_key = api_key
        self.max_workers = max_workers
        self.request_timeout = request_timeout
        self.base = "https://ocr.captchaai.com"

    def _build_payload(self, params):
        """Return a copy of *params* with the API key and JSON flag added.

        Working on a copy keeps the API key out of the caller's dict and
        lets the same dict be reused for several tasks.
        """
        payload = dict(params)
        payload["key"] = self.api_key
        payload["json"] = 1
        return payload

    def _solve_one(self, params):
        """Submit one task and poll until it is solved, fails, or times out.

        Returns:
            ``{"token": ...}`` on success, otherwise ``{"error": ...}``
            (``"timeout"`` when the poll budget is exhausted).

        Raises:
            Whatever the HTTP layer or JSON decoding raises; ``solve_batch``
            converts those into ``{"error": ...}`` entries.
        """
        resp = requests.post(
            f"{self.base}/in.php",
            data=self._build_payload(params),
            timeout=self.request_timeout,
        ).json()
        if resp["status"] != 1:
            return {"error": resp["request"]}

        task_id = resp["request"]
        # Give the service a head start before the first poll.
        time.sleep(10)

        # Up to 60 polls, 5 seconds apart.
        for _ in range(60):
            result = requests.get(
                f"{self.base}/res.php",
                params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
                timeout=self.request_timeout,
            ).json()

            if result["request"] == "CAPCHA_NOT_READY":
                time.sleep(5)
                continue
            if result["status"] == 1:
                return {"token": result["request"]}
            return {"error": result["request"]}

        return {"error": "timeout"}

    def solve_batch(self, tasks):
        """Solve many tasks in parallel.

        Args:
            tasks: Iterable of ``(identifier, params)`` tuples.

        Returns:
            Dict mapping each identifier to the dict returned by
            ``_solve_one``; an exception raised while solving is reported
            as ``{"error": str(exception)}`` for that identifier.
        """
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = {
                pool.submit(self._solve_one, params): ident
                for ident, params in tasks
            }
            for future in as_completed(futures):
                ident = futures[future]
                try:
                    results[ident] = future.result()
                except Exception as e:
                    results[ident] = {"error": str(e)}
        return results

Pattern 2: Queue-Based Pipeline

Best for: 1,000–10,000 solves/hour with back-pressure control.

┌────────┐     ┌───────────┐     ┌──────────┐     ┌───────────┐     ┌────────┐
│Producer│────▶│ Submit    │────▶│ Pending  │────▶│  Poll     │────▶│Results │
│        │     │ Queue     │     │ Queue    │     │  Workers  │     │ Queue  │
└────────┘     └───────────┘     └──────────┘     └───────────┘     └────────┘
import queue
import threading
import time
import requests


class QueuePipeline:
    """Two-stage submit/poll pipeline with a bounded intake queue.

    Submit workers take ``(ident, params)`` items from ``submit_queue``,
    send them to ``in.php`` and place the returned task id on
    ``pending_queue``. Poll workers take pending tasks, query ``res.php``
    and store the final outcome in ``results`` keyed by ``ident``.

    Args:
        api_key: API key sent with every request.
        submit_workers: Number of submit threads started by ``start``.
        poll_workers: Number of poll threads started by ``start``.
        request_timeout: Seconds to wait for each individual HTTP call.
        max_task_age: Seconds after submission at which a still-unsolved
            task is given up and reported as ``{"error": "timeout"}``.
    """

    INITIAL_WAIT = 10     # seconds between submission and the first poll
    NOT_READY_DELAY = 3   # seconds to hold a not-ready task before re-queueing
    ERROR_DELAY = 5       # seconds to hold a task after a failed poll request

    def __init__(self, api_key, submit_workers=5, poll_workers=10,
                 request_timeout=30, max_task_age=300):
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        # Bounded: add() blocks when 100 items are waiting (back-pressure).
        self.submit_queue = queue.Queue(maxsize=100)
        self.pending_queue = queue.Queue()
        self.results = {}
        self.results_lock = threading.Lock()
        # Shares results_lock so waiters in get_result() wake as soon as a
        # worker stores a result.
        self._result_ready = threading.Condition(self.results_lock)
        self._running = False
        self.submit_workers = submit_workers
        self.poll_workers = poll_workers
        self.request_timeout = request_timeout
        self.max_task_age = max_task_age

    def start(self):
        """Start the submit and poll worker threads (daemon threads)."""
        self._running = True
        for _ in range(self.submit_workers):
            threading.Thread(target=self._submit_worker, daemon=True).start()
        for _ in range(self.poll_workers):
            threading.Thread(target=self._poll_worker, daemon=True).start()

    def stop(self):
        """Ask the workers to exit; each one stops at its next loop check."""
        self._running = False

    def add(self, ident, params):
        """Queue a task; blocks while the submit queue is full."""
        self.submit_queue.put((ident, params))

    def get_result(self, ident, timeout=300):
        """Wait for and remove the result stored for *ident*.

        A result that is already available is returned even when *timeout*
        is 0.

        Returns:
            The stored result dict, or ``{"error": "timeout"}`` if none
            arrives within *timeout* seconds.
        """
        deadline = time.monotonic() + timeout
        with self._result_ready:
            while ident not in self.results:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return {"error": "timeout"}
                # Capped at 1s so results written to self.results without a
                # notification are still noticed promptly.
                self._result_ready.wait(min(remaining, 1.0))
            return self.results.pop(ident)

    def _store_result(self, ident, result):
        """Record the final outcome for *ident* and wake any waiters."""
        with self._result_ready:
            self.results[ident] = result
            self._result_ready.notify_all()

    def _submit_worker(self):
        """Submit queued tasks until the pipeline is stopped."""
        while self._running:
            try:
                ident, params = self.submit_queue.get(timeout=1)
            except queue.Empty:
                continue
            # Copy so the caller's dict is not modified (and never gains
            # the API key).
            payload = dict(params)
            payload["key"] = self.api_key
            payload["json"] = 1
            try:
                resp = requests.post(
                    f"{self.base}/in.php",
                    data=payload,
                    timeout=self.request_timeout,
                ).json()
                if resp["status"] == 1:
                    self.pending_queue.put((ident, resp["request"], time.time()))
                else:
                    self._store_result(ident, {"error": resp["request"]})
            except Exception as e:
                self._store_result(ident, {"error": str(e)})

    def _poll_worker(self):
        """Poll pending tasks until the pipeline is stopped."""
        while self._running:
            try:
                ident, task_id, submitted_at = self.pending_queue.get(timeout=1)
            except queue.Empty:
                continue

            age = time.time() - submitted_at
            if age > self.max_task_age:
                # Give up instead of re-queueing the task forever.
                self._store_result(ident, {"error": "timeout"})
                continue
            if age < self.INITIAL_WAIT:
                time.sleep(self.INITIAL_WAIT - age)

            retry_delay = None
            try:
                resp = requests.get(
                    f"{self.base}/res.php",
                    params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
                    timeout=self.request_timeout,
                ).json()

                if resp["request"] == "CAPCHA_NOT_READY":
                    retry_delay = self.NOT_READY_DELAY
                elif resp["status"] == 1:
                    self._store_result(ident, {"token": resp["request"]})
                else:
                    self._store_result(ident, {"error": resp["request"]})
            except Exception:
                # Request or decoding failure: retry later, bounded by
                # max_task_age.
                retry_delay = self.ERROR_DELAY

            if retry_delay is not None:
                # Sleep BEFORE re-queueing: while this worker holds the task
                # no idle worker can pick it up and re-poll immediately.
                time.sleep(retry_delay)
                self.pending_queue.put((ident, task_id, submitted_at))

Usage:

pipeline = QueuePipeline("YOUR_API_KEY")
pipeline.start()

try:
    # Add CAPTCHAs (add() blocks while the submit queue is full)
    pipeline.add("page_1", {"method": "turnstile", "sitekey": "KEY", "pageurl": "URL"})
    pipeline.add("page_2", {"method": "userrecaptcha", "googlekey": "KEY", "pageurl": "URL"})

    # Get results (each call waits until the result arrives or times out)
    result1 = pipeline.get_result("page_1")
    result2 = pipeline.get_result("page_2")
finally:
    # Always signal the workers to stop, even if something above raised.
    pipeline.stop()

Pattern 3: Circuit Breaker

Prevents cascading failures when the solving API is degraded.

import time
import threading


class CircuitBreaker:
    """Thread-safe three-state circuit breaker.

    * CLOSED: every request is allowed.
    * OPEN: requests are rejected until ``reset_timeout`` seconds have
      passed since the last recorded failure.
    * HALF_OPEN: exactly one probe request is allowed; its outcome closes
      the breaker (success) or re-opens it (failure).

    Args:
        failure_threshold: Number of failures recorded since the last
            success at which the breaker opens.
        reset_timeout: Seconds to stay OPEN before allowing a probe.
    """

    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing — reject requests
    HALF_OPEN = "half_open"  # Testing recovery

    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = self.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.lock = threading.Lock()
        # time.time() at which the current HALF_OPEN probe was admitted,
        # or None when no probe is in flight.
        self._probe_started_at = None

    def can_proceed(self):
        """Return True if the caller may send a request now.

        A caller that receives True is expected to report the outcome via
        ``record_success`` or ``record_failure``.
        """
        with self.lock:
            if self.state == self.CLOSED:
                return True

            now = time.time()
            if self.state == self.OPEN:
                if now - self.last_failure_time > self.reset_timeout:
                    self.state = self.HALF_OPEN
                    self._probe_started_at = now
                    return True
                return False

            # HALF_OPEN: allow one request. Another probe is admitted only
            # if the previous one never reported back within reset_timeout,
            # so a lost probe cannot wedge the breaker.
            probe_stale = (
                self._probe_started_at is None
                or now - self._probe_started_at > self.reset_timeout
            )
            if probe_stale:
                self._probe_started_at = now
                return True
            return False

    def record_success(self):
        """Record a successful request: reset the count and close."""
        with self.lock:
            self.failure_count = 0
            self.state = self.CLOSED
            self._probe_started_at = None

    def record_failure(self):
        """Record a failed request; open at the threshold or on a failed probe."""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if (
                self.state == self.HALF_OPEN
                or self.failure_count >= self.failure_threshold
            ):
                self.state = self.OPEN
            self._probe_started_at = None


class ResilientSolver:
    """Solver front-end guarded by a circuit breaker.

    ``solve`` refuses to call the API while the breaker rejects requests
    and reports every attempt's outcome back to the breaker.
    """

    def __init__(self, api_key):
        self.breaker = CircuitBreaker(failure_threshold=5, reset_timeout=60)
        self.api_key = api_key

    def solve(self, params):
        """Run one solve attempt through the breaker.

        Returns:
            Whatever ``_do_solve`` returns.

        Raises:
            Exception: If the breaker does not allow a request, or
                re-raised unchanged from ``_do_solve`` after the failure
                has been recorded.
        """
        allowed = self.breaker.can_proceed()
        if not allowed:
            raise Exception("Circuit open — API degraded, try later")

        try:
            outcome = self._do_solve(params)
            self.breaker.record_success()
        except Exception:
            # Count the failure, then let the caller see the original error.
            self.breaker.record_failure()
            raise
        return outcome

    def _do_solve(self, params):
        """Placeholder for the standard solve logic; returns None as written."""
        return None

Pattern 4: Pre-Solving with Token Buffer

Maintain a buffer of pre-solved tokens for instant use.

import queue
import threading
import time


class TokenBuffer:
    """Keep a small buffer of pre-solved tokens ready for immediate use.

    A background thread calls ``solver.solve(params)`` whenever the buffer
    has room and stores each token with its creation time. Tokens older
    than ``ttl_seconds`` are discarded when they are taken out.

    Args:
        solver: Object with a ``solve(params)`` method returning a token.
        params: Parameters passed to every ``solver.solve`` call.
        buffer_size: Maximum number of tokens held at once.
        ttl_seconds: Age in seconds after which a token is considered
            expired.
    """

    def __init__(self, solver, params, buffer_size=5, ttl_seconds=90):
        self.solver = solver
        self.params = params
        self.buffer = queue.Queue(maxsize=buffer_size)
        self.ttl = ttl_seconds
        self.buffer_size = buffer_size
        self._running = False

    def start(self):
        """Start the background fill thread (daemon thread)."""
        self._running = True
        threading.Thread(target=self._fill_loop, daemon=True).start()

    def stop(self):
        """Ask the fill thread to exit at its next loop check."""
        self._running = False

    def get_token(self, timeout=30):
        """Return a non-expired pre-solved token.

        Expired tokens are skipped. *timeout* bounds the total time spent
        waiting across all skipped tokens; ``None`` waits indefinitely.

        Returns:
            A token, or None if no valid token became available in time.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        # A loop rather than recursion: a buffer full of stale tokens must
        # not grow the call stack, and the deadline must not restart for
        # every expired token.
        while True:
            if deadline is None:
                remaining = None
            else:
                remaining = max(0.0, deadline - time.monotonic())
            try:
                token, created_at = self.buffer.get(timeout=remaining)
            except queue.Empty:
                return None
            if time.time() - created_at <= self.ttl:
                return token
            # Token expired, try next

    def _fill_loop(self):
        """Top the buffer up until the buffer is stopped."""
        while self._running:
            if self.buffer.qsize() < self.buffer_size:
                try:
                    token = self.solver.solve(self.params)
                    self.buffer.put((token, time.time()))
                except Exception:
                    # Best effort: back off and try again after a failure.
                    time.sleep(5)
            else:
                time.sleep(2)

Pattern 5: Multi-Provider Failover

Route to backup providers when the primary provider fails.

class MultiProviderSolver:
    """Try several solving providers in priority order.

    Each provider has its own circuit breaker; a provider whose breaker
    rejects requests is skipped.
    """

    def __init__(self, providers):
        """providers: list of (name, solver_instance, priority) tuples.

        Providers are tried in ascending ``priority`` order.
        """
        self.providers = sorted(providers, key=lambda x: x[2])
        self.breakers = {name: CircuitBreaker() for name, _, _ in providers}

    def solve(self, params):
        """Return the first successful result from the providers.

        Raises:
            Exception: If no provider produced a result. The message lists
                every provider with the reason it was skipped or failed,
                and the last provider error is chained as the cause.
        """
        errors = []
        last_error = None
        for name, solver, _ in self.providers:
            breaker = self.breakers[name]
            if not breaker.can_proceed():
                # Record the skip so the final error explains why nothing
                # was attempted for this provider.
                errors.append(f"{name}: circuit open")
                continue
            try:
                result = solver.solve(params)
            except Exception as e:
                breaker.record_failure()
                errors.append(f"{name}: {e}")
                last_error = e
            else:
                breaker.record_success()
                return result

        if not errors:
            errors.append("no providers configured")
        raise Exception(f"All providers failed: {'; '.join(errors)}") from last_error

Scaling Guidelines

| Volume | Architecture | Workers | Notes |
|---|---|---|---|
| < 100/hr | Direct calls | 1-3 | No special architecture needed |
| 100-1K/hr | Worker pool | 5-10 | Pattern 1 |
| 1K-10K/hr | Queue pipeline | 10-30 | Pattern 2 + circuit breaker |
| 10K-50K/hr | Distributed queues | 30-100 | Redis/RabbitMQ, multiple machines |
| 50K+/hr | Multi-provider | 100+ | Pattern 5 + distributed queue |

Monitoring at Scale

import logging
from collections import defaultdict

logger = logging.getLogger("captcha_scale")


class ScaleMetrics:
    """Collect per-CAPTCHA-type solve counts and timings.

    ``counts`` maps ``"<captcha_type>_ok"`` / ``"<captcha_type>_fail"`` to
    the number of attempts; ``times`` maps each captcha type to the list
    of elapsed values recorded for it (successes and failures alike).
    """

    def __init__(self):
        self.counts = defaultdict(int)
        self.times = defaultdict(list)

    def record(self, captcha_type, success, elapsed):
        """Record one solve attempt and how long it took."""
        key = f"{captcha_type}_{'ok' if success else 'fail'}"
        self.counts[key] += 1
        self.times[captcha_type].append(elapsed)

    def summary(self):
        """Return aggregated statistics for every recorded captcha type.

        Returns:
            Dict keyed by captcha type, in sorted order, whose values are
            dicts with ``total`` (attempt count), ``success_rate``
            (percentage of successful attempts) and ``avg_time`` (mean of
            the recorded elapsed values).
        """
        # rsplit strips only the trailing "_ok"/"_fail", so captcha types
        # that contain underscores themselves stay intact.
        captcha_types = sorted({key.rsplit("_", 1)[0] for key in self.counts})
        stats = {}
        for captcha_type in captcha_types:
            ok = self.counts.get(f"{captcha_type}_ok", 0)
            fail = self.counts.get(f"{captcha_type}_fail", 0)
            total = ok + fail
            times = self.times.get(captcha_type, [])
            stats[captcha_type] = {
                "total": total,
                "success_rate": (ok / total * 100) if total else 0,
                "avg_time": sum(times) / len(times) if times else 0,
            }
        return stats

    def report(self):
        """Log one summary line per captcha type, in sorted order."""
        for captcha_type, stat in self.summary().items():
            # Lazy %-style arguments: the message is only formatted when
            # the INFO level is enabled.
            logger.info(
                "%s: %d solves, %.1f%% success, %.1fs avg",
                captcha_type,
                stat["total"],
                stat["success_rate"],
                stat["avg_time"],
            )

FAQ

How many concurrent solves can CaptchaAI handle?

CaptchaAI can handle high concurrency. Start with 10-20 concurrent requests and increase based on your results.

Should I use async or threads?

Threads work well since CAPTCHA solving is I/O-bound (network requests + waiting). For Python 3.7+, asyncio with aiohttp is also excellent for very high concurrency.

How do I prevent overwhelming the API?

Use a queue with bounded size (back-pressure), rate limiting, and circuit breakers. Monitor your ERROR_NO_SLOT_AVAILABLE rate.



Scale to any volume — start with CaptchaAI.

Discussions (0)

No comments yet.