Event and Webhook Triggers¶
The EventTrigger, MessageTrigger, and WebhookTrigger provide event-driven activation for agent processes. EventTrigger listens to the framework's internal EventBus, MessageTrigger subscribes to MessageBroker topics, and WebhookTrigger exposes an HTTP endpoint for external systems.
from promptise.runtime import ProcessConfig, TriggerConfig
config = ProcessConfig(
model="openai:gpt-5-mini",
instructions="Handle incoming events and webhooks.",
triggers=[
TriggerConfig(type="event", event_type="pipeline.error"),
TriggerConfig(type="webhook", webhook_path="/events", webhook_port=9090),
TriggerConfig(type="message", topic="reports.daily"),
],
)
Concepts¶
All three trigger types use an internal asyncio.Queue to decouple event delivery from consumption. Events arriving faster than the agent can process them are buffered (up to the queue capacity). If the queue fills, events are dropped with a warning.
| Trigger | Source | Use Case |
|---|---|---|
EventTrigger |
Internal EventBus |
React to events from other agent processes |
MessageTrigger |
Internal MessageBroker |
Subscribe to topic-based message streams |
WebhookTrigger |
External HTTP POST | Receive events from external systems (CI/CD, monitoring, APIs) |
EventTrigger¶
Wraps an EventBus subscription and converts matching events into TriggerEvent objects.
Configuration¶
from promptise.runtime import TriggerConfig
# Listen for pipeline errors
TriggerConfig(type="event", event_type="pipeline.error")
# With source filtering
TriggerConfig(
type="event",
event_type="task.completed",
event_source="data-pipeline",
)
Direct instantiation¶
from promptise.runtime.triggers.event import EventTrigger
from promptise.runtime.events import EventBus
bus = EventBus()
trigger = EventTrigger(
bus,
"pipeline.error",
source_filter="data-pipeline",
trigger_id="pipe-err-listener",
)
await trigger.start()
event = await trigger.wait_for_next()
print(event.payload)
# {
# "event_type": "pipeline.error",
# "event_id": "...",
# "source": "data-pipeline",
# "data": {...},
# }
await trigger.stop()
Source filtering¶
When source_filter is set, only events from that specific source fire the trigger. Events from other sources are silently ignored. This is useful when multiple producers publish to the same event type.
Event payload¶
| Field | Description |
|---|---|
event_type |
The EventBus event type string |
event_id |
Unique event identifier |
source |
Event source |
data |
Event data dict |
MessageTrigger¶
Wraps a MessageBroker subscription for topic-based messaging.
Configuration¶
from promptise.runtime import TriggerConfig
# Subscribe to a specific topic
TriggerConfig(type="message", topic="reports.daily")
# Wildcard topics are supported by the broker
TriggerConfig(type="message", topic="reports.*")
Direct instantiation¶
from promptise.runtime.triggers.event import MessageTrigger
from promptise.runtime.broker import MessageBroker
broker = MessageBroker()
trigger = MessageTrigger(
broker,
"reports.daily",
trigger_id="daily-report-sub",
)
await trigger.start()
event = await trigger.wait_for_next()
print(event.payload)
# {
# "topic": "reports.daily",
# "message_id": "...",
# "sender": "report-generator",
# "content": "...",
# }
await trigger.stop()
Message payload¶
| Field | Description |
|---|---|
topic |
The topic the message arrived on |
message_id |
Unique message identifier |
sender |
Message sender |
content |
Message content |
WebhookTrigger¶
An aiohttp HTTP server that listens for incoming POST requests and converts them to TriggerEvent objects.
Configuration¶
from promptise.runtime import TriggerConfig
TriggerConfig(
type="webhook",
webhook_path="/events",
webhook_port=9090,
)
Direct instantiation¶
from promptise.runtime.triggers.webhook import WebhookTrigger
trigger = WebhookTrigger(
path="/webhook",
port=9090,
host="0.0.0.0",
)
await trigger.start()
# HTTP server now listening at http://0.0.0.0:9090/webhook
event = await trigger.wait_for_next()
print(event.payload) # POST body (JSON or text)
print(event.metadata) # {"method": "POST", "path": "/webhook", ...}
await trigger.stop()
How requests are handled¶
- A POST request arrives at the configured path.
- The body is parsed as JSON (falling back to plain text).
- A
TriggerEventis created with the body aspayload. - Request metadata (method, path, query params, safe headers) is stored in
metadata. - The response
202 Acceptedis returned with the event ID.
Sensitive headers (Authorization, Cookie, Set-Cookie) are automatically stripped from the metadata.
Health check endpoint¶
The webhook server also exposes GET /health:
Webhook payload¶
The payload field contains the raw POST body:
- If the body is valid JSON, it is stored as a dict.
- Otherwise, it is stored as a string.
Webhook metadata¶
| Field | Description |
|---|---|
method |
HTTP method (always POST) |
path |
Request path |
query |
Query parameters dict |
headers |
Safe headers (auth headers excluded) |
remote |
Client IP address |
Queue overflow¶
If the webhook queue (capacity: 1000) is full, the server returns 503 Service Unavailable:
Shared Patterns¶
Queue-based decoupling¶
All three triggers use asyncio.Queue to buffer events:
EventTrigger: maxsize=100MessageTrigger: maxsize=100WebhookTrigger: maxsize=1000
Graceful stop¶
When stop() is called, each trigger:
- Unsubscribes from its event source (EventBus, MessageBroker, or shuts down the HTTP server).
- Enqueues a sentinel event with
metadata={"_stop": True}. - Any
wait_for_next()call receiving the sentinel raisesasyncio.CancelledError.
API Summary¶
| Class | Description |
|---|---|
EventTrigger(event_bus, event_type, source_filter, trigger_id) |
EventBus trigger |
MessageTrigger(broker, topic, trigger_id) |
MessageBroker trigger |
WebhookTrigger(path, port, host) |
HTTP webhook trigger |
All implement the BaseTrigger protocol: start(), stop(), wait_for_next().
Tips and Gotchas¶
Use EventTrigger for inter-process coordination
When one process needs to trigger another, use EventTrigger with a shared EventBus. This is more efficient than webhooks for in-process communication.
Webhook security
The webhook server binds to 0.0.0.0 by default. In production, use a reverse proxy (nginx, Caddy) for TLS termination and authentication. The trigger itself does not perform any request authentication.
aiohttp shipped with base install
WebhookTrigger uses aiohttp, which is included in the base pip install promptise.
EventBus/Broker must be shared
For EventTrigger and MessageTrigger to work, the same EventBus or MessageBroker instance must be shared between the event producer and the trigger. Pass them to the AgentRuntime constructor.
Queue capacity is finite
If events arrive faster than the agent processes them, the queue will fill and events will be dropped. Monitor queue sizes via the dashboard or status() API. Increase ProcessConfig.concurrency for high-throughput scenarios.
What's Next¶
- Triggers Overview -- all trigger types and the base protocol
- Cron Trigger -- time-based scheduling
- File Watch Trigger -- filesystem monitoring
- Configuration -- full
TriggerConfigreference