Advanced Patterns¶
Patterns for evolving, composing, and extending MCP servers — tool versioning for backwards compatibility, transforms for dynamic tool visibility, server composition for microservice gateways, OpenAPI bridging, batch execution, streaming results, elicitation, and sampling.
Tool Versioning¶
When you need it¶
You ship search v1 to production. Agents hard-code search in their prompts. Now you want to add a filters parameter — but you can't change the signature without breaking every agent that already uses v1. Versioned tools let you ship v2 while keeping v1 available.
VersionedToolRegistry¶
```python
from promptise.mcp.server import MCPServer, VersionedToolRegistry

server = MCPServer(name="search-api")

@server.tool(version="1.0")
async def search(query: str) -> list[dict]:
    """Full-text search across documents.

    v1: Basic query string matching.
    """
    return await db.full_text_search(query)

@server.tool(version="2.0")
async def search(
    query: str,
    filters: dict | None = None,
    sort_by: str = "relevance",
) -> list[dict]:
    """Full-text search across documents.

    v2: Adds filtering and sorting.
    """
    results = await db.full_text_search(query, filters=filters)
    return sorted(results, key=lambda r: r.get(sort_by, 0), reverse=True)
```
What agents see:
| Tool name | Description |
|---|---|
| `search` | Points to the latest version (v2) |
| `search@1.0` | Pinned to v1 (basic search) |
| `search@2.0` | Pinned to v2 (with filters) |
Working with versions programmatically¶
```python
vr = VersionedToolRegistry()
vr.register("search", "1.0", tool_def_v1)
vr.register("search", "2.0", tool_def_v2)

# Get the latest version
vr.get("search")            # → v2.0 ToolDef

# Get a specific version
vr.get("search@1.0")        # → v1.0 ToolDef

# List all versions
vr.list_versions("search")  # → ["1.0", "2.0"]

# Check if a tool is registered
vr.has("search")            # → True
```
Version comparison¶
Versions are compared semantically: `"2.0" > "1.0"`, `"1.10" > "1.9"`. The `latest` property always returns the highest version.
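The key point is that segments compare numerically, not lexicographically (as strings, `"1.10" < "1.9"`). A minimal sketch of how such an ordering could be implemented — illustrative only, not the registry's actual internals:

```python
def version_key(version: str) -> tuple[int, ...]:
    # Split "1.10" into (1, 10) so each segment compares as an integer.
    return tuple(int(part) for part in version.split("."))

versions = ["1.0", "1.9", "1.10", "2.0"]
latest = max(versions, key=version_key)
print(latest)  # → 2.0
```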
Tool Transforms¶
When you need it¶
You have 50 tools registered. Some are admin-only, some are experimental. You don't want every agent to see all 50 — you want to control what each agent discovers based on context.
NamespaceTransform¶
Prefix tool names when composing multiple servers or separating concerns:
```python
from promptise.mcp.server import MCPServer, NamespaceTransform

server = MCPServer(name="analytics-api")

@server.tool()
async def query(sql: str) -> list[dict]:
    """Run an analytics query."""
    return await analytics_db.execute(sql)

@server.tool()
async def export(format: str = "csv") -> str:
    """Export the latest report."""
    return await reports.export(format)

# Agents see: analytics_query, analytics_export
server.add_transform(NamespaceTransform(prefix="analytics"))
```
VisibilityTransform¶
Hide tools based on who's asking:
```python
from promptise.mcp.server import MCPServer, VisibilityTransform

server = MCPServer(name="admin-api")

@server.tool()
async def list_users() -> list[dict]:
    """List all users."""
    return await db.list_users()

@server.tool()
async def delete_user(user_id: str) -> dict:
    """Delete a user permanently."""
    return await db.delete_user(user_id)

@server.tool()
async def reset_database() -> str:
    """Reset the entire database."""
    await db.reset()
    return "Database reset complete"

# Hide destructive tools from non-admin agents.
# Default to an empty set so a missing "roles" key hides the tool.
server.add_transform(VisibilityTransform(
    hidden={
        "delete_user": lambda ctx: "admin" not in (ctx.state.get("roles", set()) if ctx else set()),
        "reset_database": lambda ctx: "superadmin" not in (ctx.state.get("roles", set()) if ctx else set()),
    }
))
```
Hidden tools are removed from `list_tools` results but remain callable if an agent already has the tool name cached. This is a soft visibility control, not a security boundary — use guards for that.
TagFilterTransform¶
Only expose tools that match required tags:
```python
from promptise.mcp.server import MCPServer, TagFilterTransform

server = MCPServer(name="multi-tier-api")

@server.tool(tags=["public", "read"])
async def get_product(product_id: str) -> dict:
    """Get product details."""
    return await catalog.get(product_id)

@server.tool(tags=["internal", "write"])
async def update_inventory(product_id: str, quantity: int) -> dict:
    """Update inventory count."""
    return await inventory.update(product_id, quantity)

@server.tool(tags=["admin", "write"])
async def set_pricing(product_id: str, price_cents: int) -> dict:
    """Set product pricing."""
    return await pricing.set(product_id, price_cents)

# External agents only see tools tagged "public"
server.add_transform(TagFilterTransform(required_tags={"public"}))
```
Custom transforms¶
Transforms implement the ToolTransform protocol:
```python
from dataclasses import replace

from promptise.mcp.server import ToolTransform, ToolDef, RequestContext

class DescriptionRewriter:
    """Append usage hints to tool descriptions."""

    def apply(
        self,
        tools: list[ToolDef],
        ctx: RequestContext | None = None,
    ) -> list[ToolDef]:
        result = []
        for t in tools:
            new_desc = f"{t.description}\n\nUsage: call with JSON arguments."
            result.append(replace(t, description=new_desc))
        return result
```
Transforms are applied in order — each sees the output of the previous one. This lets you compose them:
```python
server.add_transform(NamespaceTransform(prefix="myapp"))
server.add_transform(TagFilterTransform(required_tags={"public"}))
# Result: only public tools, prefixed with "myapp_"
```
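Such a pipeline is just a fold over the tool list. A self-contained sketch, using a minimal stand-in for `ToolDef` and simplified transforms (not the library's actual classes):

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class ToolDef:  # minimal stand-in for illustration
    name: str
    tags: frozenset

def apply_transforms(tools, transforms, ctx=None):
    # Each transform sees the output of the previous one.
    for transform in transforms:
        tools = transform.apply(tools, ctx)
    return tools

class Prefix:
    def __init__(self, prefix):
        self.prefix = prefix
    def apply(self, tools, ctx=None):
        return [replace(t, name=f"{self.prefix}_{t.name}") for t in tools]

class RequireTags:
    def __init__(self, required):
        self.required = set(required)
    def apply(self, tools, ctx=None):
        return [t for t in tools if self.required <= t.tags]

tools = [ToolDef("query", frozenset({"public"})), ToolDef("reset", frozenset({"admin"}))]
visible = apply_transforms(tools, [Prefix("myapp"), RequireTags({"public"})])
print([t.name for t in visible])  # → ['myapp_query']
```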
Server Composition¶
When you need it¶
Your company has separate teams building separate MCP servers — payments, users, analytics. You want to expose them all through a single gateway server so agents only connect to one endpoint.
mount()¶
```python
from promptise.mcp.server import MCPServer, mount

# Team 1: Payment tools
payments = MCPServer(name="payments")

@payments.tool()
async def charge(customer_id: str, amount_cents: int) -> dict:
    """Charge a customer's payment method."""
    return await stripe.charge(customer_id, amount_cents)

@payments.tool()
async def refund(charge_id: str) -> dict:
    """Refund a charge."""
    return await stripe.refund(charge_id)

# Team 2: User tools
users = MCPServer(name="users")

@users.tool()
async def get_user(user_id: str) -> dict:
    """Get user profile."""
    return await db.get_user(user_id)

@users.tool()
async def update_user(user_id: str, name: str | None = None) -> dict:
    """Update user profile."""
    return await db.update_user(user_id, name=name)

# Gateway: compose into one server
gateway = MCPServer(name="api-gateway", version="1.0.0")
mount(gateway, payments, prefix="pay")
mount(gateway, users, prefix="usr")

# Agents see: pay_charge, pay_refund, usr_get_user, usr_update_user
gateway.run(transport="http", port=8080)
```
What gets mounted¶
mount() copies everything from the child into the parent:
| Registry | Behavior |
|---|---|
| Tools | Names prefixed with `{prefix}_` |
| Resources | Copied as-is (no prefix) |
| Prompts | Copied as-is |
| Exception handlers | Merged into parent |
| Input models | Re-mapped to prefixed tool names |
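The tool-prefixing row can be sketched with plain dicts standing in for the library's registries (illustrative, not the real `mount()` implementation):

```python
def mount_tools(parent_tools: dict, child_tools: dict, prefix: str) -> None:
    # Copy each child tool into the parent under a prefixed name;
    # resources and prompts would be copied without a prefix.
    for name, tool in child_tools.items():
        parent_tools[f"{prefix}_{name}"] = tool

gateway, payments = {}, {"charge": "charge-fn", "refund": "refund-fn"}
mount_tools(gateway, payments, prefix="pay")
print(sorted(gateway))  # → ['pay_charge', 'pay_refund']
```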
Adding tags during mount¶
```python
# Tag all payment tools for filtering
mount(gateway, payments, prefix="pay", tags=["payment", "billing"])
mount(gateway, users, prefix="usr", tags=["user-management"])
```
OpenAPI Bridge¶
When you need it¶
You have an existing REST API with an OpenAPI spec (Swagger). You want agents to call it through MCP without writing individual tool wrappers for each endpoint.
OpenAPIProvider¶
```python
from promptise.mcp.server import MCPServer, OpenAPIProvider

server = MCPServer(name="github-bridge")

# Load from URL
provider = OpenAPIProvider(
    "https://raw.githubusercontent.com/github/rest-api-description/main/descriptions/api.github.com/api.github.com.json",
    prefix="gh_",
    include={"repos_list_for_user", "repos_get", "issues_list"},
    auth_header=("Authorization", f"Bearer {GITHUB_TOKEN}"),
    tags=["github"],
)
provider.register(server)

server.run()
```
Each OpenAPI operation becomes an MCP tool that makes HTTP requests to the target API.
Spec loading options¶
```python
# From a URL (requires httpx)
provider = OpenAPIProvider("https://api.example.com/openapi.json")

# From a local file (JSON or YAML)
provider = OpenAPIProvider("./specs/api.yaml")

# From a pre-parsed dict
spec = {"openapi": "3.0.0", "paths": {...}}
provider = OpenAPIProvider(spec)
```
Filtering operations¶
```python
# Only include specific operations
provider = OpenAPIProvider(spec, include={"getUser", "listUsers"})

# Exclude specific operations
provider = OpenAPIProvider(spec, exclude={"deleteUser", "resetDatabase"})
```
Tool annotations¶
The provider automatically sets MCP tool annotations based on the HTTP method:
| HTTP method | `read_only_hint` | `destructive_hint` | `idempotent_hint` |
|---|---|---|---|
| GET | True | False | True |
| POST | False | False | False |
| PUT | False | False | True |
| DELETE | False | True | True |
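The mapping in the table follows standard HTTP semantics (GET is safe, PUT and DELETE are idempotent). A sketch of that rule as a function — not the provider's actual code:

```python
def annotations_for_method(method: str) -> dict:
    m = method.upper()
    return {
        "read_only_hint": m == "GET",             # GET never mutates
        "destructive_hint": m == "DELETE",        # DELETE removes data
        "idempotent_hint": m in {"GET", "PUT", "DELETE"},  # safe to retry
    }

print(annotations_for_method("put"))
# → {'read_only_hint': False, 'destructive_hint': False, 'idempotent_hint': True}
```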
Configuration¶
| Parameter | Default | Description |
|---|---|---|
| `spec` | (required) | URL, file path, or dict |
| `base_url` | From spec | Override API base URL |
| `prefix` | `""` | Prefix for tool names |
| `include` | All | Only these operation IDs |
| `exclude` | None | Skip these operation IDs |
| `auth_header` | None | `(header_name, value)` tuple |
| `tags` | `[]` | Tags for all generated tools |
Batch Tool Calls¶
When you need it¶
An agent needs to fetch 20 user profiles. Without batching, that's 20 sequential MCP round-trips. With a batch tool, it's one request that executes all 20 in parallel.
register_batch_tool()¶
```python
from promptise.mcp.server import MCPServer, register_batch_tool

server = MCPServer(name="user-api")

@server.tool()
async def get_user(user_id: str) -> dict:
    """Get a user by ID."""
    return await db.get_user(user_id)

@server.tool()
async def get_team(team_id: str) -> dict:
    """Get a team by ID."""
    return await db.get_team(team_id)

# Register the batch meta-tool
register_batch_tool(server, name="batch_call", max_parallel=10)
```
Agents call it like this:
```json
{
  "tool": "batch_call",
  "arguments": {
    "calls": [
      {"tool": "get_user", "args": {"user_id": "u-001"}},
      {"tool": "get_user", "args": {"user_id": "u-002"}},
      {"tool": "get_team", "args": {"team_id": "t-100"}}
    ]
  }
}
```
Response:
```json
[
  {"tool": "get_user", "status": "ok", "result": ["..."]},
  {"tool": "get_user", "status": "ok", "result": ["..."]},
  {"tool": "get_team", "status": "ok", "result": ["..."]}
]
```
Configuration¶
| Parameter | Default | Description |
|---|---|---|
| `name` | `"batch_call"` | Name of the batch tool |
| `description` | `"Execute multiple tool calls in parallel."` | Description |
| `max_parallel` | `10` | Maximum concurrent executions |
The total number of calls in a single batch is capped at `max_parallel * 2`. Failed individual calls return `{"status": "error", "error": "..."}` without failing the batch.
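The parallel fan-out with per-call error isolation can be sketched with `asyncio` — a hypothetical helper, not the library's implementation:

```python
import asyncio

async def run_batch(calls, tools, max_parallel=10):
    # Semaphore caps concurrency; gather preserves input order.
    sem = asyncio.Semaphore(max_parallel)

    async def run_one(call):
        async with sem:
            try:
                result = await tools[call["tool"]](**call["args"])
                return {"tool": call["tool"], "status": "ok", "result": result}
            except Exception as exc:
                # A failed call reports its own error without failing the batch.
                return {"tool": call["tool"], "status": "error", "error": str(exc)}

    return await asyncio.gather(*(run_one(c) for c in calls))

async def get_user(user_id: str) -> dict:
    if user_id == "u-bad":
        raise ValueError("not found")
    return {"id": user_id}

calls = [{"tool": "get_user", "args": {"user_id": "u-001"}},
         {"tool": "get_user", "args": {"user_id": "u-bad"}}]
results = asyncio.run(run_batch(calls, {"get_user": get_user}))
print([r["status"] for r in results])  # → ['ok', 'error']
```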
Streaming Results¶
When you need it¶
Your tool processes a large dataset or streams search results. Instead of accumulating everything in memory and returning a flat dict, you build up results incrementally and return them with metadata.
StreamingResult¶
```python
from promptise.mcp.server import MCPServer, StreamingResult

server = MCPServer(name="search-api")

@server.tool()
async def search_products(
    query: str,
    max_results: int = 50,
) -> StreamingResult:
    """Search the product catalog.

    Returns results as they're found, with search metadata.
    """
    result = StreamingResult()
    result.set_metadata("query", query)
    async for hit in catalog.stream_search(query):
        result.add({
            "id": hit.id,
            "name": hit.name,
            "price": hit.price,
            "score": hit.relevance_score,
        })
        if len(result) >= max_results:
            break
    result.set_metadata("total_available", await catalog.count(query))
    return result
```
The framework serializes `StreamingResult` as JSON:
```json
{
  "items": [
    {"id": "p-001", "name": "Widget", "price": 999, "score": 0.95},
    {"id": "p-002", "name": "Gadget", "price": 1499, "score": 0.87}
  ],
  "count": 2,
  "metadata": {
    "query": "electronics",
    "total_available": 1250
  }
}
```
Batch adding¶
```python
result = StreamingResult()

# Add one at a time
result.add({"id": 1, "name": "Alice"})

# Add many at once
result.add_many([
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Charlie"},
])

# Attach metadata
result.set_metadata("source", "users_table")
result.set_metadata("cached", False)

len(result)      # → 3
result.items     # → [{"id": 1, ...}, {"id": 2, ...}, {"id": 3, ...}]
result.metadata  # → {"source": "users_table", "cached": False}
```
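The serialized shape shown earlier can be reproduced with a minimal stand-in class — illustrative only; the real `StreamingResult` internals may differ:

```python
class MiniStreamingResult:
    # Minimal stand-in mirroring the documented surface:
    # add / add_many / set_metadata / len() / serialization shape.
    def __init__(self):
        self.items, self.metadata = [], {}

    def add(self, item):
        self.items.append(item)

    def add_many(self, items):
        self.items.extend(items)

    def set_metadata(self, key, value):
        self.metadata[key] = value

    def __len__(self):
        return len(self.items)

    def to_dict(self):
        # Matches the {"items": ..., "count": ..., "metadata": ...} shape.
        return {"items": self.items, "count": len(self.items), "metadata": self.metadata}

r = MiniStreamingResult()
r.add({"id": 1, "name": "Alice"})
r.add_many([{"id": 2, "name": "Bob"}])
r.set_metadata("source", "users_table")
print(r.to_dict()["count"])  # → 2
```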
Elicitation¶
When you need it¶
Your tool is about to deploy to production or delete data. You want to ask the user for confirmation during tool execution, not before.
Elicitor¶
```python
from promptise.mcp.server import MCPServer, Elicitor, Depends

server = MCPServer(name="deploy-api")

@server.tool()
async def deploy_to_production(
    service: str,
    version: str,
    elicit: Elicitor = Depends(Elicitor),
) -> dict:
    """Deploy a service version to production.

    Asks the user for confirmation before proceeding.
    """
    # Show what's about to happen and ask for confirmation
    answer = await elicit.ask(
        message=(
            f"About to deploy {service} v{version} to production.\n"
            f"Current production version: {await get_current_version(service)}\n"
            f"Confirm deployment?"
        ),
        schema={
            "type": "object",
            "properties": {
                "confirm": {
                    "type": "boolean",
                    "description": "Set to true to proceed with deployment",
                },
                "reason": {
                    "type": "string",
                    "description": "Optional: reason for deployment",
                },
            },
            "required": ["confirm"],
        },
    )
    if not answer or not answer.get("confirm"):
        # answer may be None, so guard the .get() call
        return {"status": "cancelled", "reason": (answer or {}).get("reason", "User declined")}
    deploy_id = await deploy(service, version)
    return {"status": "deployed", "deploy_id": deploy_id, "version": version}
```
Graceful degradation¶
If the client doesn't support elicitation (not all MCP clients do), `ask()` returns `None`. Design your tools to handle this:
```python
answer = await elicit.ask("Confirm deletion?", schema={...})
if answer is None:
    # Client doesn't support elicitation — proceed with caution,
    # or return an error asking the user to confirm manually
    return {"error": "This action requires confirmation. Please confirm and retry."}
```
Configuration¶
| Parameter | Default | Description |
|---|---|---|
| `timeout` | `60.0` | Seconds to wait for user response |
Sampling¶
When you need it¶
Your tool needs to call an LLM to process data — summarize text, classify content, extract entities — but the client controls which LLM to use. MCP sampling lets the server request completions through the client's model.
Sampler¶
```python
from promptise.mcp.server import MCPServer, Sampler, Depends

server = MCPServer(name="content-api")

@server.tool()
async def summarize_document(
    document_url: str,
    sampler: Sampler = Depends(Sampler),
) -> dict:
    """Summarize a document using the client's LLM.

    The server fetches the document; the client provides the model.
    """
    content = await fetch_document(document_url)
    summary = await sampler.create_message(
        messages=[
            {"role": "user", "content": f"Summarize this document in 3 bullet points:\n\n{content[:5000]}"},
        ],
        max_tokens=500,
        system="You are a professional document summarizer. Be concise and factual.",
        temperature=0.3,
    )
    if summary is None:
        return {"error": "Sampling not supported by client"}
    return {
        "url": document_url,
        "summary": summary,
        "model": "client-provided",
    }
```
Parameters¶
```python
result = await sampler.create_message(
    messages=[{"role": "user", "content": "..."}],
    max_tokens=1024,                   # Maximum tokens to generate
    model="claude-sonnet-4-20250514",  # Model hint (client may ignore)
    system="You are...",               # System prompt
    temperature=0.7,                   # Sampling temperature
    stop_sequences=["\n\n"],           # Stop sequences
)
```
The client controls which model actually runs. The `model` parameter is a hint — the client can override it based on cost, availability, or policy.
Server Manifest¶
When you need it¶
You want agents to introspect your server — discover all tools, their schemas, tags, auth requirements, and rate limits — without calling each tool individually.
register_manifest()¶
```python
from promptise.mcp.server import MCPServer, register_manifest

server = MCPServer(name="my-api", version="2.0.0")

@server.tool(tags=["math"], roles=["user"])
async def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

@server.tool(tags=["math"], auth=True)
async def divide(a: float, b: float) -> float:
    """Divide two numbers."""
    return a / b

# Auto-registers a docs://manifest resource
register_manifest(server)
```
Agents read the manifest via MCP's resource protocol:
```json
{
  "server": {"name": "my-api", "version": "2.0.0"},
  "tools": [
    {
      "name": "add",
      "description": "Add two numbers.",
      "input_schema": {"type": "object", "properties": {"a": {...}, "b": {...}}},
      "tags": ["math"],
      "roles": ["user"]
    },
    {
      "name": "divide",
      "description": "Divide two numbers.",
      "input_schema": {...},
      "tags": ["math"],
      "auth_required": true
    }
  ],
  "resources": [...],
  "prompts": [...]
}
```
Programmatic access¶
```python
from promptise.mcp.server import build_manifest

manifest = build_manifest(server)
print(manifest["tools"])  # All registered tools with metadata
```
Hot Reload¶
When you need it¶
You're developing an MCP server and want to see changes immediately without restarting manually. Hot reload watches your Python files and restarts the server when you save.
hot_reload()¶
```python
from promptise.mcp.server import MCPServer, hot_reload

server = MCPServer(name="dev-server")

@server.tool()
async def greet(name: str) -> str:
    """Greet someone."""
    return f"Hello, {name}!"

if __name__ == "__main__":
    hot_reload(
        server,
        transport="http",
        port=8080,
        watch_dirs=["src/"],  # Directories to watch
        poll_interval=1.0,    # Check every second
    )
```
```text
--- Starting server (pid will follow) ---
Watching: src/
Poll interval: 1.0s
MCP server running on http://127.0.0.1:8080/mcp

--- Detected changes in 1 file(s) ---
  src/tools.py
--- Restarting server ---
```
How it works¶
- The parent process watches `*.py` files for modification time changes
- When a change is detected, the child server process is terminated
- A new child process is started with the updated code
- If the server crashes on startup (e.g. a syntax error), the watcher waits for the next file change
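The mtime-polling step can be sketched as follows — a simplified watcher for illustration, not the library's code:

```python
import os
import pathlib
import tempfile
import time

def snapshot(watch_dirs):
    # Map each *.py file under the watched directories to its mtime.
    return {
        p: p.stat().st_mtime
        for d in watch_dirs
        for p in pathlib.Path(d).rglob("*.py")
    }

def changed_files(before, after):
    # New files, deleted files, or files whose mtime moved.
    return [p for p in before.keys() | after.keys() if before.get(p) != after.get(p)]

# Demo: touch a file and detect the change.
tmp = tempfile.mkdtemp()
src = pathlib.Path(tmp, "tools.py")
src.write_text("x = 1")
before = snapshot([tmp])
os.utime(src, (0, time.time() + 10))  # simulate a save with a newer mtime
after = snapshot([tmp])
print(len(changed_files(before, after)))  # → 1
```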
Configuration¶
| Parameter | Default | Description |
|---|---|---|
| `transport` | `"http"` | Transport for the child server |
| `host` | `"127.0.0.1"` | Bind host |
| `port` | `8080` | Bind port |
| `watch_dirs` | `["."]` | Directories to watch |
| `poll_interval` | `1.0` | Seconds between checks |
| `dashboard` | `False` | Enable dashboard in child |
Development only
Hot reload is for development. In production, use a process manager like systemd, supervisord, or Kubernetes.
CLI Serve¶
When you need it¶
You've built an MCP server in a Python module but don't want to add `if __name__ == "__main__"` boilerplate. The CLI `serve` command runs any server from the command line.
Usage¶
```bash
# Run with stdio transport (default)
promptise serve myapp.server:server

# Run with HTTP transport
promptise serve myapp.server:server --transport http --port 8080

# With hot reload (development)
promptise serve myapp.server:server --transport http --reload

# With live dashboard
promptise serve myapp.server:server --transport http --dashboard
```
Target format¶
The target uses `module:attribute` format:
```bash
# module.path:attribute_name
promptise serve myapp.server:server  # myapp/server.py → server variable
promptise serve myapp.api:app        # myapp/api.py → app variable
promptise serve tools:tool_server    # tools.py → tool_server variable
```
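Resolution of the `module:attribute` target can be sketched with `importlib` — a simplified version of what a resolver like `resolve_server` presumably does:

```python
import importlib

def resolve(target: str):
    # "myapp.server:server" → import myapp.server, return its `server` attribute.
    module_path, _, attr = target.partition(":")
    if not attr:
        raise ValueError(f"Expected 'module:attribute', got {target!r}")
    module = importlib.import_module(module_path)
    return getattr(module, attr)

dumps = resolve("json:dumps")  # resolves the stdlib json.dumps as a demo
print(dumps({"ok": True}))  # → {"ok": true}
```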
CLI options¶
| Flag | Default | Description |
|---|---|---|
| `--transport`, `-t` | `stdio` | Transport: `stdio`, `http`, or `sse` |
| `--host` | `127.0.0.1` | Bind host |
| `--port`, `-p` | `8080` | Bind port |
| `--dashboard` | off | Enable live dashboard |
| `--reload` | off | Enable hot reload |
API Summary¶
| Symbol | Type | Description |
|---|---|---|
| `VersionedToolRegistry` | Class | Manage multiple tool versions |
| `ToolTransform` | Protocol | Interface for tool list transforms |
| `NamespaceTransform(prefix)` | Class | Prefix tool names |
| `VisibilityTransform(hidden)` | Class | Hide tools by predicate |
| `TagFilterTransform(required_tags)` | Class | Filter tools by tags |
| `mount(parent, child, prefix, tags)` | Function | Compose servers |
| `OpenAPIProvider(spec, ...)` | Class | Generate tools from OpenAPI |
| `register_batch_tool(server, ...)` | Function | Parallel batch execution |
| `StreamingResult` | Class | Incremental result collection |
| `Elicitor` | Class | Ask user for input mid-execution (via DI) |
| `Sampler` | Class | Request LLM completions from client (via DI) |
| `build_manifest(server)` | Function | Build server manifest dict |
| `register_manifest(server)` | Function | Register `docs://manifest` resource |
| `hot_reload(server, ...)` | Function | File-watching server restarter |
| `build_serve_parser(...)` | Function | CLI argument parser |
| `resolve_server(target)` | Function | Import server from `module:attr` |
| `run_serve(args)` | Function | Execute CLI `serve` command |
What's Next¶
- Deployment — HTTP transport, CORS, containers, reverse proxy
- Caching & Performance — Cache, rate limit, concurrency control
- Observability & Monitoring — Metrics, tracing, Prometheus, logging
- Resilience Patterns — Circuit breakers, health checks, webhooks