Reference

CaptchaAI Rate Limits and Throttling

CaptchaAI supports high request volumes, but understanding capacity limits and implementing proper throttling keeps your pipeline reliable.

API Rate Limits

CaptchaAI does not enforce strict per-second rate limits. Instead, capacity is managed by worker availability:

Factor Behavior
Submit rate No hard limit per second
Concurrent tasks Handles 100+ per account
Poll frequency Recommended: every 5 seconds per task
Balance check No limit

The main constraint is worker capacity. When all workers are busy, you get ERROR_NO_SLOT_AVAILABLE.

Handling ERROR_NO_SLOT_AVAILABLE

This error means the system is at capacity. Implement exponential backoff:

Python

import time
import requests

API_KEY = "YOUR_API_KEY"


def submit_with_backoff(params, max_retries=5):
    params["key"] = API_KEY

    for attempt in range(max_retries):
        resp = requests.get(
            "https://ocr.captchaai.com/in.php", params=params
        )

        if resp.text.startswith("OK|"):
            return resp.text.split("|")[1]

        if resp.text == "ERROR_NO_SLOT_AVAILABLE":
            wait = min(2 ** attempt * 2, 60)  # 2, 4, 8, 16, 32s max 60
            print(f"No slots, waiting {wait}s (attempt {attempt + 1})")
            time.sleep(wait)
            continue

        raise Exception(f"Submit error: {resp.text}")

    raise Exception("Max retries exceeded — no slots available")

Node.js

async function submitWithBackoff(params, maxRetries = 5) {
  params.key = process.env.CAPTCHAAI_API_KEY;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const resp = await axios.get("https://ocr.captchaai.com/in.php", {
      params,
    });
    const text = String(resp.data);

    if (text.startsWith("OK|")) {
      return text.split("|")[1];
    }

    if (text === "ERROR_NO_SLOT_AVAILABLE") {
      const wait = Math.min(2 ** attempt * 2000, 60000);
      console.log(`No slots, waiting ${wait}ms (attempt ${attempt + 1})`);
      await new Promise((r) => setTimeout(r, wait));
      continue;
    }

    throw new Error(`Submit error: ${text}`);
  }

  throw new Error("Max retries exceeded");
}

Client-Side Rate Limiting

Token Bucket (Python)

import time
import threading


class RateLimiter:
    def __init__(self, rate, per=1.0):
        """Allow `rate` requests per `per` seconds."""
        self.rate = rate
        self.per = per
        self.tokens = rate
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.rate, self.tokens + elapsed * (self.rate / self.per))
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return
            else:
                sleep_time = (1 - self.tokens) * (self.per / self.rate)

        time.sleep(sleep_time)
        self.acquire()


# Allow 10 submissions per second
limiter = RateLimiter(rate=10, per=1.0)

def submit_limited(params):
    limiter.acquire()
    return submit_with_backoff(params)

Asyncio Semaphore

import asyncio

# Limit to 20 concurrent tasks
semaphore = asyncio.Semaphore(20)

async def solve_limited(solver, session, params):
    async with semaphore:
        return await solver.solve(session, params)

Poll Rate Control

Don't poll more frequently than every 5 seconds per task:

async def smart_poll(session, task_id, solver):
    """Polls with adaptive intervals."""
    intervals = [5, 5, 5, 10, 10, 15, 15, 30, 30, 60]

    for wait in intervals:
        await asyncio.sleep(wait)
        result = await solver.check(session, task_id)
        if result is not None:
            return result

    raise TimeoutError(f"Task {task_id} timed out")

Monitoring and Metrics

Track your API usage to stay within limits:

import time
from collections import deque


class APIMetrics:
    def __init__(self, window=60):
        self.window = window
        self.requests = deque()
        self.errors = deque()

    def record_request(self):
        now = time.time()
        self.requests.append(now)
        self._cleanup(self.requests, now)

    def record_error(self, error_code):
        now = time.time()
        self.errors.append((now, error_code))
        self._cleanup_tuples(self.errors, now)

    def get_rate(self):
        now = time.time()
        self._cleanup(self.requests, now)
        return len(self.requests) / self.window

    def get_error_rate(self):
        now = time.time()
        self._cleanup(self.requests, now)
        self._cleanup_tuples(self.errors, now)
        if not self.requests:
            return 0
        return len(self.errors) / len(self.requests)

    def _cleanup(self, dq, now):
        while dq and dq[0] < now - self.window:
            dq.popleft()

    def _cleanup_tuples(self, dq, now):
        while dq and dq[0][0] < now - self.window:
            dq.popleft()


metrics = APIMetrics()

# Use in your submit function
def submit_tracked(params):
    metrics.record_request()
    try:
        return submit_with_backoff(params)
    except Exception as e:
        metrics.record_error(str(e))
        raise

# Check metrics periodically
print(f"Rate: {metrics.get_rate():.1f} req/s")
print(f"Error rate: {metrics.get_error_rate():.1%}")

Capacity Planning

Volume Concurrency Strategy
< 100/hour 1-5 Sequential, no rate control needed
100-1K/hour 5-20 Semaphore-based concurrency
1K-10K/hour 20-50 Async with queue, callbacks
10K+/hour 50-100 Worker pool, dedicated capacity

For volumes above 10K/hour, contact CaptchaAI support for dedicated capacity.

FAQ

Is there a daily limit?

No hard daily limit. Your limit is determined by your balance and the API's worker capacity.

What happens if I exceed capacity?

You'll receive ERROR_NO_SLOT_AVAILABLE. It's temporary — retry after a backoff period.

Should I throttle on the client side?

Yes, for predictable behavior. Use a semaphore to cap concurrent tasks and a token bucket to control submission rate.

Discussions (0)

No comments yet.