Queue-Based Batch CAPTCHA Processing with Priority Levels

When multiple workflows submit CAPTCHA tasks simultaneously, a flat FIFO queue treats all tasks equally. A priority queue ensures time-sensitive tasks — like user-facing form submissions — get solved before background batch jobs.

Priority Levels

| Priority | Use Case | Target Solve Time |
|---|---|---|
| critical (0) | User waiting in real-time (checkout, login) | < 30 seconds |
| high (1) | Time-sensitive automation (price monitoring) | < 60 seconds |
| normal (2) | Standard batch processing | < 120 seconds |
| low (3) | Background jobs, non-urgent scraping | Best effort |

Lower number = higher priority.
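
The levels above can be encoded as an `IntEnum` so call sites read `Priority.CRITICAL` instead of a bare `0`. This is a minimal sketch: the names `Priority`, `TARGET_SOLVE_SECONDS`, and `drain_in_priority_order` are illustrative assumptions, not part of any CaptchaAI client.

```python
import heapq
from enum import IntEnum


class Priority(IntEnum):
    """Lower value = higher priority, matching the table above."""
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3


# Target solve times in seconds from the table; None means best effort.
TARGET_SOLVE_SECONDS = {
    Priority.CRITICAL: 30,
    Priority.HIGH: 60,
    Priority.NORMAL: 120,
    Priority.LOW: None,
}


def drain_in_priority_order(items):
    """Return labels ordered by (priority, arrival order) using a min-heap."""
    heap = []
    for sequence, (priority, label) in enumerate(items):
        # The sequence number breaks ties so equal priorities stay FIFO.
        heapq.heappush(heap, (int(priority), sequence, label))
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]
```

Feeding it a mixed batch shows the critical item jumping ahead while same-priority items keep their arrival order.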

Python Priority Queue

import time
import heapq
import threading
import requests
from dataclasses import dataclass, field
from typing import Any

API_KEY = "YOUR_API_KEY"
SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"


@dataclass(order=True)
class CaptchaTask:
    priority: int
    created_at: float = field(compare=False)
    task_data: dict = field(compare=False)
    callback: Any = field(default=None, compare=False, repr=False)
    sequence: int = field(default=0)  # Tie-breaker for same-priority tasks


class PriorityQueueSolver:
    def __init__(self, max_workers=10):
        self.queue = []
        self.lock = threading.Lock()
        self.max_workers = max_workers
        self.active_workers = 0
        self.sequence = 0
        self.running = True
        self.stats = {"submitted": 0, "solved": 0, "failed": 0}

        # Start dispatcher thread
        self.dispatcher = threading.Thread(target=self._dispatch_loop, daemon=True)
        self.dispatcher.start()

    def submit(self, task_data, priority=2, callback=None):
        """Add a CAPTCHA task to the priority queue."""
        with self.lock:
            self.sequence += 1
            task = CaptchaTask(
                priority=priority,
                created_at=time.monotonic(),
                task_data=task_data,
                callback=callback,
                sequence=self.sequence,
            )
            heapq.heappush(self.queue, task)
            self.stats["submitted"] += 1
        return task

    def _dispatch_loop(self):
        """Continuously pull tasks from the queue and process them."""
        while self.running:
            with self.lock:
                # Start as many queued tasks as there are free worker slots,
                # rather than at most one task per 100 ms tick.
                while self.queue and self.active_workers < self.max_workers:
                    task = heapq.heappop(self.queue)
                    self.active_workers += 1
                    threading.Thread(
                        target=self._process_task, args=(task,), daemon=True
                    ).start()
            time.sleep(0.1)

    def _process_task(self, task):
        """Submit and poll a single CAPTCHA task."""
        try:
            data = task.task_data
            params = {"key": API_KEY, "json": 1}

            captcha_type = data.get("type", "recaptcha_v2")
            if captcha_type == "recaptcha_v2":
                params.update({
                    "method": "userrecaptcha",
                    "googlekey": data["sitekey"],
                    "pageurl": data["pageurl"],
                })
            elif captcha_type == "turnstile":
                params.update({
                    "method": "turnstile",
                    "sitekey": data["sitekey"],
                    "pageurl": data["pageurl"],
                })
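            elif captcha_type == "hcaptcha":
                params.update({
                    "method": "hcaptcha",
                    "sitekey": data["sitekey"],
                    "pageurl": data["pageurl"],
                })
            else:
                # Fail fast instead of submitting a request with no method
                raise ValueError(f"Unsupported CAPTCHA type: {captcha_type}")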

            # Submit
            response = requests.post(SUBMIT_URL, data=params, timeout=30)
            result = response.json()

            if result.get("status") != 1:
                self._complete(task, None, result.get("request", "submit failed"))
                return

            task_id = result["request"]

            # Poll
            for _ in range(60):
                time.sleep(5)
                poll = requests.get(RESULT_URL, params={
                    "key": API_KEY, "action": "get",
                    "id": task_id, "json": 1,
                }, timeout=15).json()

                if poll.get("request") == "CAPCHA_NOT_READY":
                    continue
                if poll.get("status") == 1:
                    self._complete(task, poll["request"], None)
                    return

                self._complete(task, None, poll.get("request", "poll error"))
                return

            self._complete(task, None, "TIMEOUT")

        except Exception as e:
            self._complete(task, None, str(e))
        finally:
            with self.lock:
                self.active_workers -= 1

    def _complete(self, task, token, error):
        """Handle task completion."""
        with self.lock:
            if token:
                self.stats["solved"] += 1
            else:
                self.stats["failed"] += 1

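        wait_time = time.monotonic() - task.created_at
        names = ["critical", "high", "normal", "low"]
        # Guard against priorities outside 0-3 instead of raising IndexError
        priority_name = names[task.priority] if 0 <= task.priority < len(names) else "unknown"

        if token:
            print(f"  [{priority_name}] Solved in {wait_time:.1f}s (token={len(token)} chars)")
        else:
            print(f"  [{priority_name}] Failed: {error} ({wait_time:.1f}s)")

        if task.callback:
            # A raising callback must not propagate: _process_task would catch
            # it and call _complete again, counting the task twice.
            try:
                task.callback(token, error)
            except Exception as exc:
                print(f"  [{priority_name}] Callback raised: {exc}")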

    def queue_size(self):
        return len(self.queue)

    def get_stats(self):
        return {**self.stats, "queue_size": len(self.queue), "active": self.active_workers}

    def shutdown(self, wait=True):
        """Stop dispatching queued tasks and optionally wait for active ones."""
        self.running = False
        if wait:
            while self.active_workers > 0:
                time.sleep(0.5)


# Usage
solver = PriorityQueueSolver(max_workers=10)

# Critical: user-facing checkout
solver.submit(
    {"type": "recaptcha_v2", "sitekey": "SITE_KEY", "pageurl": "https://checkout.example.com"},
    priority=0,
    callback=lambda token, err: print(f"Checkout: {'OK' if token else err}"),
)

# Normal: batch processing
for i in range(5):
    solver.submit(
        {"type": "recaptcha_v2", "sitekey": "SITE_KEY", "pageurl": "https://example.com/page"},
        priority=2,
    )

# Low: background job
solver.submit(
    {"type": "turnstile", "sitekey": "SITE_KEY", "pageurl": "https://example.com"},
    priority=3,
)

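# Wait until the queue is empty and every worker has finished
while True:
    with solver.lock:
        idle = not solver.queue and solver.active_workers == 0
    if idle:
        break
    time.sleep(1)
print(solver.get_stats())
solver.shutdown()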

JavaScript Priority Queue (Node.js)

// Requires Node.js 18+ for the global fetch API
const API_KEY = "YOUR_API_KEY";
const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class PriorityQueueSolver {
  constructor(maxWorkers = 10) {
    this.queue = []; // [{ priority, sequence, taskData, resolve, reject }]
    this.maxWorkers = maxWorkers;
    this.activeWorkers = 0;
    this.sequence = 0;
    this.stats = { submitted: 0, solved: 0, failed: 0 };
    this.running = true;
    this._dispatchLoop();
  }

  solve(taskData, priority = 2) {
    return new Promise((resolve, reject) => {
      this.queue.push({
        priority,
        sequence: this.sequence++,
        taskData,
        resolve,
        reject,
        createdAt: Date.now(),
      });
      // Keep sorted: lower priority number = first
      this.queue.sort((a, b) => a.priority - b.priority || a.sequence - b.sequence);
      this.stats.submitted++;
    });
  }

  async _dispatchLoop() {
    while (this.running) {
      if (this.activeWorkers < this.maxWorkers && this.queue.length > 0) {
        const task = this.queue.shift();
        this.activeWorkers++;
        this._processTask(task).finally(() => this.activeWorkers--);
      }
      await new Promise((r) => setTimeout(r, 100));
    }
  }

  async _processTask(task) {
    const priorityNames = ["critical", "high", "normal", "low"];
    const pName = priorityNames[task.priority] || "unknown";

    try {
      const params = { key: API_KEY, json: 1 };
      const type = task.taskData.type || "recaptcha_v2";

      if (type === "recaptcha_v2") {
        Object.assign(params, {
          method: "userrecaptcha",
          googlekey: task.taskData.sitekey,
          pageurl: task.taskData.pageurl,
        });
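      } else if (type === "turnstile") {
        Object.assign(params, {
          method: "turnstile",
          sitekey: task.taskData.sitekey,
          pageurl: task.taskData.pageurl,
        });
      } else if (type === "hcaptcha") {
        Object.assign(params, {
          method: "hcaptcha",
          sitekey: task.taskData.sitekey,
          pageurl: task.taskData.pageurl,
        });
      } else {
        // Fail fast instead of submitting a request with no method
        throw new Error(`Unsupported CAPTCHA type: ${type}`);
      }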

      // Submit
      const response = await fetch(SUBMIT_URL, {
        method: "POST",
        body: new URLSearchParams(params),
      });
      const result = await response.json();
      if (result.status !== 1) throw new Error(result.request || "Submit failed");

      const taskId = result.request;

      // Poll
      for (let i = 0; i < 60; i++) {
        await new Promise((r) => setTimeout(r, 5000));
        const url = new URL(RESULT_URL);
        url.searchParams.set("key", API_KEY);
        url.searchParams.set("action", "get");
        url.searchParams.set("id", taskId);
        url.searchParams.set("json", "1");

        const poll = await (await fetch(url)).json();
        if (poll.request === "CAPCHA_NOT_READY") continue;
        if (poll.status === 1) {
          const elapsed = ((Date.now() - task.createdAt) / 1000).toFixed(1);
          console.log(`  [${pName}] Solved in ${elapsed}s`);
          this.stats.solved++;
          task.resolve(poll.request);
          return;
        }
        throw new Error(poll.request || "Poll failed");
      }
      throw new Error("TIMEOUT");
    } catch (err) {
      const elapsed = ((Date.now() - task.createdAt) / 1000).toFixed(1);
      console.log(`  [${pName}] Failed: ${err.message} (${elapsed}s)`);
      this.stats.failed++;
      task.reject(err);
    }
  }

  getStats() {
    return { ...this.stats, queueSize: this.queue.length, active: this.activeWorkers };
  }

  shutdown() {
    this.running = false;
  }
}

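// Usage
const solver = new PriorityQueueSolver(10);
const pending = [];

// Critical priority
pending.push(
  solver
    .solve(
      { type: "recaptcha_v2", sitekey: "SITE_KEY", pageurl: "https://checkout.example.com" },
      0 // critical
    )
    .then(() => console.log("Checkout token ready"))
    .catch((err) => console.error(`Checkout failed: ${err.message}`))
);

// Normal priority batch
for (let i = 0; i < 5; i++) {
  pending.push(
    solver
      .solve(
        { type: "recaptcha_v2", sitekey: "SITE_KEY", pageurl: "https://example.com" },
        2 // normal
      )
      .catch((err) => console.error(`Batch ${i} failed: ${err.message}`))
  );
}

// Stop the dispatch loop once every task has settled so the process can exit
Promise.all(pending).then(() => solver.shutdown());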

Concurrency Allocation by Priority

Reserve worker slots for high-priority tasks so they never queue behind a pool saturated with lower-priority work:

| Priority | Reserved Workers (out of 10) | Shared Workers |
|---|---|---|
| Critical | 2 (always available) | Can use any idle worker |
| High | 2 | Can use any non-reserved worker |
| Normal | 0 | Uses remaining idle workers |
| Low | 0 | Only runs when no other tasks waiting |
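
One way to read the table is as a threshold check: a task may start only while the number of busy workers is below the pool size minus the slots held back for strictly higher priorities. The sketch below assumes that interpretation and, for simplicity, does not track which priority each busy worker is serving. `RESERVED`, `worker_limit`, and `can_start` are hypothetical names.

```python
# Slots held back per priority level, taken from the table above.
RESERVED = {0: 2, 1: 2, 2: 0, 3: 0}


def worker_limit(priority, max_workers=10):
    """Highest busy-worker count at which this priority may still start."""
    held_for_higher = sum(n for p, n in RESERVED.items() if p < priority)
    return max_workers - held_for_higher


def can_start(priority, active_workers, max_workers=10):
    """Decide whether the dispatcher may start a task of this priority now."""
    return active_workers < worker_limit(priority, max_workers)
```

With 10 workers this gives limits of 10 for critical, 8 for high, and 6 for normal and low. Because the limit only shrinks as the priority number grows, a dispatcher needs to test just the task at the head of the heap. The heap ordering also means a low task reaches the head only when nothing more urgent is waiting.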

Backpressure Handling

When the queue grows too large, reject new tasks progressively, starting with the lowest priority:

| Queue Size | Action |
|---|---|
| ≤ 100 | Normal operation |
| 101–500 | Log warning; reject low priority tasks |
| 501–1000 | Reject low and normal; only accept high/critical |
| > 1000 | Reject all new tasks; drain existing queue |

class QueueFullError(Exception):
    """Raised when the queue rejects a task under backpressure."""


# Replacement for PriorityQueueSolver.submit; thresholds follow the table above
def submit(self, task_data, priority=2, callback=None):
    queue_size = len(self.queue)

    if queue_size > 1000:
        raise QueueFullError("Queue capacity exceeded")
    if queue_size > 500 and priority > 1:
        raise QueueFullError(f"Only high/critical priority accepted (queue={queue_size})")
    if queue_size > 100 and priority > 2:
        raise QueueFullError(f"Low priority rejected (queue={queue_size})")

    # ... normal submit logic

Troubleshooting

| Issue | Cause | Fix |
|---|---|---|
| Low-priority tasks never execute | High-priority tasks constantly arriving | Reserve a minimum slot for low priority; add starvation timeout |
| Critical tasks wait behind normal tasks | Priority not being respected | Verify heap ordering; check that dispatcher pops highest priority first |
| Queue grows unbounded | Submission rate exceeds solve rate | Add backpressure; increase max_workers; reduce submission rate |
| Tasks solved but callback not called | Exception in callback function | Wrap callbacks in try/except (Python) or try/catch (JavaScript) |
| All tasks timing out | API key issue or network problem | Check balance; verify API connectivity |

FAQ

What happens if a critical task fails?

Implement retry logic for critical tasks — automatically resubmit with the same priority. Normal and low tasks can fail without retry to preserve capacity for critical work.
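
A minimal sketch of that policy, assuming a solver object with the same `submit(task_data, priority=..., callback=...)` signature as the Python class above. The helper name `submit_with_retry` and the `max_attempts` cap of 3 are assumptions for illustration.

```python
def submit_with_retry(solver, task_data, priority=2, callback=None, max_attempts=3):
    """Submit a task; resubmit failed critical tasks at the same priority.

    Only priority 0 (critical) is retried. Other priorities report their
    first failure straight to `callback`.
    """
    attempts = 0

    def on_done(token, error):
        nonlocal attempts
        attempts += 1
        retry = token is None and priority == 0 and attempts < max_attempts
        if retry:
            # Same data, same priority, same completion handler
            solver.submit(task_data, priority=priority, callback=on_done)
        elif callback:
            # Final outcome: success, a non-critical failure, or retries exhausted
            callback(token, error)

    return solver.submit(task_data, priority=priority, callback=on_done)
```

The caller's `callback` fires once, with the final result, so the retries stay invisible to the code that requested the token.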

Can I change a task's priority after submission?

With a heap-based queue, you'd need to remove and re-insert the task. In practice, it's simpler to cancel the original and submit a new task at the desired priority.
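
The cancel-and-resubmit approach can be sketched with lazy deletion: the old heap entry is flagged as dead and skipped when it is eventually popped, so nothing has to be searched for or removed from the middle of the heap. `ReprioritizableQueue` is a hypothetical standalone class, not part of the solver above.

```python
import heapq
import itertools


class ReprioritizableQueue:
    """Min-heap with lazy deletion: dead entries are skipped when popped."""

    def __init__(self):
        self._heap = []
        self._live = {}                    # task_id -> current heap entry
        self._counter = itertools.count()  # tie-breaker preserving FIFO order

    def push(self, task_id, priority):
        if task_id in self._live:
            self.cancel(task_id)
        # Entry layout: [priority, sequence, task_id, alive flag]
        entry = [priority, next(self._counter), task_id, True]
        self._live[task_id] = entry
        heapq.heappush(self._heap, entry)

    def cancel(self, task_id):
        """Flag the entry as dead instead of searching the heap for it."""
        entry = self._live.pop(task_id)
        entry[3] = False

    def reprioritize(self, task_id, new_priority):
        # push() cancels the old entry and adds a fresh one
        self.push(task_id, new_priority)

    def pop(self):
        while self._heap:
            priority, _, task_id, alive = heapq.heappop(self._heap)
            if alive:
                del self._live[task_id]
                return task_id, priority
        raise IndexError("pop from empty queue")
```

A reprioritized task gets a new sequence number, so it joins the back of the line among tasks at its new priority.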

How do I prevent priority starvation of low-priority tasks?

Add an age-based boost: raise a task's effective priority by one level (subtract 1 from its priority number) for every 60 seconds it waits. A low (3) task that has waited 3 minutes is treated as critical (0), so it eventually runs.
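
The boost can be sketched as a pure function of the original priority and the time waited. `effective_priority` and `pick_next` are hypothetical helpers; the 60-second interval comes from the answer above.

```python
import time

BOOST_INTERVAL_SECONDS = 60  # one level gained per 60 s of waiting


def effective_priority(priority, created_at, now=None):
    """Return the priority after aging, never better than critical (0)."""
    now = time.monotonic() if now is None else now
    levels_gained = int((now - created_at) // BOOST_INTERVAL_SECONDS)
    return max(0, priority - levels_gained)


def pick_next(tasks, now=None):
    """Choose the waiting task with the best effective priority.

    `tasks` is a list of (priority, created_at, label) tuples. Ties go to
    the task that has waited longest.
    """
    now = time.monotonic() if now is None else now
    return min(tasks, key=lambda t: (effective_priority(t[0], t[1], now), t[1]))
```

A linear scan keeps the sketch simple. Heap keys computed once at submit time would not reflect aging, so a heap-based dispatcher would need to re-key waiting tasks periodically.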

Next Steps

Build priority-based CAPTCHA processing into your system — get your CaptchaAI API key and start queuing tasks by importance.

Full Working Code

Complete runnable examples for this article in Python, Node.js, PHP, Go, Java, C#, Ruby, Rust, Kotlin & Bash.
