Python Multiprocessing for Parallel CAPTCHA Solving

CAPTCHA solving itself is I/O-bound (the client mostly waits on the API), but the surrounding scraping work (HTML parsing, data extraction, file writing) can be CPU-heavy and benefits from multiple processes. Each process runs its own interpreter with its own GIL, so multiprocessing allows true parallel execution.


When to use multiprocessing vs threading vs asyncio

| Approach | Best for | GIL-free | Overhead |
|---|---|---|---|
| Threading | Pure I/O waits | No | Low |
| Asyncio | Many concurrent I/O tasks | No | Low |
| Multiprocessing | CPU + I/O mixed workloads | Yes | Higher |

Use multiprocessing when each job includes both CAPTCHA solving (I/O) and heavy data processing (CPU).
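
The rule of thumb above can be sketched as a small helper that picks an executor by workload type. This is an illustration only: `choose_executor` and `run_jobs` are hypothetical names, not part of any library, and the single `cpu_heavy` flag is a simplifying assumption.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def choose_executor(cpu_heavy):
    """Return an executor class for the workload (rule of thumb, not a guarantee)."""
    # Processes sidestep the GIL for CPU work; threads are cheaper for pure I/O waits.
    return ProcessPoolExecutor if cpu_heavy else ThreadPoolExecutor


def run_jobs(func, jobs, cpu_heavy=False, max_workers=4):
    """Run func over jobs and return results in input order.

    When cpu_heavy is True, func and every job must be picklable,
    so func has to be defined at module level.
    """
    executor_cls = choose_executor(cpu_heavy)
    with executor_cls(max_workers=max_workers) as executor:
        return list(executor.map(func, jobs))
```

Both executor classes share the same interface, so switching between threads and processes later only requires changing the flag.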


ProcessPoolExecutor — simplest pattern

import time
import requests
from concurrent.futures import ProcessPoolExecutor, as_completed

API_KEY = "YOUR_API_KEY"


def solve_captcha(task):
    """Solve a single CAPTCHA — runs in a separate process."""
    method = task["method"]
    params = task["params"]

    submit = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY, "method": method, "json": 1, **params,
    }, timeout=30).json()

    if submit.get("status") != 1:
        return {"task_id": task["id"], "status": "error", "error": submit.get("request")}

    captcha_id = submit["request"]

    for _ in range(30):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY, "action": "get", "id": captcha_id, "json": 1,
        }, timeout=30).json()

        if result.get("status") == 1:
            return {"task_id": task["id"], "status": "solved", "token": result["request"]}
        if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
            return {"task_id": task["id"], "status": "error", "error": "unsolvable"}

    return {"task_id": task["id"], "status": "error", "error": "timeout"}


def solve_batch(tasks, max_workers=4):
    """Solve multiple CAPTCHAs in parallel processes."""
    results = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(solve_captcha, task): task for task in tasks}

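        for future in as_completed(futures):
            task = futures[future]
            try:
                result = future.result()
            except Exception as exc:  # e.g. a network error raised inside the worker
                result = {"task_id": task["id"], "status": "error", "error": str(exc)}
            results.append(result)
            print(f"Task {result['task_id']}: {result['status']}")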

    return results


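# Usage
# The __main__ guard is required wherever worker processes are started with
# "spawn" (the default on Windows and macOS); without it each worker would
# re-run this module's top-level code.
if __name__ == "__main__":
    tasks = [
        {"id": i, "method": "userrecaptcha", "params": {"googlekey": f"KEY_{i}", "pageurl": f"https://example.com/{i}"}}
        for i in range(10)
    ]

    results = solve_batch(tasks, max_workers=4)
    solved = [r for r in results if r["status"] == "solved"]
    print(f"Solved: {len(solved)}/{len(tasks)}")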

multiprocessing.Pool with map

import time
import requests
from multiprocessing import Pool

API_KEY = "YOUR_API_KEY"


def solve_single(args):
    """Worker function for Pool.map — must accept a single argument."""
    task_id, method, sitekey, url = args

    try:
        submit = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": API_KEY, "method": method, "googlekey": sitekey,
            "pageurl": url, "json": 1,
        }, timeout=30).json()

        if submit.get("status") != 1:
            return (task_id, None, submit.get("request"))

        captcha_id = submit["request"]
        for _ in range(30):
            time.sleep(5)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": API_KEY, "action": "get", "id": captcha_id, "json": 1,
            }, timeout=30).json()
            if result.get("status") == 1:
                return (task_id, result["request"], None)

        return (task_id, None, "timeout")
    except Exception as e:
        return (task_id, None, str(e))


def solve_with_pool(work_items, num_processes=4):
    """Use Pool.map for simple parallel execution."""
    with Pool(processes=num_processes) as pool:
        results = pool.map(solve_single, work_items)

    for task_id, token, error in results:
        if token:
            print(f"Task {task_id}: solved")
        else:
            print(f"Task {task_id}: {error}")

    return results


# Usage (guarded so that spawned worker processes do not re-run it on import)
if __name__ == "__main__":
    work = [
        (i, "userrecaptcha", f"SITEKEY_{i}", f"https://example.com/page{i}")
        for i in range(8)
    ]
    results = solve_with_pool(work, num_processes=4)
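
`Pool.map` returns only after every task has finished. If results should be handled as soon as each solve completes, `Pool.imap_unordered` is one option. The sketch below assumes the worker returns the same `(task_id, token, error)` tuples as `solve_single`; `collect_unordered` is a hypothetical helper name.

```python
def collect_unordered(pool, worker, work_items):
    """Consume results in completion order and split them into solved and failed."""
    solved, failed = {}, {}
    # imap_unordered yields each result as soon as a worker finishes it,
    # so a slow task does not hold back the ones that are already done.
    for task_id, token, error in pool.imap_unordered(worker, work_items):
        if token:
            solved[task_id] = token
        else:
            failed[task_id] = error
    return solved, failed
```

Because the pool is passed in, the same function can be dry-run with `multiprocessing.pool.ThreadPool`, which exposes the same methods, before switching to a process `Pool`.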

Shared state with Manager

When processes need to share state (counters, results dict):

import queue
import time
import requests
from multiprocessing import Process, Manager

API_KEY = "YOUR_API_KEY"


def worker(task_queue, result_dict, counter, lock, api_key):
    """Worker process that records results in Manager-backed shared state."""

    def bump(key):
        # Read-modify-write on a shared dict is not atomic across processes,
        # so the update is guarded with a shared lock.
        with lock:
            counter[key] = counter.get(key, 0) + 1

    while True:
        try:
            task = task_queue.get(timeout=5)
        except queue.Empty:
            break

        if task is None:  # Poison pill
            break

        try:
            # Submit the CAPTCHA
            submit = requests.post("https://ocr.captchaai.com/in.php", data={
                "key": api_key, "method": task["method"], "json": 1, **task["params"],
            }, timeout=30).json()

            if submit.get("status") != 1:
                result_dict[task["id"]] = {"error": submit.get("request")}
                bump("failed")  # Rejected submissions count as failures too
                continue

            # Poll for the result
            captcha_id = submit["request"]
            for _ in range(30):
                time.sleep(5)
                result = requests.get("https://ocr.captchaai.com/res.php", params={
                    "key": api_key, "action": "get", "id": captcha_id, "json": 1,
                }, timeout=30).json()
                if result.get("status") == 1:
                    result_dict[task["id"]] = {"token": result["request"]}
                    bump("solved")
                    break
            else:
                result_dict[task["id"]] = {"error": "timeout"}
                bump("failed")

        except Exception as e:
            result_dict[task["id"]] = {"error": str(e)}
            bump("failed")


def parallel_solve_with_manager(tasks, num_workers=4):
    """Parallel CAPTCHA solving with shared state."""
    with Manager() as manager:
        task_queue = manager.Queue()
        result_dict = manager.dict()
        counter = manager.dict({"solved": 0, "failed": 0})
        lock = manager.Lock()

        # Load queue
        for task in tasks:
            task_queue.put(task)
        for _ in range(num_workers):
            task_queue.put(None)  # One poison pill per worker

        # Start workers
        processes = []
        for _ in range(num_workers):
            p = Process(target=worker, args=(task_queue, result_dict, counter, lock, API_KEY))
            p.start()
            processes.append(p)

        # Wait for all workers
        for p in processes:
            p.join()

        print(f"Solved: {counter['solved']}, Failed: {counter['failed']}")
        return dict(result_dict)

Hybrid: multiprocessing + asyncio

The highest-throughput pattern in this tutorial: multiple processes, each running its own asyncio event loop:

import asyncio
import aiohttp
from multiprocessing import Pool

API_KEY = "YOUR_API_KEY"


async def solve_async_batch(api_key, tasks):
    """Async solver running inside a process."""
    semaphore = asyncio.Semaphore(5)  # At most 5 solves in flight per process

    async def solve_one(session, task):
        async with semaphore:
            try:
                async with session.post("https://ocr.captchaai.com/in.php", data={
                    "key": api_key, "method": task["method"], "json": 1, **task["params"],
                }) as resp:
                    data = await resp.json(content_type=None)
                if data.get("status") != 1:
                    return {"id": task["id"], "error": data.get("request")}
                captcha_id = data["request"]

                for _ in range(30):
                    await asyncio.sleep(5)
                    async with session.get("https://ocr.captchaai.com/res.php", params={
                        "key": api_key, "action": "get", "id": captcha_id, "json": 1,
                    }) as resp:
                        result = await resp.json(content_type=None)
                    if result.get("status") == 1:
                        return {"id": task["id"], "token": result["request"]}

                return {"id": task["id"], "error": "timeout"}
            except Exception as e:
                # Keep one failed task from aborting the whole chunk
                return {"id": task["id"], "error": str(e)}

    # One session per batch so connections are reused across tasks
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[solve_one(session, t) for t in tasks])


def process_chunk(args):
    """Entry point for each process — runs async event loop."""
    api_key, chunk = args
    return asyncio.run(solve_async_batch(api_key, chunk))


def hybrid_solve(tasks, num_processes=4, batch_size=10):
    """Split tasks across processes, each running async solvers."""
    # Split into chunks
    chunks = [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]
    chunk_args = [(API_KEY, chunk) for chunk in chunks]

    with Pool(processes=num_processes) as pool:
        all_results = pool.map(process_chunk, chunk_args)

    # Flatten results
    flat = [r for batch in all_results for r in batch]
    solved = sum(1 for r in flat if "token" in r)
    print(f"Total: {len(flat)}, Solved: {solved}")
    return flat


# Usage (guarded so that spawned worker processes do not re-run it on import)
if __name__ == "__main__":
    tasks = [
        {"id": i, "method": "userrecaptcha", "params": {"googlekey": f"KEY_{i}", "pageurl": f"https://example.com/{i}"}}
        for i in range(40)
    ]

    results = hybrid_solve(tasks, num_processes=4, batch_size=10)

Choosing the right concurrency level

CaptchaAI plan limit       →  max total concurrent solves
Number of CPU cores         →  max processes (multiprocessing)
Tasks per process           →  max async tasks per event loop
API response time           →  determines throughput ceiling

Example: 4-core machine, API limit 20 concurrent
  → 4 processes × 5 async tasks = 20 concurrent solves
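
The arithmetic in this example can be written as a small helper. This is a sketch under simple assumptions: the API limit is split evenly across processes and rounded down, so the total never exceeds the plan limit. `plan_concurrency` is a hypothetical name.

```python
import os


def plan_concurrency(api_limit, cpu_cores=None):
    """Return (processes, async_tasks_per_process) that stays within api_limit."""
    cores = cpu_cores or os.cpu_count() or 1
    # Never start more processes than cores, or more than the API allows in total.
    processes = max(1, min(cores, api_limit))
    # Integer division rounds down, so processes * tasks_per_process <= api_limit.
    tasks_per_process = max(1, api_limit // processes)
    return processes, tasks_per_process


processes, per_process = plan_concurrency(api_limit=20, cpu_cores=4)
print(f"{processes} processes x {per_process} async tasks")  # 4 processes x 5 async tasks
```

In the hybrid example, the second value would correspond to the `asyncio.Semaphore` size and the first to `num_processes`.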

Troubleshooting

| Symptom | Cause | Fix |
|---|---|---|
| Can't pickle error | Lambda or non-picklable object passed to Pool | Use module-level functions only |
| Processes hang on join() | Worker stuck in infinite loop | Add timeout to queue.get() |
| Results missing | Shared dict not synced | Use Manager().dict() |
| High memory usage | Too many processes | Reduce num_processes |
| ERROR_NO_SLOT_AVAILABLE | Exceeding API concurrency | Lower total concurrent workers |
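
For the pickling row, one quick way to check an object before handing it to a pool is to try pickling it in the parent process. A minimal sketch follows; `is_picklable` is a hypothetical helper, and the exact exception type raised by `pickle` varies by object and Python version, which is why several are caught.

```python
import pickle


def is_picklable(obj):
    """Return True if obj can be pickled, which Pool requires for functions and arguments."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(is_picklable({"id": 1, "method": "userrecaptcha"}))  # True: plain dicts pickle fine
print(is_picklable(lambda task: task))                     # False: lambdas cannot be pickled
```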

Frequently asked questions

Is multiprocessing faster than asyncio for CAPTCHA solving?

Not for pure CAPTCHA solving (I/O-bound). Multiprocessing shines when you also do CPU-heavy work like data parsing or image processing alongside solving.

How many processes should I use?

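Match your CPU core count for CPU-bound work. For I/O-bound CAPTCHA solving, 2-4 processes with async inside each is usually enough, because concurrency within a process comes from the event loop rather than from extra processes.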

Can I share a requests.Session across processes?

No. A Session holds a pool of open connections, which cannot be safely shared between processes, so create a separate Session inside each worker process.
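
One way to give each worker its own Session is a pool initializer, which runs once in every worker process when it starts. The sketch below is an assumption about how this could be wired up, not code from the examples above: `init_worker` and `get_session` are hypothetical names, and the factory is passed in so the pattern is not tied to one HTTP library.

```python
_session = None  # One instance per worker process, set by init_worker


def init_worker(session_factory):
    """Runs once in each worker process and creates that process's own session."""
    global _session
    _session = session_factory()


def get_session():
    """Return the session belonging to the current process."""
    return _session


# Possible usage with the Pool example (assumes solve_single is changed
# to call get_session().post(...) instead of requests.post(...)):
#
#   with Pool(4, initializer=init_worker, initargs=(requests.Session,)) as pool:
#       results = pool.map(solve_single, work)
```

`initializer` and `initargs` are standard `multiprocessing.Pool` parameters; `ProcessPoolExecutor` accepts the same two arguments.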


Summary

Python multiprocessing enables true parallel CAPTCHA solving with CaptchaAI. Use ProcessPoolExecutor for simple cases, Manager for shared state, or the hybrid multiprocessing + asyncio pattern for maximum throughput.
