
Plugin Architecture for CAPTCHA Solving Pipelines

A scraping pipeline starts simple: detect CAPTCHA, solve it, inject the token. Then you need logging. Then proxy rotation before solving. Then token caching after solving. Each feature tangles with the core loop. A plugin architecture lets you add behaviour at defined hook points without modifying the pipeline itself.

Pipeline Lifecycle Hooks

Each solve passes through four hook points, and a plugin can attach to any of them:

  1. before_submit: modify parameters, select a proxy, log the request
  2. after_submit: record the task ID, start timing
  3. before_result: check a cache before polling (the task has already been submitted at this point)
  4. after_result: cache the token, log timing, validate the response
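You can check the order in which these hooks fire without touching the API. The sketch below is a network-free stand-in, not the article's pipeline. `fake_submit` and `fake_poll` are hypothetical stubs that replace the in.php and res.php calls, and `run_solve` only mirrors the flow of the full implementation that follows, including its early return when a before_result plugin supplies a result.

```python
from typing import Callable

Hook = Callable[[dict], None]


def fake_submit(params: dict) -> str:
    """Hypothetical stub for the in.php submit call: returns a task ID."""
    return "task-123"


def fake_poll(task_id: str) -> str:
    """Hypothetical stub for the res.php poll: returns a token immediately."""
    return f"token-for-{task_id}"


def run_solve(params: dict, hooks: dict[str, list[Hook]]) -> str:
    """Fire the four hook points around submit and poll, in lifecycle order."""
    ctx = {"params": dict(params), "task_id": None, "result": None, "trace": []}

    def fire(name: str) -> None:
        ctx["trace"].append(name)  # record that this hook point was reached
        for handler in hooks.get(name, []):
            handler(ctx)

    fire("before_submit")
    ctx["task_id"] = fake_submit(ctx["params"])
    fire("after_submit")
    fire("before_result")
    if ctx["result"] is not None:
        # A before_result plugin supplied a result (like skip_poll): stop here
        return ctx["result"]
    ctx["result"] = fake_poll(ctx["task_id"])
    fire("after_result")
    return ctx["result"]


# Usage: a single after_result plugin inspects the trace
def show_trace(ctx: dict) -> None:
    print(ctx["trace"])


print(run_solve({"method": "turnstile"}, {"after_result": [show_trace]}))
# ['before_submit', 'after_submit', 'before_result', 'after_result']
# token-for-task-123
```

Because the stubs return immediately, a sketch like this also works as a quick unit test for plugin logic before you spend API credits.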

Python: Hook-Based Pipeline

import requests
import time
from dataclasses import dataclass, field
from typing import Callable

API_KEY = "YOUR_API_KEY"
SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"


@dataclass
class SolveContext:
    """Shared state flowing through the pipeline."""
    params: dict
    task_id: str | None = None
    result: str | None = None
    metadata: dict = field(default_factory=dict)
    skip_poll: bool = False


class CaptchaPipeline:
    """Plugin-based CAPTCHA solving pipeline."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self._hooks: dict[str, list[Callable]] = {
            "before_submit": [],
            "after_submit": [],
            "before_result": [],
            "after_result": [],
        }

    def register(self, hook: str, handler: Callable):
        """Register a handler for a lifecycle hook."""
        if hook not in self._hooks:
            raise ValueError(f"Unknown hook: {hook}. Valid: {list(self._hooks)}")
        self._hooks[hook].append(handler)

    def plugin(self, hook: str):
        """Decorator to register a plugin."""
        def decorator(fn):
            self.register(hook, fn)
            return fn
        return decorator

    def _run_hooks(self, hook: str, ctx: SolveContext):
        for handler in self._hooks[hook]:
            handler(ctx)

    def solve(self, params: dict, timeout: int = 180) -> str:
        ctx = SolveContext(params=params)

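        # Phase 1: before_submit
        self._run_hooks("before_submit", ctx)

        # A before_submit plugin may supply a result (e.g. a cache hit);
        # returning here skips the paid submit entirely.
        if ctx.skip_poll and ctx.result:
            return ctx.result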

        # Phase 2: submit
        submit_params = {**ctx.params, "key": self.api_key, "json": 1}
        resp = requests.post(SUBMIT_URL, data=submit_params, timeout=30).json()
        if resp.get("status") != 1:
            raise RuntimeError(f"Submit failed: {resp.get('request')}")
        ctx.task_id = resp["request"]

        self._run_hooks("after_submit", ctx)

        # Phase 3: before_result — plugin may set skip_poll with cached result
        self._run_hooks("before_result", ctx)

        if ctx.skip_poll and ctx.result:
            return ctx.result

        # Phase 4: poll
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            time.sleep(5)
            poll = requests.get(RESULT_URL, params={
                "key": self.api_key, "action": "get",
                "id": ctx.task_id, "json": 1,
            }, timeout=15).json()

            if poll.get("request") == "CAPCHA_NOT_READY":
                continue
            if poll.get("status") == 1:
                ctx.result = poll["request"]
                self._run_hooks("after_result", ctx)
                return ctx.result
            raise RuntimeError(f"Solve failed: {poll.get('request')}")

        raise RuntimeError("Timeout")


# --- Plugins ---

pipeline = CaptchaPipeline(API_KEY)


@pipeline.plugin("before_submit")
def logging_plugin(ctx: SolveContext):
    """Log every solve request."""
    method = ctx.params.get("method", "unknown")
    print(f"[LOG] Solving {method} CAPTCHA")
    ctx.metadata["start_time"] = time.monotonic()


@pipeline.plugin("after_result")
def timing_plugin(ctx: SolveContext):
    """Record solve duration."""
    start = ctx.metadata.get("start_time")
    if start:
        duration = time.monotonic() - start
        ctx.metadata["duration_s"] = round(duration, 2)
        print(f"[LOG] Solved in {duration:.1f}s")


@pipeline.plugin("before_submit")
def proxy_plugin(ctx: SolveContext):
    """Attach a proxy to every request."""
    # CaptchaAI expects login:password@host:port (no scheme); the type goes in proxytype
    ctx.params.setdefault("proxy", "user:pass@proxy.example.com:8080")
    ctx.params.setdefault("proxytype", "HTTP")


# Token cache plugin
_cache: dict[str, tuple[str, float]] = {}  # key -> (token, expiry)
CACHE_TTL = 90  # seconds


@pipeline.plugin("before_result")
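def cache_check_plugin(ctx: SolveContext):
    """Return a cached token if it is still within CACHE_TTL.

    This hook runs after submit, so a hit saves polling time, not the solve cost.
    Turnstile and reCAPTCHA tokens are single-use: only reuse a token that was
    never sent to the target site.
    """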
    cache_key = f"{ctx.params.get('method')}:{ctx.params.get('pageurl')}"
    if cache_key in _cache:
        token, expiry = _cache[cache_key]
        if time.monotonic() < expiry:
            ctx.result = token
            ctx.skip_poll = True
            print("[CACHE] Returning cached token")


@pipeline.plugin("after_result")
def cache_store_plugin(ctx: SolveContext):
    """Cache solved tokens."""
    cache_key = f"{ctx.params.get('method')}:{ctx.params.get('pageurl')}"
    _cache[cache_key] = (ctx.result, time.monotonic() + CACHE_TTL)


# --- Usage ---
token = pipeline.solve({
    "method": "turnstile",
    "sitekey": "0x4XXXXXXXXXXXXXXXXX",
    "pageurl": "https://example.com/login",
})
print(f"Token: {token[:30]}...")

JavaScript: Event-Based Pipeline

const API_KEY = "YOUR_API_KEY";
const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class CaptchaPipeline {
  #hooks = { beforeSubmit: [], afterSubmit: [], beforeResult: [], afterResult: [] };
  #apiKey;

  constructor(apiKey) {
    this.#apiKey = apiKey;
  }

  on(hook, handler) {
    if (!this.#hooks[hook]) throw new Error(`Unknown hook: ${hook}`);
    this.#hooks[hook].push(handler);
    return this; // chainable
  }

  async #runHooks(hook, ctx) {
    for (const handler of this.#hooks[hook]) {
      await handler(ctx);
    }
  }

  async solve(params) {
    const ctx = { params: { ...params }, taskId: null, result: null, meta: {}, skipPoll: false };

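    await this.#runHooks("beforeSubmit", ctx);
    // A beforeSubmit plugin may supply a result (e.g. a cache hit) and skip the paid submit.
    if (ctx.skipPoll && ctx.result) return ctx.result;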

    const body = new URLSearchParams({ key: this.#apiKey, json: "1", ...ctx.params });
    const resp = await (await fetch(SUBMIT_URL, { method: "POST", body })).json();
    if (resp.status !== 1) throw new Error(`Submit: ${resp.request}`);
    ctx.taskId = resp.request;

    await this.#runHooks("afterSubmit", ctx);
    await this.#runHooks("beforeResult", ctx);

    if (ctx.skipPoll && ctx.result) return ctx.result;

    // 60 polls x 5 s: give up after roughly five minutes
    for (let i = 0; i < 60; i++) {
      await new Promise((r) => setTimeout(r, 5000));
      const query = new URLSearchParams({ key: this.#apiKey, action: "get", id: ctx.taskId, json: "1" });
      const poll = await (await fetch(`${RESULT_URL}?${query}`)).json();
      if (poll.request === "CAPCHA_NOT_READY") continue;
      if (poll.status === 1) {
        ctx.result = poll.request;
        await this.#runHooks("afterResult", ctx);
        return ctx.result;
      }
      throw new Error(`Solve: ${poll.request}`);
    }
    throw new Error("Timeout");
  }
}

// Register plugins
const pipeline = new CaptchaPipeline(API_KEY);

pipeline
  .on("beforeSubmit", (ctx) => {
    ctx.meta.startTime = Date.now();
    console.log(`[LOG] Solving ${ctx.params.method}`);
  })
  .on("afterResult", (ctx) => {
    const ms = Date.now() - ctx.meta.startTime;
    console.log(`[LOG] Solved in ${ms}ms`);
  })
  .on("beforeSubmit", (ctx) => {
    // CaptchaAI expects login:password@host:port (no scheme); the type goes in proxytype
    ctx.params.proxy = ctx.params.proxy || "user:pass@proxy.example.com:8080";
    ctx.params.proxytype = ctx.params.proxytype || "HTTP";
  });

// Usage (top-level await requires an ES module, e.g. a .mjs file)
const token = await pipeline.solve({
  method: "turnstile",
  sitekey: "0x4XXXXXXXXXXXXXXXXX",
  pageurl: "https://example.com/login",
});

Plugin Ordering

Plugins run in registration order. Control execution priority by registering in the right sequence:

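# Order matters: the proxy must be set before the rate limiter checks it.
def rate_limit_plugin(ctx: SolveContext):  # hypothetical placeholder for your own throttling
    print(f"[RATE] Checking limits for {ctx.params.get('proxy')}")

ordered = CaptchaPipeline(API_KEY)  # fresh pipeline: `pipeline` already has the decorated plugins
ordered.register("before_submit", proxy_plugin)       # runs first
ordered.register("before_submit", rate_limit_plugin)  # runs second
ordered.register("before_submit", logging_plugin)     # runs third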

For priority-based ordering, store (priority, handler) tuples and sort before execution.
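Here is a minimal sketch of the priority approach, assuming you are free to replace the plain handler lists. `PriorityHooks` is a hypothetical class, not part of CaptchaPipeline. One variation: instead of sorting before each run, it keeps every list sorted at registration time with `bisect.insort`. A sequence number breaks ties, so equal priorities keep registration order.

```python
import bisect
from itertools import count
from typing import Any, Callable

Handler = Callable[[Any], None]


class PriorityHooks:
    """Hook registry that runs handlers in ascending priority order.

    Lower numbers run first. A running sequence number breaks ties, so
    handlers with equal priority keep their registration order and the
    handlers themselves are never compared.
    """

    def __init__(self, names: list[str]):
        self._hooks: dict[str, list[tuple[int, int, Handler]]] = {n: [] for n in names}
        self._seq = count()

    def register(self, hook: str, handler: Handler, priority: int = 100) -> None:
        if hook not in self._hooks:
            raise ValueError(f"Unknown hook: {hook}")
        # insort keeps each list sorted, so run() never has to sort
        bisect.insort(self._hooks[hook], (priority, next(self._seq), handler))

    def run(self, hook: str, ctx: Any) -> None:
        for _priority, _seq, handler in self._hooks[hook]:
            handler(ctx)


# Usage: registration order no longer decides execution order
order: list[str] = []
hooks = PriorityHooks(["before_submit"])
hooks.register("before_submit", lambda ctx: order.append("logging"), priority=300)
hooks.register("before_submit", lambda ctx: order.append("proxy"), priority=10)
hooks.register("before_submit", lambda ctx: order.append("rate_limit"), priority=20)
hooks.run("before_submit", ctx=None)
print(order)  # ['proxy', 'rate_limit', 'logging']
```

Leaving gaps between priorities (10, 20, 300) makes it easy to slot a new plugin between existing ones later.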

Troubleshooting

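| Issue | Cause | Fix |
|---|---|---|
| Plugin modifies the wrong field | Context field name mismatch (metadata in the Python context, meta in the JavaScript one) | Use the SolveContext dataclass fields consistently |
| skip_poll set but the pipeline still polls | A cache plugin sets skip_poll without setting ctx.result, so solve() ignores the flag | Always set skip_poll = True and ctx.result together |
| Plugins run in the wrong order | Registration order determines execution order | Register plugins in the order they should run, or sort by priority |
| Plugin blocks the pipeline | A handler makes a slow blocking call; in an async pipeline such as the JavaScript version, this also stalls the event loop | Make I/O-bound handlers async (async functions in JavaScript, async def in an asyncio pipeline) and await them |
| One plugin error crashes the solve | No error isolation: any exception raised in _run_hooks propagates | Wrap each handler call in try/except and log the failure |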
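One way to add the isolation from the last row is to replace the loop in _run_hooks with something like the following sketch. `run_hooks_isolated` and its `passthrough` parameter are hypothetical names. Whether a failed plugin should be fatal is a policy decision: a broken proxy plugin, for example, arguably should stop the solve, so intentional aborts can be listed in `passthrough` and re-raised.

```python
import logging
from typing import Any, Callable

log = logging.getLogger("captcha.plugins")


def run_hooks_isolated(
    hook: str,
    handlers: list[Callable[[Any], None]],
    ctx: Any,
    passthrough: tuple[type[BaseException], ...] = (),
) -> list[Exception]:
    """Run every handler for one hook; a failing plugin does not stop the rest.

    Exceptions listed in `passthrough` (e.g. a deliberate cancellation) are
    re-raised. Everything else is logged and returned to the caller.
    """
    errors: list[Exception] = []
    for handler in handlers:
        try:
            handler(ctx)
        except passthrough:
            raise  # intentional aborts still stop the solve
        except Exception as exc:
            name = getattr(handler, "__name__", repr(handler))
            log.exception("Plugin %s failed in %s", name, hook)
            errors.append(exc)
    return errors


# Usage: the metrics plugin still runs after the broken one fails
calls: list[str] = []


def broken_plugin(ctx):
    raise KeyError("missing_field")


def metrics_plugin(ctx):
    calls.append("metrics ran")


errs = run_hooks_isolated("after_result", [broken_plugin, metrics_plugin], ctx={})
print(calls, [type(e).__name__ for e in errs])  # ['metrics ran'] ['KeyError']
```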

FAQ

How many plugins can I register per hook?

There is no built-in limit; each hook is a plain list that runs in order. Even dozens of lightweight plugins add negligible overhead, because the pipeline already sleeps 5 seconds between polls and the CaptchaAI solve itself is the bottleneck.

Can a plugin cancel the solve?

Yes. A before_submit plugin can raise an exception to abort the solve before anything is sent to the API. Alternatively, it can set a flag in ctx.metadata for later code to check. For cleaner cancellation, add a ctx.cancelled boolean to the context and check it after each phase.
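Here is a minimal sketch of the ctx.cancelled approach. It assumes you extend the context with a `cancelled` field. `CancellableContext`, `run_phase`, `SolveCancelled` and `blocklist_plugin` are hypothetical names. In CaptchaPipeline.solve, you would perform the same check after each _run_hooks call.

```python
from dataclasses import dataclass, field
from typing import Callable


class SolveCancelled(Exception):
    """Raised when a plugin has cancelled the solve."""


@dataclass
class CancellableContext:
    params: dict
    metadata: dict = field(default_factory=dict)
    cancelled: bool = False  # the ctx.cancelled flag


def run_phase(name: str, handlers: list[Callable], ctx: CancellableContext) -> None:
    """Run one phase's handlers, then abort if any of them cancelled the solve."""
    for handler in handlers:
        handler(ctx)
        if ctx.cancelled:
            break  # later handlers in this phase are skipped too
    if ctx.cancelled:
        reason = ctx.metadata.get("cancel_reason", "no reason given")
        raise SolveCancelled(f"cancelled during {name}: {reason}")


def blocklist_plugin(ctx: CancellableContext) -> None:
    """Example before_submit plugin: refuse to solve for blocked hosts."""
    if "blocked.example" in ctx.params.get("pageurl", ""):
        ctx.cancelled = True
        ctx.metadata["cancel_reason"] = "page is on the blocklist"


# Usage
ctx = CancellableContext(params={"pageurl": "https://blocked.example/login"})
try:
    run_phase("before_submit", [blocklist_plugin], ctx)
except SolveCancelled as exc:
    print(exc)  # cancelled during before_submit: page is on the blocklist
```

Setting a flag rather than raising directly lets a plugin record a reason in metadata, and the pipeline decides in one place how a cancellation surfaces.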

How does this differ from middleware?

Middleware forms a chain where each layer wraps the next. Plugins hook into specific lifecycle points without wrapping. Middleware is better for request/response transformation; plugins are better for side effects at defined moments.
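The contrast is easier to see with a tiny middleware chain. In this sketch, `build_chain`, `outer`, `inner` and `fake_solver` are illustrative names and are not part of CaptchaPipeline. Each layer receives the params and a `call_next` function. It can therefore rewrite the request on the way in and the response on the way out, which a hook can only approximate by mutating ctx at one fixed point.

```python
from typing import Callable

Handler = Callable[[dict], str]
Middleware = Callable[[dict, Handler], str]


def _wrap(mw: Middleware, call_next: Handler) -> Handler:
    """Turn one middleware plus the next handler into a single handler."""
    def layer(params: dict) -> str:
        return mw(params, call_next)
    return layer


def build_chain(middlewares: list[Middleware], core: Handler) -> Handler:
    """Wrap `core` so the first middleware in the list is the outermost layer."""
    handler = core
    for mw in reversed(middlewares):
        handler = _wrap(mw, handler)
    return handler


events: list[str] = []


def outer(params: dict, call_next: Handler) -> str:
    events.append("outer:before")
    result = call_next(params)
    events.append("outer:after")
    return result


def inner(params: dict, call_next: Handler) -> str:
    events.append("inner:before")
    result = call_next({**params, "traced": True})  # may rewrite the request...
    events.append("inner:after")
    return result.upper()  # ...and the response


def fake_solver(params: dict) -> str:
    """Hypothetical stand-in for the real submit + poll."""
    events.append("core")
    return "token" if params.get("traced") else "untraced"


solve = build_chain([outer, inner], fake_solver)
print(solve({"method": "turnstile"}))  # TOKEN
print(events)  # ['outer:before', 'inner:before', 'core', 'inner:after', 'outer:after']
```

The interleaved events show the key difference: an outer layer sees the call both before and after the inner ones, whereas a hook fires once at its lifecycle point.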

Next Steps

Build extensible CAPTCHA solving pipelines — get your CaptchaAI API key and register your first plugin.
