Skip to content

Multi-Agent Coordination

Build systems where multiple agents collaborate — sharing tools, delegating tasks, communicating through events, and coordinating through shared state.

Architecture

Promptise provides four coordination primitives. Combine them to build any multi-agent topology.

┌─────────────────────────────────────────────────────────────┐
│                      AgentRuntime                           │
│                                                             │
│  ┌─────────┐  ask_peer()  ┌─────────┐  EventBus  ┌──────┐ │
│  │ Agent A  │────────────→│ Agent B  │←──────────→│Agent C│ │
│  │ research │             │ analysis │            │ write │ │
│  └────┬─────┘             └────┬─────┘            └───┬──┘ │
│       │                        │                      │     │
│       └────────────┬───────────┴──────────────────────┘     │
│                    │                                        │
│           ┌────────▼────────┐                               │
│           │  Shared MCP     │  ← Auth, rate limits,         │
│           │  Server         │     audit logging              │
│           └─────────────────┘                               │
└─────────────────────────────────────────────────────────────┘

1. Shared MCP Servers

Multiple agents connect to the same MCP server. The server enforces access policies per agent.

from promptise.mcp.server import MCPServer, JWTAuth, AuthMiddleware
from promptise.mcp.server.guards import HasRole

# Build a shared tool server
server = MCPServer("team-tools")
auth = JWTAuth(secret="shared-secret")
server.middleware = [AuthMiddleware(auth)]

@server.tool(guards=[HasRole("researcher")])
async def web_search(query: str, ctx=None) -> str:
    """Search the web. Only agents with 'researcher' role can call this."""
    return await search_engine.query(query)

@server.tool(guards=[HasRole("writer")])
async def publish(title: str, content: str, ctx=None) -> str:
    """Publish content. Only agents with 'writer' role can call this."""
    return await cms.publish(title, content)
from promptise import build_agent, HttpServerSpec

# Each agent connects to the same server with different roles
researcher = await build_agent(
    model="openai:gpt-5-mini",
    servers={"tools": HttpServerSpec(
        url="http://localhost:8080",
        bearer_token=jwt_for_role("researcher"),
    )},
)

writer = await build_agent(
    model="openai:gpt-5-mini",
    servers={"tools": HttpServerSpec(
        url="http://localhost:8080",
        bearer_token=jwt_for_role("writer"),
    )},
)

2. Cross-Agent Delegation

One agent delegates a subtask to another and awaits the result. Uses HTTP + JWT authentication.

from promptise import build_agent

analyst = await build_agent(
    model="openai:gpt-5-mini",
    servers=my_servers,
    cross_agents={
        "researcher": "http://localhost:9001",
        "fact_checker": "http://localhost:9002",
    },
)

# In conversation, the analyst can delegate:
# "Ask the researcher to find recent papers on quantum computing"
# → analyst calls ask_peer("researcher", "Find recent papers on...")
# → researcher processes the request and returns findings
# → analyst continues with the results

Programmatic delegation:

# ask_peer — single agent, await response
answer = await analyst.ask_peer(
    "researcher",
    "Find the top 3 papers on transformer architectures from 2025",
)

# broadcast — multiple agents, await all responses (with timeout)
answers = await analyst.broadcast(
    ["researcher", "fact_checker"],
    "Verify this claim: GPT-5 has 1 trillion parameters",
    timeout=30.0,  # Seconds — graceful degradation if a peer is slow
)
# answers = {"researcher": "...", "fact_checker": "..."}

3. EventBus Messaging

Decoupled pub/sub communication. Agents publish events; others subscribe by topic. No direct coupling.

from promptise.runtime import AgentRuntime, AgentProcess
from promptise.runtime.triggers import EventTrigger, MessageTrigger

runtime = AgentRuntime()

# Research agent publishes findings
research_process = AgentProcess(
    name="researcher",
    agent_config={
        "model": "openai:gpt-5-mini",
        "servers": research_servers,
        "instructions": "Research topics and publish findings as events.",
    },
)

# Analysis agent subscribes to research findings
analysis_process = AgentProcess(
    name="analyst",
    agent_config={
        "model": "openai:gpt-5-mini",
        "servers": analysis_servers,
        "instructions": "Analyze research findings and produce reports.",
    },
    triggers=[
        # Fires when researcher publishes a "research.complete" event
        EventTrigger(event_type="research.complete"),
    ],
)

# Writer agent subscribes to analysis results via topic
writer_process = AgentProcess(
    name="writer",
    agent_config={
        "model": "openai:gpt-5-mini",
        "servers": writing_servers,
        "instructions": "Write reports from analysis results.",
    },
    triggers=[
        # Fires on any message to the "reports" topic
        MessageTrigger(topic="reports.*"),
    ],
)

runtime.add_process(research_process)
runtime.add_process(analysis_process)
runtime.add_process(writer_process)
await runtime.start_all()

# Kick off the pipeline
await runtime.event_bus.emit("research.start", {
    "topic": "AI safety trends 2025",
})

4. Shared Context & State

Agents in the same runtime can share state through AgentContext with write permissions.

from promptise.runtime.context import AgentContext

# Create shared context with controlled write access
shared_ctx = AgentContext(
    initial_state={"project": "quarterly-report", "status": "in_progress"},
    write_permissions={
        "researcher": ["findings", "sources"],
        "analyst": ["analysis", "recommendations"],
        "writer": ["draft", "status"],
    },
)

# Researcher writes findings (allowed)
shared_ctx.set("findings", research_data, writer="researcher")

# Analyst reads findings, writes analysis (allowed)
findings = shared_ctx.get("findings")
shared_ctx.set("analysis", analysis_data, writer="analyst")

# Writer reads everything, writes draft (allowed)
shared_ctx.set("draft", final_draft, writer="writer")
shared_ctx.set("status", "complete", writer="writer")

# Researcher tries to write draft (denied — not in write_permissions)
# shared_ctx.set("draft", "...", writer="researcher")  # Raises PermissionError

5. Complete Example: Research Pipeline

Three agents collaborate on a research task: researcher gathers data, analyst evaluates quality, writer produces the report.

import asyncio
from promptise import build_agent, StdioServerSpec
from promptise.runtime import AgentRuntime, AgentProcess
from promptise.runtime.triggers import EventTrigger
from promptise.runtime.journal import FileJournal

async def main():
    runtime = AgentRuntime()

    # Shared MCP server for all agents
    shared_servers = {
        "tools": StdioServerSpec(command="python", args=["-m", "team_tools"]),
    }

    # 1. Researcher — triggered manually or on schedule
    runtime.add_process(AgentProcess(
        name="researcher",
        agent_config={
            "model": "openai:gpt-5-mini",
            "servers": shared_servers,
            "instructions": (
                "Research the given topic thoroughly. Use web_search and "
                "document_search tools. When done, summarize your findings."
            ),
        },
        journal=FileJournal("./journals/researcher"),
    ))

    # 2. Analyst — triggered when researcher publishes findings
    runtime.add_process(AgentProcess(
        name="analyst",
        agent_config={
            "model": "openai:gpt-5-mini",
            "servers": shared_servers,
            "instructions": (
                "Evaluate the research findings for accuracy and completeness. "
                "Rate quality 1-5. If below 3, request more research."
            ),
        },
        triggers=[EventTrigger(event_type="research.complete")],
        journal=FileJournal("./journals/analyst"),
    ))

    # 3. Writer — triggered when analysis passes quality gate
    runtime.add_process(AgentProcess(
        name="writer",
        agent_config={
            "model": "openai:gpt-5-mini",
            "servers": shared_servers,
            "instructions": (
                "Write a clear, well-structured report from the research "
                "findings and analysis. Cite sources."
            ),
        },
        triggers=[EventTrigger(event_type="analysis.approved")],
        journal=FileJournal("./journals/writer"),
    ))

    await runtime.start_all()

    # Kick off the pipeline
    await runtime.inject_event("researcher", {
        "type": "research.start",
        "topic": "Impact of AI on software development productivity",
    })

    # Wait for completion (in production, use webhooks or polling)
    await asyncio.sleep(120)
    await runtime.stop_all()

asyncio.run(main())

Patterns

Fan-out / Fan-in

Multiple agents work on subtasks in parallel, results are merged.

# Coordinator broadcasts subtasks
answers = await coordinator.broadcast(
    ["legal_reviewer", "technical_reviewer", "business_reviewer"],
    f"Review this proposal: {proposal}",
    timeout=60.0,
)
# Merge results from all reviewers
combined = "\n\n".join(f"**{name}**: {answer}" for name, answer in answers.items())

Supervisor Pattern

One agent oversees others, delegating and checking quality.

supervisor = await build_agent(
    model="openai:gpt-5-mini",
    servers=my_servers,
    cross_agents={
        "researcher": "http://localhost:9001",
        "writer": "http://localhost:9002",
        "editor": "http://localhost:9003",
    },
    instructions=(
        "You are a project supervisor. Delegate research to the researcher, "
        "writing to the writer, and editing to the editor. Check quality "
        "at each stage. If quality is below standard, send back for revision."
    ),
)

Pipeline with Quality Gates

Sequential processing with validation between stages.

from promptise.runtime.triggers import EventTrigger

# Each agent listens for the previous stage's completion event
stages = [
    ("data_collector", "collect.complete"),
    ("data_cleaner", "clean.complete"),
    ("analyzer", "analyze.complete"),
    ("reporter", "report.complete"),
]

for name, trigger_event in stages:
    runtime.add_process(AgentProcess(
        name=name,
        agent_config=configs[name],
        triggers=[EventTrigger(event_type=trigger_event)] if trigger_event != "collect.complete" else [],
    ))

Error Handling

Multi-agent systems need graceful degradation:

# Cross-agent delegation with timeout and fallback
try:
    answer = await agent.ask_peer("specialist", question, timeout=30.0)
except TimeoutError:
    # Peer didn't respond — handle locally
    answer = await agent.ainvoke({"messages": [{"role": "user", "content": question}]})

# Broadcast with partial failure tolerance
answers = await agent.broadcast(
    ["agent_a", "agent_b", "agent_c"],
    question,
    timeout=30.0,
)
# answers may have fewer keys than requested if some agents failed
successful = {k: v for k, v in answers.items() if v is not None}

When to Use What

Pattern Use When
Shared MCP Server Agents need the same tools with different permissions
ask_peer() One agent needs a specific answer from another
broadcast() Multiple agents should process the same input in parallel
EventBus Loose coupling — agents react to events without knowing who produces them
Shared Context Agents need to read/write a common state object
External Orchestration Complex DAG workflows, human-in-the-loop approval chains, or task decomposition beyond what EventBus provides