Skip to content

Building Production MCP Servers

Build a production-grade MCP server from scratch in 12 incremental steps. Each step adds one concept so you understand the full picture.

This is the recommended starting point for building MCP servers

This guide walks you through building a complete server step by step. For deep reference on individual features, see the Server Fundamentals, Routers & Middleware, Auth & Security, and Production Features pages.

What You'll Build

A task management API with CRUD operations, JWT authentication with structured client context, route organization, middleware, caching, scope-based authorization, request tracing, response metadata, and background job queues -- the same patterns used in real-world deployments. By step 12, you'll have a server that agents can connect to and use immediately.

Concepts

The Model Context Protocol (MCP) standardizes how AI agents discover and invoke tools. Instead of hardcoding tool definitions, agents connect to MCP servers and dynamically discover what's available. This decouples agent logic from tool implementation -- you can update tools without changing agents, and multiple agents can share the same tool server.

An MCPServer is the central object. You register tools, resources, and prompts using decorators, add middleware for cross-cutting concerns, and call server.run() to start serving. The framework automatically generates JSON Schemas from your function signatures and Pydantic models, so clients always have accurate parameter descriptions.

Production servers layer additional capabilities: authentication to control who can call tools, routers to organize tools by domain, middleware for logging and rate limiting, guards for fine-grained access control based on roles and OAuth2 scopes, caching for expensive operations, request tracing for distributed debugging, and response metadata for audit trails.


Step 1: Basic Server

Start with a minimal server that exposes tools:

from promptise.mcp.server import MCPServer

server = MCPServer(name="task-api", version="1.0.0")

@server.tool()
async def list_tasks(status: str = "all") -> list[dict]:
    """List tasks, optionally filtered by status."""
    tasks = [
        {"id": "1", "title": "Review PR", "status": "open"},
        {"id": "2", "title": "Deploy v2", "status": "done"},
    ]
    if status != "all":
        tasks = [t for t in tasks if t["status"] == status]
    return tasks

@server.tool()
async def create_task(title: str, assignee: str = "") -> dict:
    """Create a new task."""
    return {"id": "3", "title": title, "assignee": assignee, "status": "open"}

server.run(transport="http", port=8080)

Run this and any MCP client can discover and call your tools. Parameters are validated automatically from the function signature.


Step 2: Pydantic Validation

For complex inputs, use Pydantic models. The framework generates nested JSON Schemas that clients use for validation:

from pydantic import BaseModel, Field

class TaskCreate(BaseModel):
    title: str = Field(min_length=1, max_length=200, description="Task title")
    description: str = Field(default="", description="Detailed description")
    priority: int = Field(default=3, ge=1, le=5, description="Priority 1-5")
    tags: list[str] = Field(default_factory=list, description="Tags")

class TaskUpdate(BaseModel):
    title: str | None = None
    status: str | None = Field(None, pattern="^(open|in_progress|done)$")
    priority: int | None = Field(None, ge=1, le=5)

@server.tool(tags=["tasks", "write"])
async def create_task(task: TaskCreate) -> dict:
    """Create a task with validated fields."""
    return {"id": "new-1", **task.model_dump()}

@server.tool(tags=["tasks", "write"])
async def update_task(task_id: str, updates: TaskUpdate) -> dict:
    """Update task fields."""
    changes = updates.model_dump(exclude_none=True)
    return {"id": task_id, "updated": changes}

Step 3: Resources and Prompts

Resources expose read-only data at static URIs. Resource templates use parameters for dynamic data. Prompts expose reusable prompt templates:

@server.resource("config://app", mime_type="application/json")
async def app_config() -> str:
    """Application configuration."""
    return '{"version": "1.0.0", "environment": "production"}'

@server.resource_template("tasks://{task_id}", mime_type="application/json")
async def task_detail(task_id: str) -> str:
    """Get task details by ID."""
    return f'{{"id": "{task_id}", "title": "Task {task_id}"}}'

@server.prompt()
async def task_summary(project: str, status: str = "all") -> str:
    """Generate a prompt to summarize tasks for a project."""
    return f"Summarize all {status} tasks for project {project}."

Step 4: Authentication and Client Context

Add JWT authentication to protect sensitive tools. After authentication, every tool receives a structured ClientContext with the caller's identity, roles, scopes, and JWT claims:

from promptise.mcp.server import (
    MCPServer, AuthMiddleware, JWTAuth, RequestContext,
)

jwt_auth = JWTAuth(secret="your-secret-key-min-32-chars-long!!")
server = MCPServer(name="task-api", version="1.0.0")
server.add_middleware(AuthMiddleware(jwt_auth))

# Public tool -- no auth required
@server.tool()
async def health_check() -> dict:
    """Check server health."""
    return {"status": "healthy"}

# Protected tool -- requires valid JWT
@server.tool(auth=True)
async def create_task(ctx: RequestContext, title: str) -> dict:
    """Create a task (authenticated users only)."""
    return {
        "id": "1",
        "title": title,
        "created_by": ctx.client.client_id,  # Structured client identity
    }

# Role-restricted tool -- requires 'admin' role in JWT claims
@server.tool(auth=True, roles=["admin"])
async def delete_task(task_id: str) -> dict:
    """Delete a task (admin only)."""
    return {"deleted": task_id}

The ctx.client object gives you typed access to everything about the caller:

@server.tool(auth=True)
async def whoami(ctx: RequestContext) -> dict:
    """Return full client identity."""
    return {
        "client_id": ctx.client.client_id,     # "agent-prod"
        "roles": sorted(ctx.client.roles),      # ["admin", "write"]
        "scopes": sorted(ctx.client.scopes),    # ["read", "write"]
        "issuer": ctx.client.issuer,            # JWT "iss" claim
        "subject": ctx.client.subject,          # JWT "sub" claim
        "ip_address": ctx.client.ip_address,    # Client IP
        "user_agent": ctx.client.user_agent,    # Client user-agent header
    }

For development, you can enable a built-in token endpoint:

server.enable_token_endpoint(
    jwt_auth=jwt_auth,
    clients={
        "agent-prod":  {"secret": "agent-secret",  "roles": ["admin", "write"]},
        "agent-viewer": {"secret": "viewer-secret", "roles": ["read"]},
    },
)
# POST /auth/token with {"client_id": "agent-prod", "client_secret": "agent-secret"}

For production, use a real identity provider (Auth0, Keycloak, Okta).


Step 5: Guards -- Roles and Scopes

Guards enforce fine-grained permissions after authentication. Use role-based guards for API key auth, and scope-based guards for OAuth2/JWT:

from promptise.mcp.server import HasRole, HasAllRoles, HasScope, HasAllScopes, RequireClientId

# Any ONE of these roles grants access
@server.tool(auth=True, guards=[HasRole("admin", "manager")])
async def approve_task(task_id: str) -> dict:
    """Approve a task (admin or manager)."""
    return {"approved": task_id}

# ALL roles required
@server.tool(auth=True, guards=[HasAllRoles("admin", "billing")])
async def refund_payment(payment_id: str) -> dict:
    """Refund a payment (requires both admin AND billing roles)."""
    return {"refunded": payment_id}

# OAuth2 scope-based access (reads from JWT "scope" claim)
@server.tool(auth=True, guards=[HasScope("tasks:write")])
async def bulk_update(task_ids: list[str]) -> dict:
    """Bulk update tasks (requires tasks:write scope)."""
    return {"updated": len(task_ids)}

# All scopes required
@server.tool(auth=True, guards=[HasAllScopes("tasks:read", "tasks:write")])
async def export_and_archive(project: str) -> dict:
    """Export and archive (requires both read AND write scopes)."""
    return {"archived": project}

# Restrict to specific client identifiers
@server.tool(auth=True, guards=[RequireClientId("agent-prod", "agent-staging")])
async def deploy(version: str) -> dict:
    """Deploy (only specific agents allowed)."""
    return {"deployed": version}

When a guard denies access, the error message explains exactly what's wrong:

Requires any of roles [admin, manager], but client has [viewer]
Requires all scopes [tasks:read, tasks:write], client has [tasks:read], missing [tasks:write]

Step 6: Client Enrichment Hook

Use on_authenticate to add custom data to the client context after authentication -- look up user profiles, load permissions from a database, or inject organization metadata:

from promptise.mcp.server import AuthMiddleware, JWTAuth, RequestContext

async def enrich_client(client, ctx):
    """Called after every successful authentication."""
    # Look up additional info from your database
    org = await db.get_org(client.client_id)
    client.extra["org_name"] = org.name
    client.extra["plan"] = org.plan
    client.extra["feature_flags"] = org.feature_flags

jwt_auth = JWTAuth(secret="your-secret-key-min-32-chars-long!!")
server.add_middleware(AuthMiddleware(jwt_auth, on_authenticate=enrich_client))

@server.tool(auth=True)
async def premium_feature(ctx: RequestContext) -> dict:
    """A feature only available on premium plans."""
    if ctx.client.extra.get("plan") != "premium":
        raise ToolError("This feature requires a premium plan")
    return {"result": "premium data"}

Step 7: Routers

Organize tools by domain with MCPRouter. Each router adds a prefix and can apply shared auth, tags, and guards:

from promptise.mcp.server import MCPRouter

# Task management tools
task_router = MCPRouter(prefix="tasks", tags=["tasks"], auth=True)

@task_router.tool()
async def list(status: str = "all") -> list[dict]:
    """List all tasks."""  # Tool name becomes "tasks_list"
    return []

@task_router.tool()
async def create(title: str) -> dict:
    """Create a task."""  # Tool name becomes "tasks_create"
    return {"id": "1", "title": title}

# User management tools
user_router = MCPRouter(prefix="users", tags=["users"], auth=True)

@user_router.tool()
async def list_users() -> list[dict]:
    """List all users."""  # Tool name becomes "users_list_users"
    return []

# Register routers with the server
server.include_router(task_router)
server.include_router(user_router)

Step 8: Middleware

Add cross-cutting concerns with the middleware chain. Middleware runs in registration order (first added = outermost):

from promptise.mcp.server import LoggingMiddleware, TimeoutMiddleware, RateLimitMiddleware

# Logging -- logs every tool call with timing
server.add_middleware(LoggingMiddleware())

# Timeouts -- kill slow tool calls
server.add_middleware(TimeoutMiddleware(default_timeout=30.0))

# Rate limiting -- prevent abuse
server.add_middleware(RateLimitMiddleware(rate_per_minute=100, per_tool=True))

Write custom middleware with the decorator:

@server.middleware
async def metrics_middleware(ctx, call_next):
    """Track tool call metrics."""
    import time
    start = time.monotonic()
    try:
        result = await call_next(ctx)
        duration = time.monotonic() - start
        print(f"[METRIC] {ctx.tool_name}: {duration:.3f}s (success)")
        return result
    except Exception as exc:
        duration = time.monotonic() - start
        print(f"[METRIC] {ctx.tool_name}: {duration:.3f}s (error: {exc})")
        raise

Step 9: Caching and Dependencies

Cache expensive operations:

from promptise.mcp.server import cached, InMemoryCache

cache = InMemoryCache(max_size=1000, cleanup_interval=60.0)

@server.tool()
@cached(ttl=300, backend=cache)
async def search_tasks(query: str) -> list[dict]:
    """Search tasks (cached for 5 minutes)."""
    # Expensive database query
    return await db.search(query)

Inject dependencies with Depends for resource cleanup:

from promptise.mcp.server import Depends

async def get_db():
    db = await Database.connect()
    try:
        yield db  # Injected into the handler
    finally:
        await db.close()  # Cleanup after handler returns

@server.tool()
async def query_tasks(sql: str, db = Depends(get_db)) -> list:
    """Run a database query with automatic connection cleanup."""
    return await db.execute(sql)

Fire-and-forget tasks with BackgroundTasks:

from promptise.mcp.server import BackgroundTasks

@server.tool()
async def process_data(data: str, bg: BackgroundTasks = Depends(BackgroundTasks)) -> str:
    """Process data and send notifications in the background."""
    bg.add(send_notification, "Data processed", data)
    bg.add(log_analytics, "process_data", data)
    return "Processing started"

Step 10: Request Tracing and Response Metadata

Request tracing propagates a unique ID through the entire request lifecycle. Clients send X-Request-ID headers (or one is generated automatically), and you can use it for distributed debugging:

@server.tool(auth=True)
async def traced_operation(ctx: RequestContext, data: str) -> dict:
    """An operation with full tracing context."""
    # Request ID available on every context
    request_id = ctx.request_id  # Client-provided or auto-generated

    # Pass it to downstream services
    await downstream_api.call(data, trace_id=request_id)

    return {"processed": data, "trace_id": request_id}

ToolResponse wraps your return value with metadata for audit and observability -- the metadata is captured automatically without changing what the client sees:

from promptise.mcp.server import ToolResponse

@server.tool(auth=True)
async def create_order(ctx: RequestContext, product_id: str, qty: int) -> ToolResponse:
    """Create an order with audit metadata."""
    order = await db.create_order(product_id, qty)

    return ToolResponse(
        content={"order_id": order.id, "status": "created"},
        metadata={
            "audit_action": "order.create",
            "actor": ctx.client.client_id,
            "ip": ctx.client.ip_address,
            "product_id": product_id,
        },
    )
    # Client sees: {"order_id": "...", "status": "created"}
    # Metadata stored on ctx.state["response_metadata"] for middleware/logging

Step 11: Job Queues for Long-Running Work

MCP tool calls are synchronous -- the agent blocks until the response. This breaks down for long-running work like report generation, data pipelines, or batch processing. MCPQueue turns any long operation into a non-blocking submit-and-poll workflow:

import asyncio
from promptise.mcp.server import MCPServer, MCPQueue

server = MCPServer(name="analytics-api", version="1.0.0")
queue = MCPQueue(server, max_workers=4)

@queue.job(name="generate_report", timeout=120)
async def generate_report(department: str, quarter: str = "Q4") -> dict:
    """Generate a quarterly analytics report (takes ~30 seconds)."""
    await asyncio.sleep(30)  # Simulate long-running work
    return {"department": department, "quarter": quarter, "rows": 1250, "status": "ready"}

@queue.job(name="train_model", timeout=600, max_retries=2, backoff_base=2.0)
async def train_model(dataset: str, epochs: int = 10) -> dict:
    """Train a model on the given dataset."""
    for i in range(epochs):
        await asyncio.sleep(1)
    return {"accuracy": 0.95, "model_id": "model-abc123"}

server.run(transport="http", port=8080)

MCPQueue auto-registers 5 tools that agents use to interact with the queue:

Auto-registered tool Purpose
queue_submit Submit a job -- returns a job_id immediately
queue_status Check job status and progress
queue_result Retrieve a completed job's return value
queue_cancel Cancel a pending or running job
queue_list List jobs (filterable by status)

Typical agent workflow:

Agent: queue_submit(job_type="generate_report", args={"department": "Engineering"})
  -> {"job_id": "a1b2c3d4", "status": "pending"}

Agent: queue_status(job_id="a1b2c3d4")
  -> {"status": "running", "progress": 0.3, "progress_message": "Processing Q3 data"}

Agent: queue_result(job_id="a1b2c3d4")
  -> {"status": "completed", "result": {"department": "Engineering", "rows": 1250}}

Jobs support priority levels (low, normal, high, critical), automatic retries with exponential backoff, per-job timeouts, and cancellation. The agent submits work and polls for completion -- no long-lived connections or blocking calls.


Step 12: Connect Agents

Once your server is running, connect an agent:

from promptise import build_agent, HTTPServerSpec

agent = await build_agent(
    model="openai:gpt-5-mini",
    servers={
        "tasks": HTTPServerSpec(
            url="http://localhost:8080/mcp",
            bearer_token="your-jwt-token",
        ),
    },
)

result = await agent.ainvoke(
    {"messages": [{"role": "user", "content": "Create a task to review the PR"}]}
)

The agent automatically discovers all tools from your server and can call them as needed.


Complete Example

import asyncio
from promptise.mcp.server import (
    MCPServer, MCPRouter, AuthMiddleware, JWTAuth,
    LoggingMiddleware, TimeoutMiddleware, RateLimitMiddleware,
    Depends, BackgroundTasks, cached, InMemoryCache,
    HasScope, RequestContext, ToolResponse,
)

# Server setup
server = MCPServer(name="task-api", version="1.0.0")
jwt_auth = JWTAuth(secret="your-production-secret-key-here!!")
cache = InMemoryCache(max_size=500)

# Client enrichment
async def enrich(client, ctx):
    client.extra["tier"] = "enterprise"

# Middleware stack
server.add_middleware(LoggingMiddleware())
server.add_middleware(AuthMiddleware(jwt_auth, on_authenticate=enrich))
server.add_middleware(TimeoutMiddleware(default_timeout=30.0))
server.add_middleware(RateLimitMiddleware(rate_per_minute=100))

# Task router
tasks = MCPRouter(prefix="tasks", tags=["tasks"])

@tasks.tool()
@cached(ttl=60, backend=cache)
async def list_tasks(status: str = "all") -> list[dict]:
    """List tasks with optional status filter."""
    return [{"id": "1", "title": "Example", "status": "open"}]

@tasks.tool(auth=True, guards=[HasScope("tasks:write")])
async def create_task(ctx: RequestContext, title: str, priority: int = 3) -> ToolResponse:
    """Create a new task with audit trail."""
    return ToolResponse(
        content={"id": "new", "title": title, "priority": priority},
        metadata={"actor": ctx.client.client_id, "action": "task.create"},
    )

@tasks.tool(auth=True, roles=["admin"])
async def delete_task(task_id: str) -> dict:
    """Delete a task (admin only)."""
    return {"deleted": task_id}

server.include_router(tasks)

# Dev token endpoint
server.enable_token_endpoint(
    jwt_auth=jwt_auth,
    clients={"agent": {"secret": "dev-secret", "roles": ["admin"]}},
)

# Lifecycle hooks
@server.on_startup
async def startup():
    print("Task API server ready")

# Run
server.run(transport="http", port=8080)

What's Next

Go deeper on each feature:

Feature used in this guide Deep reference
@server.tool(), resources, prompts Server Fundamentals
MCPRouter, @server.middleware Routers & Middleware
JWTAuth, AuthMiddleware, ClientContext, guards, scopes Auth & Security
@cached, rate limiting, health checks, metrics Production Features
TestClient for testing Testing

Connect agents to your server:

Other guides: