
Resilience Patterns

Keep your MCP server running reliably when things go wrong -- circuit breakers for flaky dependencies, health checks for monitoring, webhooks for alerting, background tasks for deferred work, exception handlers for clean error responses, and progress reporting and cancellation for long-running work.

Circuit Breaker

When you need it

Your MCP server wraps Stripe's payment API. When Stripe has an outage, every tool call to your server fails after a 30-second timeout. Without a circuit breaker, agents keep retrying, your server accumulates blocked connections, and everything slows to a crawl. With a circuit breaker, the server immediately tells agents "try again later" instead of waiting for Stripe to time out.

CircuitBreakerMiddleware

from promptise.mcp.server import MCPServer, CircuitBreakerMiddleware

server = MCPServer(name="payment-api")
server.add_middleware(CircuitBreakerMiddleware(
    failure_threshold=5,           # Open after 5 consecutive failures
    recovery_timeout=60.0,         # Wait 60s before testing recovery
    excluded_tools={"health"},     # Never circuit-break health checks
))

@server.tool()
async def charge_card(
    customer_id: str,
    amount_cents: int,
    currency: str = "usd",
) -> dict:
    """Charge a customer's card via Stripe.

    If Stripe is down, the circuit breaker trips after 5 failures
    and immediately rejects calls for 60 seconds instead of
    waiting for timeouts.
    """
    import asyncio
    import stripe

    # stripe-python's Charge.create is synchronous; run it in a
    # worker thread so the async tool doesn't block the event loop.
    charge = await asyncio.to_thread(
        stripe.Charge.create,
        customer=customer_id,
        amount=amount_cents,
        currency=currency,
    )
    return {"charge_id": charge.id, "status": charge.status}

How it works

stateDiagram-v2
    [*] --> Closed
    Closed --> Open: failure_threshold reached
    Open --> HalfOpen: recovery_timeout elapsed
    HalfOpen --> Closed: probe call succeeds
    HalfOpen --> Open: probe call fails

State      Behavior
Closed     Normal operation. Failures increment a counter.
Open       All calls rejected immediately with CircuitOpenError.
Half-Open  One probe call allowed through. Success → Closed. Failure → Open.

Handling CircuitOpenError

Agents receive a structured error when the circuit is open:

from promptise.mcp.server import CircuitOpenError, ToolError

@server.exception_handler(CircuitOpenError)
async def handle_circuit_open(ctx, exc):
    return ToolError(
        message=f"Service temporarily unavailable. Retry in {exc.retry_after:.0f}s.",
        code="SERVICE_UNAVAILABLE",
        retryable=True,
    )

Configuration

Parameter          Default  Description
failure_threshold  5        Consecutive failures before opening
recovery_timeout   60.0     Seconds before probing recovery
excluded_tools     set()    Tools exempt from circuit breaking

Programmatic control

cb = CircuitBreakerMiddleware(failure_threshold=5)
server.add_middleware(cb)

# Check state
cb.get_state("charge_card")  # CircuitState.CLOSED / OPEN / HALF_OPEN

# Manual reset (e.g., after fixing the dependency)
cb.reset("charge_card")  # Reset one tool
cb.reset()               # Reset all

Health Checks

When you need it

Your MCP server runs in Kubernetes. Kubernetes needs to know if the server is alive (liveness probe) and ready to accept traffic (readiness probe). If your database is down, the server should report "not ready" so Kubernetes routes traffic elsewhere.

HealthCheck

from promptise.mcp.server import MCPServer, HealthCheck

server = MCPServer(name="api")
health = HealthCheck()

# Required check -- server is "not ready" if this fails
async def check_database() -> bool:
    try:
        await db.execute("SELECT 1")
        return True
    except Exception:
        return False

health.add_check("database", check_database, required_for_ready=True)

# Optional check -- logged but doesn't affect readiness
async def check_cache() -> bool:
    return cache.is_connected()

health.add_check("cache", check_cache, required_for_ready=False)

# Register as MCP resources
health.register_resources(server)

This exposes two resources:

Resource URI    Purpose
health://live   Liveness: is the server process running?
health://ready  Readiness: are all required dependencies available?

Agents or monitoring systems can read these resources to check server health.
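
Over the wire, a readiness check is an ordinary MCP resources/read request (standard JSON-RPC; the id here is arbitrary):

{
  "jsonrpc": "2.0",
  "id": 7,
  "method": "resources/read",
  "params": { "uri": "health://ready" }
}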


Webhook Notifications

When you need it

Your ops team uses Slack for alerts. When a tool call fails, you want a Slack message immediately -- not waiting for someone to check logs.

WebhookMiddleware

from promptise.mcp.server import MCPServer, WebhookMiddleware

server = MCPServer(name="api")
server.add_middleware(WebhookMiddleware(
    url="https://hooks.slack.com/services/T.../B.../xxx",
    events={"tool.error"},              # Only fire on errors
    headers={"Authorization": "Bearer slack-token"},
    timeout=5.0,                        # Give up on slow endpoints after 5s
))

Events

Event         When fired
tool.call     Before every tool execution
tool.success  After successful execution
tool.error    After an exception

Webhook payload

{
  "event": "tool.error",
  "tool": "charge_card",
  "client_id": "checkout-agent",
  "request_id": "f6e5d4",
  "timestamp": 1709812345.6,
  "error": "stripe.error.CardDeclinedError: Card declined"
}

Webhooks are fire-and-forget -- they never block or fail the tool call. If the webhook endpoint is unreachable, the failure is logged and the tool call proceeds normally.
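
For local testing, a receiver can be as small as this stdlib-only sketch (the endpoint and alert logic are our assumptions, not part of promptise):

import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class WebhookReceiver(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read and parse the JSON payload shown above.
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        if payload.get("event") == "tool.error":
            print(f"[ALERT] {payload['tool']} failed: {payload.get('error')}")
        self.send_response(200)
        self.end_headers()

HTTPServer(("127.0.0.1", 8080), WebhookReceiver).serve_forever()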


Background Tasks

When you need it

After creating a user, you want to send a welcome email and log an analytics event. These are important but shouldn't delay the tool response.

BackgroundTasks

from promptise.mcp.server import MCPServer, BackgroundTasks, Depends, get_context

server = MCPServer(name="hr-api")

async def send_welcome_email(employee_id: str, email: str):
    """Send welcome email (runs after response is sent)."""
    await email_service.send(
        to=email,
        subject="Welcome to the team!",
        template="welcome",
        data={"employee_id": employee_id},
    )

async def log_audit_event(action: str, actor: str, details: dict):
    """Log to external audit system."""
    await audit_api.log(action=action, actor=actor, details=details)

@server.tool(auth=True)
async def create_employee(
    name: str,
    email: str,
    department: str,
    bg: BackgroundTasks = Depends(BackgroundTasks),
) -> dict:
    """Create an employee.

    Welcome email and audit log run in the background after the
    response is returned. If they fail, it's logged but doesn't
    affect the response.
    """
    ctx = get_context()

    emp_id = await db.create_employee(name=name, email=email, dept=department)

    bg.add(send_welcome_email, emp_id, email)
    bg.add(log_audit_event, "CREATE_EMPLOYEE", ctx.client_id, {"id": emp_id})

    return {"id": emp_id, "name": name, "status": "created"}

Background tasks run sequentially after the tool response is sent. If a task raises an exception, it's logged but remaining tasks still run.
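
Conceptually, the scheduler drains a queue once the response goes out. A sketch of that behavior (our illustration, not promptise source code):

import logging

logger = logging.getLogger(__name__)

class BackgroundTasksSketch:
    """Illustrative only: queue coroutine functions, drain them sequentially."""

    def __init__(self):
        self._tasks = []

    def add(self, fn, *args, **kwargs):
        self._tasks.append((fn, args, kwargs))

    async def run_all(self):
        # Called after the tool response has been sent. A failing task
        # is logged; the remaining tasks still execute.
        for fn, args, kwargs in self._tasks:
            try:
                await fn(*args, **kwargs)
            except Exception:
                logger.exception("Background task %s failed", fn.__name__)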


Exception Handlers

When you need it

Your tools raise domain-specific exceptions (EmployeeNotFoundError, InsufficientFundsError). Without exception handlers, agents see generic "Internal error" messages. With handlers, they get structured, actionable error responses.

Custom exception mapping

from promptise.mcp.server import MCPServer, ToolError

class EmployeeNotFoundError(Exception):
    def __init__(self, employee_id: str):
        self.employee_id = employee_id
        super().__init__(f"Employee {employee_id} not found")

class InsufficientPermissionsError(Exception):
    pass

server = MCPServer(name="hr-api")

@server.exception_handler(EmployeeNotFoundError)
async def handle_not_found(ctx, exc):
    return ToolError(
        message=f"Employee '{exc.employee_id}' does not exist.",
        code="EMPLOYEE_NOT_FOUND",
        retryable=False,
    )

@server.exception_handler(InsufficientPermissionsError)
async def handle_permissions(ctx, exc):
    return ToolError(
        message="You don't have permission for this action.",
        code="FORBIDDEN",
        retryable=False,
    )

@server.tool()
async def get_employee(employee_id: str) -> dict:
    """Get an employee by ID."""
    record = await db.get_employee(employee_id)
    if record is None:
        raise EmployeeNotFoundError(employee_id)
    return record

The handler receives the RequestContext and the exception. It returns a ToolError that's sent to the client as a structured error response.

MRO-based matching: if you register a handler for ValueError and raise a SpecificValueError that subclasses ValueError, the ValueError handler catches it. When several registered handlers match, the one for the most specific class in the exception's MRO wins.
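
The lookup itself amounts to walking the raised exception's MRO. A sketch (our illustration, not promptise source code):

def resolve_handler(handlers: dict, exc: Exception):
    """Return the handler registered for the most specific matching class."""
    # type(exc).__mro__ lists classes from most to least specific,
    # so the first hit is the most specific registered handler.
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls]
    return None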


Progress Reporting

When you need it

Your tool processes a large dataset and takes 30+ seconds. Without progress updates, the agent (or the human watching) has no idea whether it's stuck or working.

ProgressReporter

from promptise.mcp.server import MCPServer, ProgressReporter, Depends

server = MCPServer(name="data-pipeline")

@server.tool()
async def process_dataset(
    dataset_url: str,
    progress: ProgressReporter = Depends(ProgressReporter),
) -> dict:
    """Process a large dataset with progress reporting."""
    await progress.report(0, total=100, message="Downloading dataset...")
    data = await download(dataset_url)

    rows = parse_csv(data)
    processed = 0
    for i, row in enumerate(rows):
        await transform_row(row)
        processed += 1
        if i % 100 == 0:
            pct = int((i / len(rows)) * 100)
            await progress.report(pct, total=100, message=f"Processed {i}/{len(rows)} rows")

    await progress.report(100, total=100, message="Complete")
    return {"processed": processed, "total": len(rows)}

Progress notifications are sent via MCP's notifications/progress. The client receives them in real-time and can display a progress bar or status message.

If the client doesn't support progress (no progressToken in the request), the report() calls are silently ignored.
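
On the wire, each report() call becomes a notification shaped roughly like this (the progressToken echoes the one the client supplied; the message field depends on the protocol revision in use):

{
  "jsonrpc": "2.0",
  "method": "notifications/progress",
  "params": {
    "progressToken": "abc123",
    "progress": 50,
    "total": 100,
    "message": "Processed 5000/10000 rows"
  }
}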


Cancellation

When you need it

An agent starts a 5-minute data processing job, then the user decides they don't need it anymore. Without cancellation support, the job runs to completion, wasting resources.

CancellationToken

from promptise.mcp.server import MCPServer, CancellationToken, Depends

server = MCPServer(name="data-pipeline")

@server.tool()
async def long_running_task(
    dataset: str,
    cancel: CancellationToken = Depends(CancellationToken),
) -> dict:
    """Process a dataset. Can be cancelled by the client."""
    results = []
    for chunk in load_chunks(dataset):
        cancel.check()  # Raises CancelledError if cancelled
        results.extend(await process_chunk(chunk))
    return {"processed": len(results)}

You can also wait for cancellation with a timeout:

@server.tool()
async def poll_for_updates(
    topic: str,
    cancel: CancellationToken = Depends(CancellationToken),
) -> dict:
    """Poll until cancelled or timeout."""
    updates = []
    while True:
        cancelled = await cancel.wait(timeout=5.0)
        if cancelled:
            break
        new = await fetch_updates(topic)
        updates.extend(new)
    return {"updates": updates}

Combining Resilience Features

A real production server uses multiple resilience patterns together:

from promptise.mcp.server import (
    MCPServer, AuthMiddleware, JWTAuth,
    CircuitBreakerMiddleware, WebhookMiddleware,
    AuditMiddleware, HealthCheck,
)

server = MCPServer(name="payment-api")
health = HealthCheck()

# Middleware stack
server.add_middleware(AuditMiddleware(log_path="audit.jsonl", signed=True))
server.add_middleware(WebhookMiddleware(
    url="https://hooks.slack.com/services/...",
    events={"tool.error"},
))
server.add_middleware(CircuitBreakerMiddleware(
    failure_threshold=5,
    recovery_timeout=60.0,
    excluded_tools={"health"},
))
server.add_middleware(AuthMiddleware(JWTAuth(secret="...")))

# Health checks
async def check_stripe():
    return await stripe_client.is_available()

health.add_check("stripe", check_stripe, required_for_ready=True)
health.register_resources(server)

API Summary

Symbol                               Type       Description
CircuitBreakerMiddleware(...)        Class      Circuit breaker for downstream protection
CircuitOpenError                     Exception  Raised when circuit is open
CircuitState                         Enum       CLOSED, OPEN, HALF_OPEN
HealthCheck()                        Class      Health and readiness probe manager
WebhookMiddleware(url, events, ...)  Class      Fire webhooks on tool events
BackgroundTasks()                    Class      Fire-and-forget task scheduler
ExceptionHandlerRegistry             Class      Map exceptions to MCP error responses
ProgressReporter                     Class      Report progress during long tools (via DI)
CancellationToken                    Class      Check/wait for client cancellation (via DI)

What's Next