Rolling Updates for CAPTCHA Solving Worker Fleets

Upgrading a fleet of CAPTCHA solving workers shouldn't drop in-flight tasks. Rolling updates replace workers one at a time — draining active tasks, deploying the new version, and verifying health before moving to the next worker.

Rolling Update Flow

Workers: [W1-old]   [W2-old]   [W3-old]  [W4-old]

Step 1:  [W1-drain] [W2-old]   [W3-old]  [W4-old]
Step 2:  [W1-NEW✓]  [W2-old]   [W3-old]  [W4-old]
Step 3:  [W1-NEW✓]  [W2-drain] [W3-old]  [W4-old]
Step 4:  [W1-NEW✓]  [W2-NEW✓]  [W3-old]  [W4-old]
  ...until all updated

Python — Rolling Update Orchestrator

import os
import time
import signal
import threading
import requests
from dataclasses import dataclass, field
from enum import Enum

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


class WorkerState(Enum):
    RUNNING = "running"
    DRAINING = "draining"
    STOPPED = "stopped"
    UPDATING = "updating"


@dataclass
class Worker:
    worker_id: str
    version: str
    state: WorkerState = WorkerState.RUNNING
    active_tasks: int = 0
    tasks_completed: int = 0
    session: requests.Session = field(default_factory=requests.Session)
    # Guards state and the task counters, which solver threads and the
    # orchestrator touch concurrently.
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def solve(self, task):
        # Check the state and claim a task slot atomically so drain()
        # cannot miss a task that is about to start.
        with self._lock:
            if self.state != WorkerState.RUNNING:
                return {"error": "WORKER_NOT_ACCEPTING"}
            self.active_tasks += 1

        try:
            result = self._do_solve(task)
            with self._lock:
                self.tasks_completed += 1
            return result
        finally:
            with self._lock:
                self.active_tasks -= 1

    def _do_solve(self, task):
        # Submit the task; the timeout keeps a hung connection from
        # blocking the drain indefinitely.
        resp = self.session.post("https://ocr.captchaai.com/in.php", data={
            "key": API_KEY,
            "method": task.get("method", "userrecaptcha"),
            "googlekey": task["sitekey"],
            "pageurl": task["pageurl"],
            "json": 1
        }, timeout=30)
        data = resp.json()
        if data.get("status") != 1:
            return {"error": data.get("request")}

        # Poll for the result: up to 60 attempts, 5 seconds apart.
        captcha_id = data["request"]
        for _ in range(60):
            time.sleep(5)
            result = self.session.get(
                "https://ocr.captchaai.com/res.php",
                params={
                    "key": API_KEY,
                    "action": "get",
                    "id": captcha_id,
                    "json": 1
                },
                timeout=30
            ).json()
            if result.get("status") == 1:
                return {"solution": result["request"]}
            if result.get("request") != "CAPCHA_NOT_READY":
                return {"error": result.get("request")}
        return {"error": "TIMEOUT"}

    def drain(self, timeout=120):
        """Stop accepting tasks and wait for active tasks to complete."""
        with self._lock:
            self.state = WorkerState.DRAINING
        start = time.monotonic()
        while self.active_tasks > 0:
            if time.monotonic() - start > timeout:
                print(f"Worker {self.worker_id}: drain timeout with "
                      f"{self.active_tasks} tasks remaining")
                break
            time.sleep(1)
        self.state = WorkerState.STOPPED

    @property
    def is_healthy(self):
        return self.state == WorkerState.RUNNING


class RollingUpdateOrchestrator:
    def __init__(self, workers):
        self.workers = {w.worker_id: w for w in workers}
        self.lock = threading.Lock()

    def get_available_worker(self):
        """Route tasks only to RUNNING workers."""
        with self.lock:
            for worker in self.workers.values():
                if worker.state == WorkerState.RUNNING:
                    return worker
        return None

    def rolling_update(self, new_version, health_check_fn=None,
                       max_unavailable=1, drain_timeout=120):
        """Update workers batch by batch with a health gate after each one."""
        worker_ids = list(self.workers.keys())
        # Remember each worker's version so a rollback can restore it.
        previous = {wid: w.version for wid, w in self.workers.items()}
        updated = []
        failed = []

        for i in range(0, len(worker_ids), max_unavailable):
            batch = worker_ids[i:i + max_unavailable]

            # Workers in a batch are processed sequentially in this simplified
            # example; update them in parallel to benefit from max_unavailable > 1.
            for wid in batch:
                worker = self.workers[wid]
                print(f"[{wid}] Draining (v{worker.version})...")

                # Step 1: Drain active tasks
                worker.drain(timeout=drain_timeout)

                # Step 2: "Deploy" new version
                print(f"[{wid}] Deploying v{new_version}...")
                worker.state = WorkerState.UPDATING
                worker.version = new_version
                time.sleep(2)  # Simulate deployment

                # Step 3: Start and health check
                worker.state = WorkerState.RUNNING
                if health_check_fn and not health_check_fn(worker):
                    print(f"[{wid}] Health check FAILED, rolling back")
                    failed.append(wid)
                    # Roll back the failed worker along with those already updated.
                    self._rollback(updated + [wid], previous, drain_timeout)
                    return {
                        "status": "rolled_back",
                        "failed_at": wid,
                        "updated": updated,
                    }

                updated.append(wid)
                print(f"[{wid}] Updated to v{new_version} ✓")

        return {"status": "complete", "updated": updated, "failed": failed}

    def _rollback(self, worker_ids, previous_versions, drain_timeout=120):
        """Restore the previous version on the given workers."""
        for wid in worker_ids:
            worker = self.workers[wid]
            print(f"[{wid}] Rolling back to v{previous_versions[wid]}...")
            # Drain first so tasks accepted on the new version are not dropped;
            # drain() leaves the worker STOPPED.
            worker.drain(timeout=drain_timeout)
            time.sleep(1)  # Simulate redeploying the previous version
            worker.version = previous_versions[wid]
            worker.state = WorkerState.RUNNING

    @property
    def status(self):
        return {
            wid: {
                "version": w.version,
                "state": w.state.value,
                "active_tasks": w.active_tasks,
            }
            for wid, w in self.workers.items()
        }


# Create fleet
workers = [Worker(f"w{i}", "1.2.0") for i in range(6)]
orchestrator = RollingUpdateOrchestrator(workers)


def health_check(worker):
    """Verify worker can solve a test CAPTCHA."""
    # In production, send a real test task
    return worker.state == WorkerState.RUNNING


# Execute rolling update
result = orchestrator.rolling_update(
    new_version="1.3.0",
    health_check_fn=health_check,
    max_unavailable=1,
    drain_timeout=60
)
print(f"Rolling update result: {result}")

JavaScript — Rolling Update with Progress Tracking

const axios = require("axios");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

class RollingUpdater {
  constructor(workerCount, currentVersion) {
    this.workers = Array.from({ length: workerCount }, (_, i) => ({
      id: `worker-${i}`,
      version: currentVersion,
      state: "running",
      activeTasks: 0,
    }));
    this.progress = { total: workerCount, completed: 0, failed: 0 };
  }

  async update(newVersion, options = {}) {
    const {
      maxUnavailable = 1,
      drainTimeout = 60000,
      healthCheckRetries = 3,
    } = options;

    console.log(
      `Starting rolling update: v${this.workers[0].version} → v${newVersion}`
    );

    for (let i = 0; i < this.workers.length; i += maxUnavailable) {
      const batch = this.workers.slice(i, i + maxUnavailable);

      // Workers in a batch are updated one after another in this simplified
      // example; run the batch with Promise.all to update them in parallel.
      for (const worker of batch) {
        let healthy = false;
        try {
          // Drain: stop routing new tasks and wait for in-flight ones
          console.log(`[${worker.id}] Draining...`);
          worker.state = "draining";
          await this.waitForDrain(worker, drainTimeout);

          // Deploy
          console.log(`[${worker.id}] Deploying v${newVersion}...`);
          worker.state = "updating";
          worker.version = newVersion;

          // Health check before the worker rejoins the pool
          healthy = await this.healthCheck(worker, healthCheckRetries);
        } catch (err) {
          console.error(`[${worker.id}] Error: ${err.message}`);
        }

        if (!healthy) {
          // Keep the worker out of rotation and count the failure,
          // whether it came from the health check or from an exception.
          worker.state = "failed";
          this.progress.failed++;
          console.log(`[${worker.id}] FAILED health check`);

          if (this.progress.failed > Math.floor(this.workers.length * 0.25)) {
            console.log("Too many failures, aborting rolling update");
            return { status: "aborted", progress: this.progress };
          }
          continue;
        }

        worker.state = "running";
        this.progress.completed++;
        console.log(
          `[${worker.id}] Updated ✓ (${this.progress.completed}/${this.progress.total})`
        );
      }
    }

    return { status: "complete", progress: this.progress };
  }

  async waitForDrain(worker, timeout) {
    const start = Date.now();
    while (worker.activeTasks > 0 && Date.now() - start < timeout) {
      await new Promise((r) => setTimeout(r, 1000));
    }
  }

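  async healthCheck(worker, retries) {
    // Simplified check: confirms the API is reachable with this key.
    // In production, also send a test task through the updated worker.
    for (let attempt = 0; attempt < retries; attempt++) {
      try {
        const resp = await axios.get("https://ocr.captchaai.com/res.php", {
          params: { key: API_KEY, action: "getbalance", json: 1 },
          timeout: 10000,
        });
        if (resp.data.status === 1) return true;
      } catch {
        // Network or HTTP error: fall through and retry
      }
      // Back off between attempts, but not after the last one
      if (attempt < retries - 1) {
        await new Promise((r) => setTimeout(r, 5000));
      }
    }
    return false;
  }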
}

// Execute
const updater = new RollingUpdater(8, "1.2.0");
updater
  .update("1.3.0", { maxUnavailable: 2, drainTimeout: 30000 })
  .then((result) => console.log("Result:", JSON.stringify(result, null, 2)));

Update Strategies Compared

| Strategy | Downtime | Rollback Speed | Complexity | Best For |
|---|---|---|---|---|
| Rolling | None | Moderate | Low | Most deployments |
| Blue-Green | None | Instant | Medium | Critical services |
| Canary | None | Fast | High | Large fleets (50+ workers) |
| Recreate | Brief | N/A | Lowest | Dev environments |

Troubleshooting

| Issue | Cause | Fix |
|---|---|---|
| Tasks dropped during update | Drain timeout too short | Increase drain_timeout; check max task duration |
| Health check always fails after update | New version has a bug | Roll back; test new version in staging first |
| Update takes too long | maxUnavailable too low with many workers | Increase maxUnavailable to 2-3 for large fleets |
| Rollback leaves mixed versions | Partial rollback incomplete | Track which workers were updated; rollback all |

FAQ

What's a good maxUnavailable setting?

For small fleets (< 10 workers), update 1 at a time. For larger fleets, use 10–25% of total workers. Never exceed 50% — you need enough capacity to handle traffic.
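
The sizing rule above can be turned into a small helper. This is a minimal sketch: the function name `suggest_max_unavailable` and the 15% default for larger fleets are assumptions chosen for illustration (any value in the 10–25% range fits the guideline).

```python
def suggest_max_unavailable(fleet_size, percent=15):
    """Suggest how many workers to update at once (hypothetical helper).

    Fleets under 10 workers update one at a time. Larger fleets use
    `percent` of the fleet, clamped to the 10-25% guideline and never
    more than half the fleet.
    """
    if fleet_size < 1:
        raise ValueError("fleet_size must be at least 1")
    if fleet_size < 10:
        return 1
    percent = min(max(percent, 10), 25)   # stay inside the 10-25% range
    batch = fleet_size * percent // 100   # integer math avoids float rounding
    return max(1, min(batch, fleet_size // 2))


for size in (6, 20, 100):
    print(size, suggest_max_unavailable(size))
```

The result can be passed as `max_unavailable` (Python) or `maxUnavailable` (JavaScript) when starting the update.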

How long should drain timeout be?

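Set it to your maximum CAPTCHA solve time plus a buffer. For reCAPTCHA v2 (typically 30–90 seconds), a 120-second drain timeout is usually enough. Note that the polling loop in the Python example can wait up to 300 seconds (60 polls, 5 seconds apart), so either raise the drain timeout to cover that worst case or lower the polling limit to match.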
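
As a worked example, the worst-case duration of a single task can be derived from the polling parameters and used to size the drain timeout. The sketch below assumes the polling pattern from the Python example (one submit request, then a fixed number of polls at a fixed interval); the helper name, the per-request timeout parameter, and the 30-second buffer are illustrative assumptions.

```python
def worst_case_drain_timeout(max_polls=60, poll_interval=5,
                             request_timeout=0, buffer_seconds=30):
    """Return a drain timeout in seconds covering the slowest possible task.

    Hypothetical helper: a task is one submit request followed by
    max_polls rounds of sleeping poll_interval seconds and issuing one
    result request. request_timeout is the longest a single HTTP call
    may block (0 ignores network time).
    """
    submit = request_timeout
    polling = max_polls * (poll_interval + request_timeout)
    return submit + polling + buffer_seconds


# 60 polls x 5 s = 300 s of polling, plus the buffer
print(worst_case_drain_timeout())
# Capping polling at 18 attempts (90 s) fits a 120-second drain timeout
print(worst_case_drain_timeout(max_polls=18))
```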

Should I run integration tests after each worker update?

Yes, for critical systems. Run a real CAPTCHA solve against each newly updated worker before moving to the next one. This catches configuration errors early.
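
One way to do this is a health check that pushes a single test task through the worker and only passes if a solution comes back in time. The following is a sketch under assumptions: `make_solve_health_check` is a hypothetical helper, the worker is any object with a `solve(task)` method that returns a dict with a "solution" or "error" key (as in the Python example), and the sitekey and URL are placeholders.

```python
import time


def make_solve_health_check(test_task, max_seconds=120):
    """Build a health check that runs one real solve through the worker."""
    def health_check(worker):
        start = time.monotonic()
        try:
            result = worker.solve(test_task)
        except Exception as exc:
            # A crash in the new version counts as unhealthy.
            print(f"health check raised: {exc!r}")
            return False
        elapsed = time.monotonic() - start
        # Healthy only if a solution came back within the time limit.
        return bool(result.get("solution")) and elapsed <= max_seconds

    return health_check


# Stand-in worker for demonstration; in production pass the real worker
# and a test task for a page you control.
class FakeWorker:
    def __init__(self, result):
        self.result = result

    def solve(self, task):
        return self.result


check = make_solve_health_check(
    {"sitekey": "YOUR_TEST_SITEKEY", "pageurl": "https://example.com/login"}
)
print(check(FakeWorker({"solution": "token"})))  # True
print(check(FakeWorker({"error": "TIMEOUT"})))   # False
```

The returned function takes a worker and returns a boolean, so it can be passed as `health_check_fn` to `rolling_update`. Each check consumes one paid solve, which is the trade-off for catching a broken deployment before it takes traffic.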

Next Steps

Deploy updates without downtime — get your CaptchaAI API key and implement rolling updates.
