
Python ThreadPoolExecutor for CAPTCHA Solving Parallelism

asyncio is powerful but requires rewriting your entire call chain as async. ThreadPoolExecutor gives you parallelism with standard synchronous code — drop it into existing projects without restructuring.

Why ThreadPoolExecutor for CAPTCHAs

CAPTCHA solving is I/O-bound (waiting for HTTP responses). Python threads release the GIL during I/O operations, making ThreadPoolExecutor efficient for this workload:

Approach | Complexity | Fits existing code | Parallelism for I/O
Sequential | None | Yes | None
ThreadPoolExecutor | Low | Yes | Good
asyncio | High | Requires async rewrite | Best
multiprocessing | Medium | Mostly | Overkill for I/O
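
To see the effect in isolation, the sketch below replaces the HTTP wait with time.sleep and times ten such waits on a ten-thread pool. fake_solve and the 0.2-second delay are illustrative stand-ins, not part of the CaptchaAI API.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_solve(i):
    """Hypothetical stand-in for a network wait; sleeping releases the GIL."""
    time.sleep(0.2)
    return i


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(fake_solve, range(10)))
elapsed = time.perf_counter() - start

# The ten waits overlap, so the total is close to one wait rather than ten
print(f"{len(results)} waits finished in {elapsed:.2f}s")
```

Run sequentially, the same ten calls would take about two seconds.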

Basic Implementation

import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


def solve_captcha(sitekey, pageurl):
    """Synchronous CAPTCHA solve — submit and poll."""
    # Submit
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": "userrecaptcha",
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1
    }, timeout=30)  # without a timeout, a stalled connection can hang the worker
    data = resp.json()

    if data.get("status") != 1:
        raise RuntimeError(data.get("request", "Submit failed"))

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY,
            "action": "get",
            "id": captcha_id,
            "json": 1
        }, timeout=30).json()

        if result.get("status") == 1:
            return result["request"]
        if result.get("request") != "CAPCHA_NOT_READY":
            raise RuntimeError(result.get("request", "Unknown error"))

    raise TimeoutError("Solve timeout after 300s")


# Batch solve with ThreadPoolExecutor
tasks = [
    {"sitekey": "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "pageurl": f"https://example.com/page/{i}"}
    for i in range(20)
]

start = time.time()

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {
        executor.submit(solve_captcha, t["sitekey"], t["pageurl"]): t
        for t in tasks
    }

    solved = 0
    failed = 0

    for future in as_completed(futures):
        task = futures[future]
        try:
            solution = future.result()
            solved += 1
            print(f"[OK] {task['pageurl']}: {solution[:30]}...")
        except Exception as e:
            failed += 1
            print(f"[ERR] {task['pageurl']}: {e}")

elapsed = time.time() - start
print(f"\nDone: {solved} solved, {failed} failed in {elapsed:.1f}s")

Using Session for Connection Reuse

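Creating a new TCP connection per request wastes time. Give each thread its own requests.Session so the submit and poll requests reuse one connection, without sharing a session across threads: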

import threading

# Thread-local storage for sessions
thread_local = threading.local()


def get_session():
    """Get or create a thread-local session."""
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
        # Configure connection pooling
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=10,
            pool_maxsize=10,
            max_retries=2
        )
        thread_local.session.mount("https://", adapter)
    return thread_local.session


def solve_captcha_pooled(sitekey, pageurl):
    """Solve using thread-local connection pooling."""
    session = get_session()

    resp = session.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": "userrecaptcha",
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1
    }, timeout=30)  # without a timeout, a stalled connection can hang the worker
    data = resp.json()

    if data.get("status") != 1:
        raise RuntimeError(data.get("request"))

    captcha_id = data["request"]

    for _ in range(60):
        time.sleep(5)
        result = session.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY,
            "action": "get",
            "id": captcha_id,
            "json": 1
        }, timeout=30).json()

        if result.get("status") == 1:
            return result["request"]
        if result.get("request") != "CAPCHA_NOT_READY":
            raise RuntimeError(result.get("request"))

    raise TimeoutError("Solve timeout")
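
The thread-local pattern can be checked without any network access. In this sketch a plain object() stands in for requests.Session(), and the names get_resource, work, and created are illustrative. Twelve tasks run on three workers, and the pool creates at most one resource per worker thread.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

thread_local = threading.local()
created = []  # every resource ever created, kept for inspection
created_lock = threading.Lock()


def get_resource():
    """Create one placeholder object per thread on first use, then reuse it."""
    if not hasattr(thread_local, "resource"):
        thread_local.resource = object()  # stands in for requests.Session()
        with created_lock:
            created.append(thread_local.resource)
    return thread_local.resource


def work(_):
    return id(get_resource())


with ThreadPoolExecutor(max_workers=3) as executor:
    ids = list(executor.map(work, range(12)))

# 12 tasks ran, but no more than 3 resources exist (one per worker thread)
print(f"{len(ids)} tasks used {len(created)} resources")
```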

map() for Simple Batch Operations

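When you want results in input order, map() is the shortest route. It re-raises a task's exception when you iterate to that result, so wrap the solver to return errors as data: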

def solve_task(task):
    """Wrapper that returns result dict."""
    try:
        solution = solve_captcha_pooled(task["sitekey"], task["pageurl"])
        return {"url": task["pageurl"], "solution": solution, "error": None}
    except Exception as e:
        return {"url": task["pageurl"], "solution": None, "error": str(e)}


with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(solve_task, tasks))

solved = [r for r in results if r["solution"]]
failed = [r for r in results if r["error"]]
print(f"Solved: {len(solved)}, Failed: {len(failed)}")
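
A quick way to confirm the ordering guarantee: in this sketch the tasks finish in a different order than they were submitted, yet map() yields results in input order. slow_echo and the delays are illustrative.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(delay):
    """Hypothetical task: sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    ordered = list(executor.map(slow_echo, delays))

# The 0.1s task finishes first, yet results follow the input order
print(ordered)
```

The trade-off is that a slow first task delays delivery of every later result, even ones that are already finished.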

Timeout Protection

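Bound how long you wait for a batch. The as_completed timeout limits the wait, not the work: a thread that is already running cannot be interrupted, so the solver's own polling limit is what finally stops it.

from concurrent.futures import TimeoutError as FuturesTimeout

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {
        executor.submit(solve_captcha_pooled, t["sitekey"], t["pageurl"]): t
        for t in tasks
    }

    try:
        # Raises FuturesTimeout from the loop itself if futures are still pending after 10 min
        for future in as_completed(futures, timeout=600):
            task = futures[future]
            try:
                future.result()  # already complete: returns or re-raises immediately
                print(f"[OK] {task['pageurl']}")
            except Exception as e:
                print(f"[ERR] {task['pageurl']}: {e}")
    except FuturesTimeout:
        for future, task in futures.items():
            if future.cancel():  # True only for tasks that never started
                print(f"[CANCELLED] {task['pageurl']}")
            elif not future.done():
                print(f"[TIMEOUT] {task['pageurl']}")
    # Leaving the with block still waits for running threads to finish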
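
A small offline sketch of how the as_completed deadline behaves, with time.sleep standing in for a solve (slow_task and the delays are illustrative). The timeout is raised by the as_completed iterator, so the except clause wraps the loop. Tasks that are already running are not interrupted.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeout


def slow_task(delay):
    """Hypothetical task: sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


finished, unfinished = [], []

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(slow_task, d): d for d in (0.1, 1.0)}
    try:
        for future in as_completed(futures, timeout=0.5):
            finished.append(future.result())
    except FuturesTimeout:
        # Raised by the iterator once the 0.5s deadline passes
        unfinished = [d for f, d in futures.items() if not f.done()]
# The with block exits only after the 1.0s task has finished running

print(f"finished={finished}, unfinished at deadline={unfinished}")
```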

Progress Callback

Track completion in real-time:

import threading

progress_lock = threading.Lock()
progress = {"done": 0, "total": 0}


def solve_with_progress(task):
    result = solve_task(task)
    with progress_lock:
        progress["done"] += 1
        pct = progress["done"] / progress["total"] * 100
        print(f'\r  Progress: {progress["done"]}/{progress["total"]} ({pct:.0f}%)', end="")
    return result


progress["total"] = len(tasks)

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(solve_with_progress, tasks))

print()  # Newline after progress
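
An alternative sketch uses Future.add_done_callback, which keeps the progress logic out of the worker function. The callback fires whether the task succeeded or raised. fake_solve, on_done, and state are illustrative names, and time.sleep stands in for a real solve.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
state = {"done": 0, "total": 0}


def on_done(future):
    """Runs when a future finishes, whether it succeeded or raised."""
    with lock:
        state["done"] += 1
        print(f'\r  Progress: {state["done"]}/{state["total"]}', end="")


def fake_solve(i):
    time.sleep(0.05)  # hypothetical stand-in for a real solve
    return i


items = list(range(8))
state["total"] = len(items)

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(fake_solve, i) for i in items]
    for f in futures:
        f.add_done_callback(on_done)

print()  # newline after the progress line
```

Callbacks usually run in the worker thread that completed the future, so the lock is still needed.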

Choosing max_workers

Workers | Concurrent solves | Overhead | Best for
5 | 5 | Very low | Small batches, conservative use
10 | 10 | Low | General use
25 | 25 | Moderate | High-volume pipelines
50 | 50 | Higher | Maximum throughput

More workers means more concurrent API connections. Start at 10, increase while monitoring error rates.
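
One way to monitor error rates while tuning is a small harness that runs the same batch at different pool sizes and reports elapsed time and failure share. The sketch below assumes a hypothetical flaky_solve in place of the real solver. run_batch is an illustrative helper, not a library function.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_batch(solve, items, max_workers):
    """Run `solve` over `items`; return (elapsed seconds, error rate)."""
    def safe(item):
        try:
            solve(item)
            return True
        except Exception:
            return False

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        outcomes = list(executor.map(safe, items))
    elapsed = time.perf_counter() - start
    error_rate = outcomes.count(False) / len(outcomes) if outcomes else 0.0
    return elapsed, error_rate


def flaky_solve(i):
    """Hypothetical solver: fixed delay, every fifth item fails."""
    time.sleep(0.05)
    if i % 5 == 0:
        raise RuntimeError("simulated failure")


for workers in (2, 10):
    elapsed, error_rate = run_batch(flaky_solve, range(20), workers)
    print(f"workers={workers}: {elapsed:.2f}s, error rate {error_rate:.0%}")
```

With a real solver, a rising error rate at higher worker counts is the signal to step back down.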

ThreadPoolExecutor vs asyncio

# ThreadPoolExecutor — drop into existing sync code
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(solve_task, tasks))

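# asyncio: every function in the call chain must be async
# (solve_async is a placeholder for an aiohttp-based solver, not shown here)
import asyncio
import aiohttp

async def main(task_list):
    async with aiohttp.ClientSession() as session:
        coros = [solve_async(session, t) for t in task_list]
        return await asyncio.gather(*coros)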

Use ThreadPoolExecutor when:

  • Your existing codebase is synchronous
  • You use libraries that don't support async (Selenium, some ORMs)
  • You want quick parallelism without restructuring

Use asyncio when:

  • Building from scratch
  • Maximum efficiency matters (fewer OS threads)
  • Already in an async framework (FastAPI, aiohttp)
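
The two approaches can also be combined. If you are already inside an async framework but the solver is synchronous, asyncio.to_thread (Python 3.9+) runs it on a worker thread without blocking the event loop. A minimal sketch, where blocking_solve is a hypothetical stand-in for a submit-and-poll function:

```python
import asyncio
import time


def blocking_solve(task):
    """Hypothetical synchronous solver; sleep stands in for submit-and-poll."""
    time.sleep(0.1)
    return f"token-for-{task}"


async def main():
    pending = [asyncio.to_thread(blocking_solve, t) for t in ("a", "b", "c")]
    # gather returns results in the order its arguments were given
    return await asyncio.gather(*pending)


tokens = asyncio.run(main())
print(tokens)
```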

Troubleshooting

Issue | Cause | Fix
All threads blocked | Every thread waiting on time.sleep during polling | This is expected: threads release the GIL during sleep
ConnectionError spikes | Too many concurrent connections | Reduce max_workers; use connection pooling
Results out of order | as_completed returns in completion order | Use map() for ordered results, or track with dict
Memory growing | Large result objects held in futures | Process results in as_completed loop; don't store all

FAQ

Does the GIL prevent real parallelism?

No. For I/O-bound work like HTTP requests and time.sleep, Python releases the GIL, so your threads run concurrently during network calls. The GIL only limits CPU-bound parallelism.

How many CAPTCHAs can ThreadPoolExecutor handle per hour?

With 10 workers and 15-second average solve time: ~2,400 per hour. With 25 workers: ~6,000 per hour. The bottleneck is CaptchaAI solve time, not Python threading.
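
The figures above follow from simple arithmetic: each worker completes 3600 / average solve time solves per hour. The helper below (solves_per_hour is an illustrative name) reproduces them. Treat the result as a ceiling, since it assumes no worker is ever idle and ignores failed solves.

```python
def solves_per_hour(workers, avg_solve_seconds):
    """Upper bound: each worker completes 3600 / avg_solve_seconds solves per hour."""
    return workers * 3600 / avg_solve_seconds


print(solves_per_hour(10, 15))  # 2400.0
print(solves_per_hour(25, 15))  # 6000.0
```

Because the polling loop checks every 5 seconds, the effective solve time per task is rounded up to the next multiple of 5 seconds.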

Should I use ProcessPoolExecutor instead?

No. CAPTCHA solving is I/O-bound. ProcessPoolExecutor adds inter-process communication overhead with no benefit. Stick with threads.

Next Steps

Parallelize CAPTCHA solving — get your CaptchaAI API key and drop ThreadPoolExecutor into your pipeline.
