NATS Messaging + CaptchaAI: Lightweight CAPTCHA Task Distribution

NATS is a lightweight, high-performance messaging system: a single binary with no JVM, no disk persistence by default, and typically sub-millisecond latency. For CAPTCHA task distribution, where speed and simplicity matter more than Kafka-style durability, NATS is a good fit.

Why NATS for CAPTCHA Tasks

| Feature | NATS | Kafka | RabbitMQ |
|---|---|---|---|
| Latency (typical) | < 1 ms | 5-10 ms | 1-5 ms |
| Setup complexity | Single binary | Cluster + ZooKeeper or KRaft | Moderate |
| Memory footprint | ~20 MB | ~1 GB+ | ~200 MB |
| Persistence | Optional (JetStream) | Built-in | Built-in |
| Best for | Ephemeral tasks, low latency | Durable streaming | Complex routing |

CAPTCHA tasks are ephemeral — if a task is lost, you re-submit. NATS's simplicity and speed make it a natural choice.

Architecture

[Scrapers] → Publish → [NATS: captcha.tasks]
                              ↓
                    Queue Group: captcha-workers
                    ├── Worker 1 (solve via CaptchaAI)
                    ├── Worker 2
                    └── Worker 3
                              ↓
                    Publish → [NATS: captcha.results]
                              ↓
                    [Result Subscribers]

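NATS queue groups distribute messages across workers automatically: each task is delivered to only one member of the group. Core NATS delivery is at-most-once, so a task published while no worker is subscribed is dropped rather than queued.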
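The flow above implies a small message contract between publishers and workers. Here is a minimal sketch of that contract, assuming the field names used in this article's examples (`task_id`, `method`, `sitekey`, `pageurl`). The `CaptchaTask` class and its helpers are hypothetical and not part of nats-py or CaptchaAI.

```python
import json
from dataclasses import asdict, dataclass

TASK_SUBJECT = "captcha.tasks"
RESULT_SUBJECT = "captcha.results"


@dataclass(frozen=True)
class CaptchaTask:
    """Hypothetical task payload shared by publishers and workers."""
    task_id: str
    method: str
    sitekey: str
    pageurl: str

    def to_bytes(self) -> bytes:
        """Serialize to the JSON payload published on TASK_SUBJECT."""
        return json.dumps(asdict(self)).encode()

    @classmethod
    def from_bytes(cls, data: bytes) -> "CaptchaTask":
        """Parse a payload, raising ValueError on missing or unknown fields."""
        raw = json.loads(data.decode())
        try:
            return cls(**raw)
        except TypeError as exc:
            raise ValueError(f"malformed task payload: {exc}") from exc
```

Validating at the worker boundary means a malformed message fails fast instead of surfacing later as a `KeyError` in the middle of a solve.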

Prerequisites

# Install NATS server
# macOS
brew install nats-server

# Linux
curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.0/nats-server-v2.10.0-linux-amd64.tar.gz | tar xz

# Start (on Linux, run the binary from the extracted directory or move it onto your PATH first)
nats-server

# Python client
pip install nats-py

# Node.js client
npm install nats

Task Publisher (Scraper)

Python

import asyncio
import json
import nats


async def publish_captcha_tasks():
    nc = await nats.connect("nats://localhost:4222")

    tasks = [
        {
            "task_id": f"task_{i}",
            "method": "userrecaptcha",
            "sitekey": "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
            "pageurl": f"https://example.com/page/{i}"
        }
        for i in range(100)
    ]

    for task in tasks:
        await nc.publish("captcha.tasks", json.dumps(task).encode())
        print(f"Published: {task['task_id']}")

    await nc.flush()
    await nc.close()


asyncio.run(publish_captcha_tasks())

JavaScript

const { connect, StringCodec } = require("nats");

const sc = StringCodec();

async function publishCaptchaTasks() {
  const nc = await connect({ servers: "nats://localhost:4222" });

  for (let i = 0; i < 100; i++) {
    const task = {
      task_id: `task_${i}`,
      method: "userrecaptcha",
      sitekey: "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
      pageurl: `https://example.com/page/${i}`,
    };

    nc.publish("captcha.tasks", sc.encode(JSON.stringify(task)));
    console.log(`Published: ${task.task_id}`);
  }

  await nc.flush();
  await nc.close();
}

publishCaptchaTasks();

CAPTCHA Worker (Queue Group Subscriber)

Queue groups ensure each message goes to exactly one worker, even with multiple workers running.

Python

import asyncio
import json
import os
import nats
import aiohttp

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


async def solve_captcha(session, task):
    """Submit to CaptchaAI and poll for result."""
    # Submit
    async with session.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": task["method"],
        "googlekey": task["sitekey"],
        "pageurl": task["pageurl"],
        "json": 1
    }) as resp:
        data = await resp.json(content_type=None)

    if data.get("status") != 1:
        return {"task_id": task["task_id"], "error": data.get("request")}

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        await asyncio.sleep(5)
        async with session.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY, "action": "get", "id": captcha_id, "json": 1
        }) as resp:
            result = await resp.json(content_type=None)

        if result.get("status") == 1:
            return {"task_id": task["task_id"], "solution": result["request"]}
        if result.get("request") != "CAPCHA_NOT_READY":
            return {"task_id": task["task_id"], "error": result.get("request")}

    return {"task_id": task["task_id"], "error": "TIMEOUT"}


async def worker(worker_id):
    nc = await nats.connect("nats://localhost:4222")

    # Subscribe with queue group — each message goes to one worker only
    sub = await nc.subscribe("captcha.tasks", queue="captcha-workers")

    print(f"Worker {worker_id} listening...")

    async with aiohttp.ClientSession() as session:
        async for msg in sub.messages:
            task = json.loads(msg.data.decode())
            print(f"Worker {worker_id} processing {task['task_id']}")

            result = await solve_captcha(session, task)

            # Publish result
            await nc.publish(
                "captcha.results",
                json.dumps(result).encode()
            )

            status = "solved" if "solution" in result else result.get("error")
            print(f"  → {task['task_id']}: {status}")


asyncio.run(worker(1))

JavaScript

const { connect, StringCodec } = require("nats");
const axios = require("axios");

const sc = StringCodec();
const API_KEY = process.env.CAPTCHAAI_API_KEY;

function sleep(ms) {
  return new Promise((r) => setTimeout(r, ms));
}

async function solveCaptcha(task) {
  const submitResp = await axios.post(
    "https://ocr.captchaai.com/in.php",
    null,
    {
      params: {
        key: API_KEY,
        method: task.method,
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
    }
  );

  if (submitResp.data.status !== 1) {
    return { task_id: task.task_id, error: submitResp.data.request };
  }

  const captchaId = submitResp.data.request;

  for (let i = 0; i < 60; i++) {
    await sleep(5000);
    const result = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });

    if (result.data.status === 1) {
      return { task_id: task.task_id, solution: result.data.request };
    }
    if (result.data.request !== "CAPCHA_NOT_READY") {
      return { task_id: task.task_id, error: result.data.request };
    }
  }

  return { task_id: task.task_id, error: "TIMEOUT" };
}

async function worker(workerId) {
  const nc = await connect({ servers: "nats://localhost:4222" });

  // Queue group subscription — load-balanced across workers
  const sub = nc.subscribe("captcha.tasks", { queue: "captcha-workers" });

  console.log(`Worker ${workerId} listening...`);

  for await (const msg of sub) {
    const task = JSON.parse(sc.decode(msg.data));
    console.log(`Worker ${workerId} processing ${task.task_id}`);

    const result = await solveCaptcha(task);
    nc.publish("captcha.results", sc.encode(JSON.stringify(result)));

    const status = result.solution ? "solved" : result.error;
    console.log(`  → ${task.task_id}: ${status}`);
  }
}

worker(1);
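Both workers above handle one message at a time, so a worker spends most of its time waiting on CaptchaAI polling. One possible way to keep several solves in flight per worker is a bounded-concurrency loop. This is a sketch only: `process_concurrently`, `max_in_flight`, and the `solve` and `on_result` callables are hypothetical names, not part of nats-py.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def process_concurrently(
    tasks: AsyncIterator[dict],
    solve: Callable[[dict], Awaitable[dict]],
    on_result: Callable[[dict], Awaitable[None]],
    max_in_flight: int = 10,
) -> None:
    """Run up to max_in_flight solves at once; return when the input ends."""
    limit = asyncio.Semaphore(max_in_flight)
    running = set()

    async def handle(task: dict) -> None:
        try:
            await on_result(await solve(task))
        finally:
            limit.release()  # free a slot even if the solve failed

    async for task in tasks:
        await limit.acquire()  # waits while max_in_flight solves are running
        job = asyncio.create_task(handle(task))
        running.add(job)
        job.add_done_callback(running.discard)

    if running:
        await asyncio.gather(*running)
```

With nats-py, `tasks` could be an async generator that yields `json.loads(msg.data)` for each message in `sub.messages`, and `on_result` could publish to `captcha.results`.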

Result Collector

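import asyncio
import json
import nats


# Start the collector before the workers: core NATS does not replay
# results that were published before this subscription existed.
async def collect_results():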
    nc = await nats.connect("nats://localhost:4222")
    sub = await nc.subscribe("captcha.results")

    solved = 0
    failed = 0

    async for msg in sub.messages:
        result = json.loads(msg.data.decode())

        if "solution" in result:
            solved += 1
            print(f"[SOLVED] {result['task_id']} — {result['solution'][:30]}...")
        else:
            failed += 1
            print(f"[FAILED] {result['task_id']} — {result['error']}")

        print(f"  Stats: {solved} solved, {failed} failed")

asyncio.run(collect_results())
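Because core NATS can drop a task, the "re-submit if lost" approach mentioned earlier needs some record of what is still outstanding. The sketch below is one way to keep that record on the publisher side. The `PendingTasks` class is hypothetical, and the 330-second default is an assumption chosen to exceed the worker's 60 × 5 s polling budget.

```python
import time
from typing import Callable, Dict, List


class PendingTasks:
    """Track published tasks until a result arrives or a deadline passes."""

    def __init__(self, timeout_s: float = 330.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout_s = timeout_s
        self._clock = clock
        self._sent_at: Dict[str, float] = {}

    def mark_published(self, task_id: str) -> None:
        """Record the time a task was published (or re-published)."""
        self._sent_at[task_id] = self._clock()

    def mark_result(self, task_id: str) -> None:
        """Forget a task once any result for it arrives; unknown IDs are ignored."""
        self._sent_at.pop(task_id, None)

    def overdue(self) -> List[str]:
        """Return IDs with no result after timeout_s: candidates to re-publish."""
        now = self._clock()
        return [tid for tid, sent in self._sent_at.items()
                if now - sent > self._timeout_s]
```

The publisher would call `mark_published` after each `nc.publish`, the collector would call `mark_result` for every message on `captcha.results`, and a periodic task would re-publish whatever `overdue()` returns.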

NATS JetStream for Durability

For tasks that must not be lost, enable JetStream persistence:

import json
import nats


async def durable_publisher(task):
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create stream (one-time setup)
    await js.add_stream(name="CAPTCHA", subjects=["captcha.>"])

    # Publish and wait for the server to acknowledge storage
    ack = await js.publish("captcha.tasks", json.dumps(task).encode())
    print(f"Published to stream, seq={ack.seq}")

    await nc.close()

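JetStream adds disk persistence, replay, and acknowledged at-least-once delivery. Combined with message deduplication on publish, it can approximate exactly-once processing. The durability is similar to Kafka's, while NATS stays simple to operate.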
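The consuming side of a durable setup might look like the sketch below, assuming nats-py's JetStream pull-subscription API and the `CAPTCHA` stream created above. The helper names (`dedup_headers`, `publish_once`, `durable_worker`) are hypothetical, `solve` stands for any coroutine that returns a result dict, and the 360-second ack wait is an assumption sized to exceed the 60 × 5 s polling loop.

```python
import json


def dedup_headers(task: dict) -> dict:
    """Nats-Msg-Id lets JetStream discard repeat publishes of the same task
    within the stream's deduplication window."""
    return {"Nats-Msg-Id": task["task_id"]}


async def publish_once(js, task: dict) -> int:
    """Publish a task with a deduplication ID; returns the stream sequence."""
    ack = await js.publish(
        "captcha.tasks", json.dumps(task).encode(), headers=dedup_headers(task)
    )
    return ack.seq


async def durable_worker(solve) -> None:
    """Pull one task at a time and ack only after the result is published."""
    import nats  # third-party: pip install nats-py
    from nats.errors import TimeoutError as NatsTimeout
    from nats.js.api import ConsumerConfig

    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    sub = await js.pull_subscribe(
        "captcha.tasks",
        durable="captcha-workers",
        # Assumed value, in seconds: longer than the worst-case solve time,
        # so a task is not redelivered while it is still being solved.
        config=ConsumerConfig(ack_wait=360),
    )
    while True:
        try:
            msgs = await sub.fetch(1, timeout=5)
        except NatsTimeout:
            continue  # no task available yet
        for msg in msgs:
            result = await solve(json.loads(msg.data.decode()))
            await js.publish("captcha.results", json.dumps(result).encode())
            await msg.ack()  # unacked messages are redelivered after ack_wait
```

A pull consumer only receives a task when the worker asks for one, so a busy worker does not accumulate a backlog the way a core queue subscriber can.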

Scaling Workers

# Run multiple workers — NATS distributes automatically via queue groups
python worker.py --id=1 &
python worker.py --id=2 &
python worker.py --id=3 &

# Each task goes to exactly one worker
# Add more workers to increase throughput
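The commands above pass `--id`, which the worker script shown earlier does not parse (it hard-codes `worker(1)`). A minimal sketch of the missing entry point follows. The `--count` flag and the `run_workers` helper are hypothetical additions, and `worker` is assumed to be the coroutine from the worker section.

```python
import argparse
import asyncio
from typing import Awaitable, Callable, List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse worker options; argv=None reads from the command line."""
    parser = argparse.ArgumentParser(description="CAPTCHA worker")
    parser.add_argument("--id", type=int, default=1,
                        help="worker ID used in log output")
    parser.add_argument("--count", type=int, default=1,
                        help="number of workers to run inside this process")
    return parser.parse_args(argv)


async def run_workers(worker: Callable[[int], Awaitable[None]],
                      first_id: int, count: int) -> None:
    """Run `count` worker coroutines concurrently with consecutive IDs."""
    await asyncio.gather(*(worker(first_id + i) for i in range(count)))
```

In `worker.py`, the final line would then become `args = parse_args()` followed by `asyncio.run(run_workers(worker, args.id, args.count))`. Because each worker mostly waits on network I/O, several coroutines in one process are often enough before separate processes are needed.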

Troubleshooting

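| Issue | Cause | Fix |
|---|---|---|
| Messages dropped | Core NATS buffers only up to a subscriber's pending limits; a slow consumer that exceeds them loses messages | Use JetStream for persistence, or add consumer capacity |
| Worker not receiving messages | Wrong subject or queue group name | Verify the worker's subject matches the publisher's and that all workers share one queue group name |
| Connection reset | NATS server restart | Enable auto-reconnect in client options |
| Uneven distribution | Core NATS picks a queue group member at random and does not track how busy each worker is | Normal for small samples; it evens out over many tasks. For dispatch that follows worker availability, use a JetStream pull consumer |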
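For the connection-reset row, reconnect behaviour is set when connecting. The sketch below assumes nats-py's `connect` keyword options. The `connect_resilient` name and the chosen values are illustrative, and nats-py already reconnects by default for a limited number of attempts.

```python
async def connect_resilient(url: str = "nats://localhost:4222"):
    """Connect with unlimited reconnect attempts and log state changes."""
    import nats  # third-party: pip install nats-py

    async def on_disconnect():
        print("NATS disconnected; the client will keep retrying")

    async def on_reconnect():
        print("NATS reconnected")

    async def on_error(exc):
        print(f"NATS error: {exc}")

    return await nats.connect(
        url,
        allow_reconnect=True,
        max_reconnect_attempts=-1,  # assumed: -1 means retry indefinitely
        reconnect_time_wait=2,      # seconds between attempts
        disconnected_cb=on_disconnect,
        reconnected_cb=on_reconnect,
        error_cb=on_error,
    )
```

Subscriptions are restored after a reconnect, but with core NATS any task published while the worker was disconnected is not redelivered.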

FAQ

When should I use NATS instead of Redis or Kafka?

Use NATS when you want minimal infrastructure (single binary, no dependencies) and sub-millisecond latency, and you don't need persistent message storage. Use Kafka for durable streaming, and Redis when you want caching and pub/sub in one service.

Can NATS handle 10,000+ CAPTCHA tasks per hour?

Easily. 10,000 tasks per hour is fewer than 3 messages per second, while a NATS server can handle millions of small messages per second. The bottleneck will be CaptchaAI solve times, not NATS throughput.
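Since solve time is the bottleneck, the number of solves that must be in flight at once can be estimated with Little's law. The sketch below is a rough sizing aid; the 20-second average solve time is a hypothetical figure, so measure your own.

```python
import math


def required_concurrency(tasks_per_hour: float, avg_solve_seconds: float) -> int:
    """Little's law: in-flight solves = arrival rate x average solve time."""
    arrivals_per_second = tasks_per_hour / 3600
    return math.ceil(arrivals_per_second * avg_solve_seconds)


# 10,000 tasks/hour at an assumed 20 s average solve time
print(required_concurrency(10_000, 20))  # 56
```

With workers that solve one task at a time, that figure is also the number of workers needed to keep up without a growing backlog.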

Do I need JetStream?

Only if you need persistence. For most CAPTCHA workflows, core NATS is sufficient: if a task is lost, you can re-submit it. Enable JetStream when you need an audit trail, or when tasks must survive restarts and be acknowledged before they are considered done.

Next Steps

Distribute CAPTCHA tasks with NATS — get your CaptchaAI API key and spin up lightweight workers.
