Use Cases

Multi-Step Workflow Automation with CaptchaAI

Manage workflows across multiple accounts on CAPTCHA-protected platforms — login, action, and data collection at scale.


Use Cases

Scenario Example
Social media management Post/monitor across multiple brand accounts
E-commerce operations Update listings on multiple seller accounts
Data collection Gather data from accounts on different platforms
QA testing Test user flows with different account types
Account monitoring Check account status and notifications

Architecture

┌────────────────┐     ┌──────────────┐     ┌───────────┐     ┌───────────────┐
│ Account        │────▶│ Session      │────▶│ CAPTCHA   │────▶│ Workflow      │
│ Registry       │     │ Manager      │     │ Solver    │     │ Executor      │
│ (credentials)  │     │ (cookies,    │     │           │     │ (per-account  │
│                │     │  proxies)    │     │           │     │  actions)     │
└────────────────┘     └──────────────┘     └───────────┘     └───────────────┘

Implementation

Account Registry

import json
from dataclasses import dataclass, field, asdict
from typing import Optional


@dataclass
class Account:
    id: str
    platform: str
    username: str
    password: str
    proxy: Optional[str] = None
    cookies: dict = field(default_factory=dict)
    last_login: Optional[float] = None
    status: str = "active"


class AccountRegistry:
    def __init__(self, filepath="accounts.json"):
        self.filepath = filepath
        self.accounts = {}
        self._load()

    def _load(self):
        try:
            with open(self.filepath, "r") as f:
                data = json.load(f)
                for item in data:
                    acct = Account(**item)
                    self.accounts[acct.id] = acct
        except FileNotFoundError:
            pass

    def save(self):
        data = [asdict(a) for a in self.accounts.values()]
        with open(self.filepath, "w") as f:
            json.dump(data, f, indent=2)

    def add(self, account):
        self.accounts[account.id] = account
        self.save()

    def get(self, account_id):
        return self.accounts.get(account_id)

    def get_by_platform(self, platform):
        return [a for a in self.accounts.values() if a.platform == platform and a.status == "active"]

    def update_status(self, account_id, status):
        if account_id in self.accounts:
            self.accounts[account_id].status = status
            self.save()

Session Manager

import time
import requests
import pickle
import os


class SessionManager:
    def __init__(self, sessions_dir="sessions"):
        self.sessions_dir = sessions_dir
        os.makedirs(sessions_dir, exist_ok=True)
        self.sessions = {}

    def get_session(self, account):
        """Get or create a requests session for an account."""
        if account.id in self.sessions:
            return self.sessions[account.id]

        session = requests.Session()

        # Set proxy if configured
        if account.proxy:
            session.proxies = {
                "http": account.proxy,
                "https": account.proxy,
            }

        # Load saved cookies
        cookie_file = os.path.join(self.sessions_dir, f"{account.id}.cookies")
        if os.path.exists(cookie_file):
            with open(cookie_file, "rb") as f:
                session.cookies = pickle.load(f)

        self.sessions[account.id] = session
        return session

    def save_session(self, account):
        """Persist session cookies."""
        if account.id in self.sessions:
            cookie_file = os.path.join(self.sessions_dir, f"{account.id}.cookies")
            with open(cookie_file, "wb") as f:
                pickle.dump(self.sessions[account.id].cookies, f)

    def clear_session(self, account_id):
        if account_id in self.sessions:
            del self.sessions[account_id]
        cookie_file = os.path.join(self.sessions_dir, f"{account_id}.cookies")
        if os.path.exists(cookie_file):
            os.remove(cookie_file)

CAPTCHA-Aware Login

import time
import requests


class CaptchaSolver:
    BASE = "https://ocr.captchaai.com"

    def __init__(self, api_key):
        self.api_key = api_key

    def solve(self, params, initial_wait=10):
        params["key"] = self.api_key
        params["json"] = 1
        resp = requests.post(f"{self.BASE}/in.php", data=params).json()
        if resp["status"] != 1:
            raise Exception(resp["request"])
        task_id = resp["request"]
        time.sleep(initial_wait)
        for _ in range(60):
            result = requests.get(
                f"{self.BASE}/res.php",
                params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
            ).json()
            if result["request"] == "CAPCHA_NOT_READY":
                time.sleep(5)
                continue
            if result["status"] == 1:
                return result["request"]
            raise Exception(result["request"])
        raise TimeoutError("Timed out")


class LoginHandler:
    def __init__(self, captcha_solver):
        self.solver = captcha_solver

    def login(self, session, account, login_config):
        """
        login_config: {
            "url": login page URL,
            "submit_url": login form action URL,
            "captcha_type": "recaptcha_v2" | "turnstile" | None,
            "sitekey": "...",
            "username_field": "username",
            "password_field": "password",
            "captcha_field": "g-recaptcha-response",
        }
        """
        # Get login page (for CSRF token / cookies)
        session.get(login_config["url"])

        payload = {
            login_config.get("username_field", "username"): account.username,
            login_config.get("password_field", "password"): account.password,
        }

        # Solve CAPTCHA if present
        captcha_type = login_config.get("captcha_type")
        if captcha_type:
            if captcha_type == "recaptcha_v2":
                token = self.solver.solve({
                    "method": "userrecaptcha",
                    "googlekey": login_config["sitekey"],
                    "pageurl": login_config["url"],
                })
            elif captcha_type == "turnstile":
                token = self.solver.solve({
                    "method": "turnstile",
                    "sitekey": login_config["sitekey"],
                    "pageurl": login_config["url"],
                })
            else:
                raise ValueError(f"Unknown captcha type: {captcha_type}")

            captcha_field = login_config.get("captcha_field", "g-recaptcha-response")
            payload[captcha_field] = token

        submit_url = login_config.get("submit_url", login_config["url"])
        resp = session.post(submit_url, data=payload, allow_redirects=True)

        # Check login success
        success = resp.status_code == 200 and "login" not in resp.url.lower()
        if success:
            account.last_login = time.time()
        return success

Workflow Executor

import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger("workflow")


class WorkflowExecutor:
    def __init__(self, api_key, max_workers=5):
        self.solver = CaptchaSolver(api_key)
        self.registry = AccountRegistry()
        self.sessions = SessionManager()
        self.login_handler = LoginHandler(self.solver)
        self.max_workers = max_workers

    def run_for_account(self, account, workflow_fn, login_config):
        """Execute a workflow for a single account."""
        session = self.sessions.get_session(account)

        # Login if needed
        if not account.last_login or time.time() - account.last_login > 3600:
            logger.info(f"Logging in: {account.id}")
            if not self.login_handler.login(session, account, login_config):
                logger.error(f"Login failed: {account.id}")
                self.registry.update_status(account.id, "login_failed")
                return {"account": account.id, "status": "login_failed"}

            self.sessions.save_session(account)
            self.registry.save()

        # Execute workflow
        try:
            result = workflow_fn(session, account)
            return {"account": account.id, "status": "success", "data": result}
        except Exception as e:
            logger.error(f"Workflow failed for {account.id}: {e}")
            return {"account": account.id, "status": "error", "error": str(e)}

    def run_for_all(self, platform, workflow_fn, login_config):
        """Execute a workflow across all accounts on a platform."""
        accounts = self.registry.get_by_platform(platform)
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = {
                pool.submit(self.run_for_account, acct, workflow_fn, login_config): acct
                for acct in accounts
            }
            for future in as_completed(futures):
                result = future.result()
                results.append(result)
                logger.info(f"  {result['account']}: {result['status']}")

        return results

Usage Example

executor = WorkflowExecutor("YOUR_API_KEY", max_workers=3)

# Define platform login config
login_config = {
    "url": "https://platform.example.com/login",
    "submit_url": "https://platform.example.com/api/login",
    "captcha_type": "recaptcha_v2",
    "sitekey": "6Le-wvkSAAAA...",
    "username_field": "email",
    "password_field": "password",
    "captcha_field": "g-recaptcha-response",
}


# Define workflow
def check_notifications(session, account):
    resp = session.get("https://platform.example.com/api/notifications")
    data = resp.json()
    return {
        "unread": data.get("unread_count", 0),
        "latest": data.get("notifications", [])[:5],
    }


# Run across all accounts
results = executor.run_for_all("example_platform", check_notifications, login_config)

for r in results:
    if r["status"] == "success":
        print(f"{r['account']}: {r['data']['unread']} unread notifications")
    else:
        print(f"{r['account']}: {r['status']}")

Workflow Examples

Data Export

def export_data(session, account):
    resp = session.get("https://platform.example.com/api/export")
    filename = f"export_{account.id}.json"
    with open(filename, "w") as f:
        f.write(resp.text)
    return {"file": filename, "size": len(resp.text)}

Status Check

def check_status(session, account):
    resp = session.get("https://platform.example.com/api/account/status")
    return resp.json()

Update Settings

def update_settings(session, account):
    resp = session.post(
        "https://platform.example.com/api/settings",
        json={"timezone": "UTC", "notifications": True},
    )
    return {"updated": resp.status_code == 200}

Troubleshooting

Issue Cause Fix
All logins failing Sitekey changed Re-extract sitekey from login page
Session cookies invalid Expired cookies Clear session and re-login
Rate limited Too many concurrent logins Reduce max_workers, add delays
Account locked Too many failed attempts Check credentials, reduce frequency

FAQ

How many accounts can I manage concurrently?

Start with 3-5 concurrent workers and increase based on the platform's rate limits. Monitor for rate-limiting responses (429).

How do I handle account-specific proxies?

Set the proxy field on each Account object. The SessionManager automatically applies the proxy for that account's requests.

How do I rotate credentials securely?

Store credentials in environment variables or a secrets manager. Never hardcode credentials in source files.



Scale multi-step workflows — solve CAPTCHAs with CaptchaAI.

Full Working Code

Complete runnable examples for this article in Python, Node.js, PHP, Go, Java, C#, Ruby, Rust, Kotlin & Bash.

View on GitHub →

Discussions (0)

No comments yet.