Skip to content

Runtime API Reference

Agent process lifecycle, triggers, governance (budget, health, mission, secrets), journals, distributed coordination, and the orchestration API.

Core

AgentProcess

promptise.runtime.process.AgentProcess

Lifecycle container for a long-running agent process.

Parameters:

Name Type Description Default
name str

Unique process name.

required
config ProcessConfig

Process configuration.

required
process_id str | None

Unique ID (auto-generated if not provided).

None
event_bus Any | None

Optional shared EventBus for inter-process events.

None
broker Any | None

Optional shared MessageBroker for message triggers.

None
runtime Any | None

Optional parent :class:AgentRuntime reference (enables the spawn_process meta-tool in open mode).

None
context property

The unified agent context.

lifecycle property

The lifecycle state machine (for inspection).

state property

Current process state.

ask(content, *, sender_id=None, timeout=120) async

Ask the agent a question and wait for the response.

The question is delivered to the agent on its next invocation. This method blocks until the agent responds or the timeout expires.

Parameters:

Name Type Description Default
content str

The question text.

required
sender_id str | None

Who is asking.

None
timeout float

Maximum seconds to wait for an answer.

120

Returns:

Name Type Description
An Any

class:InboxResponse with the agent's answer.

Raises:

Type Description
RuntimeError

If inbox is not enabled.

TimeoutError

If timeout expires.

inject(event) async

Manually inject a trigger event into the queue.

Parameters:

Name Type Description Default
event TriggerEvent

The trigger event to inject.

required
resume() async

Resume from SUSPENDED or AWAITING state.

rollback() async

Revert to the original configuration.

Clears all dynamic state (instructions, tools, servers, triggers) and rebuilds the agent from the original config.

Returns:

Type Description
str

Status message.

Raises:

Type Description
RuntimeError

If called in strict mode.

send_message(content, *, message_type='context', priority='normal', sender_id=None, ttl=None, metadata=None) async

Send a human message to the running agent.

The agent sees the message as additional context on its next invocation cycle.

Parameters:

Name Type Description Default
content str

Message text.

required
message_type str

One of "directive", "context", "question", "correction".

'context'
priority str

"low", "normal", "high", "critical".

'normal'
sender_id str | None

Who sent it (for audit trail).

None
ttl float | None

Time-to-live in seconds.

None
metadata dict[str, Any] | None

Custom data.

None

Returns:

Type Description
str

The message ID.

Raises:

Type Description
RuntimeError

If inbox is not enabled.

ValueError

If rate limit exceeded.

start() async

Build agent, start triggers, begin processing.

Transitions: CREATED → STARTING → RUNNING (or STOPPED/FAILED → STARTING → RUNNING on restart).

Raises:

Type Description
ProcessStateError

If the transition is invalid.

status()

Serializable status snapshot.

Returns:

Type Description
dict[str, Any]

Dict with process name, state, counters, uptime, etc.

stop() async

Gracefully stop: cancel workers, stop triggers, shutdown agent.

Transitions: * → STOPPING → STOPPED

If the process is in FAILED state, cleanup is performed and the state transitions to FAILED → STARTING is not attempted; instead we go straight to STOPPED via internal reset.

suspend() async

Pause processing without tearing down the agent.

Triggers continue to fire but events are queued, not processed.

AgentRuntime

promptise.runtime.runtime.AgentRuntime

Multi-process agent runtime manager.

Manages a dictionary of :class:AgentProcess instances with shared resources and coordinated lifecycle.

Parameters:

Name Type Description Default
config RuntimeConfig | None

Optional :class:RuntimeConfig for global settings.

None
event_bus Any | None

Optional shared EventBus for inter-process events.

None
broker Any | None

Optional shared MessageBroker for message triggers.

None
processes property

Read-only view of registered processes.

add_process(name, config, *, process_id=None) async

Register a new agent process.

Parameters:

Name Type Description Default
name str

Unique process name.

required
config ProcessConfig

Process configuration.

required
process_id str | None

Optional explicit process ID.

None

Returns:

Type Description
AgentProcess

The created :class:AgentProcess.

Raises:

Type Description
RuntimeBaseError

If a process with this name already exists.

from_config(config, *, event_bus=None, broker=None, event_notifier=None) async classmethod

Create a runtime from a :class:RuntimeConfig with all processes pre-registered.

Parameters:

Name Type Description Default
config RuntimeConfig

Runtime configuration.

required
event_bus Any | None

Optional shared EventBus.

None
broker Any | None

Optional shared MessageBroker.

None
event_notifier Any | None

Optional :class:EventNotifier for all processes.

None

Returns:

Type Description
AgentRuntime

A new :class:AgentRuntime with processes registered.

get_process(name)

Get a process by name.

Parameters:

Name Type Description Default
name str

Process name.

required

Returns:

Name Type Description
The AgentProcess

class:AgentProcess.

Raises:

Type Description
KeyError

If the process does not exist.

list_processes()

List all processes with summary info.

Returns:

Type Description
list[dict[str, Any]]

List of dicts with name, state, and uptime for each process.

load_directory(directory) async

Load all .agent files from a directory.

Parameters:

Name Type Description Default
directory str | Path

Directory containing .agent files.

required

Returns:

Type Description
list[str]

List of process names that were loaded.

Raises:

Type Description
ManifestError

If any manifest fails to load.

load_manifest(path, *, name_override=None) async

Load and register a process from a .agent manifest.

Parameters:

Name Type Description Default
path str | Path

Path to the .agent file.

required
name_override str | None

Override the process name from the manifest.

None

Returns:

Type Description
AgentProcess

The created :class:AgentProcess.

Raises:

Type Description
ManifestError

If the manifest cannot be loaded.

process_status(name)

Status for a single process.

Parameters:

Name Type Description Default
name str

Process name.

required

Returns:

Type Description
dict[str, Any]

Process status dict.

Raises:

Type Description
KeyError

If the process does not exist.

remove_process(name) async

Remove a process (stopping it first if running).

Parameters:

Name Type Description Default
name str

Process name to remove.

required

Raises:

Type Description
KeyError

If the process does not exist.

restart_process(name) async

Restart a single process (stop then start).

Parameters:

Name Type Description Default
name str

Process name.

required

Raises:

Type Description
KeyError

If the process does not exist.

start_all() async

Start all registered processes.

start_process(name) async

Start a single process.

Parameters:

Name Type Description Default
name str

Process name.

required

Raises:

Type Description
KeyError

If the process does not exist.

status()

Global runtime status.

Returns:

Type Description
dict[str, Any]

Dict with process count and per-process status.

stop_all() async

Stop all running processes.

stop_process(name) async

Stop a single process.

Parameters:

Name Type Description Default
name str

Process name.

required

Raises:

Type Description
KeyError

If the process does not exist.

AgentContext

promptise.runtime.context.AgentContext

Unified context layer for an agent process.

Provides:

  • .get() / .put() — persistent key-value state with audit trail
  • .memory — optional :class:~promptise.memory.MemoryProvider
  • .env — filtered environment variables
  • .files — mounted file paths

Parameters:

Name Type Description Default
writable_keys list[str] | None

Keys the agent is allowed to write to. An empty list means all keys are writable.

None
memory_provider Any | None

Optional :class:~promptise.memory.MemoryProvider instance.

None
file_mounts dict[str, str] | None

Mapping of logical name → filesystem path.

None
env_prefix str

Only expose env vars starting with this prefix.

'AGENT_'
initial_state dict[str, Any] | None

Pre-populated key-value state.

None
env property

Filtered snapshot of environment variables.

Only variables whose name starts with :attr:env_prefix are included. The prefix is not stripped from the key.

files property

Mapping of logical mount name → filesystem path.

memory property

The underlying :class:~promptise.memory.MemoryProvider, or None.

add_memory(content, *, metadata=None) async

Store a new memory.

Parameters:

Name Type Description Default
content str

Text content to store.

required
metadata dict[str, Any] | None

Optional metadata.

None

Returns:

Type Description
str | None

Memory ID string, or None if no provider is configured.

clear_state()

Remove all state and history.

delete_memory(memory_id) async

Delete a memory by ID.

Returns:

Type Description
bool

True if deleted, False if no provider or not found.

from_dict(data, *, memory_provider=None) classmethod

Reconstruct an :class:AgentContext from a serialized dict.

Parameters:

Name Type Description Default
data dict[str, Any]

Dict produced by :meth:to_dict.

required
memory_provider Any | None

Optional provider to re-attach.

None

Returns:

Name Type Description
Restored AgentContext

class:AgentContext instance.

get(key, default=None)

Read a value from the state store.

Parameters:

Name Type Description Default
key str

State key.

required
default Any

Value to return if key is absent.

None

Returns:

Type Description
Any

The stored value or default.

put(key, value, *, source='agent')

Write a value to the state store.

Parameters:

Name Type Description Default
key str

State key.

required
value Any

Value to store.

required
source str

Origin of the write.

'agent'

Raises:

Type Description
KeyError

If key is not in the writable keys set (when writable keys are configured).

search_memory(query, *, limit=5, min_score=0.0) async

Search semantic memory for relevant context.

Parameters:

Name Type Description Default
query str

Search query text.

required
limit int

Max results to return.

5
min_score float

Min relevance score (0.0–1.0).

0.0

Returns:

Type Description
list[Any]

List of :class:~promptise.memory.MemoryResult objects,

list[Any]

or empty list if no provider is configured.

state_history(key)

Return the audit trail for a given key.

Parameters:

Name Type Description Default
key str

State key.

required

Returns:

Type Description
list[StateEntry]

Chronological list of :class:StateEntry records.

state_keys()

Return all keys in the current state.

state_snapshot()

Return a shallow copy of the current state.

to_dict()

Serialize context state for checkpointing or distribution.

Note

The memory provider is not serialized — it must be re-attached when restoring from a checkpoint.

StateEntry

promptise.runtime.context.StateEntry dataclass

Audit record for a single state write.

Attributes:

Name Type Description
key str

The key that was written.

value Any

The value that was stored.

timestamp float

Unix timestamp of the write.

source str

Origin of the write ("system", "agent", "trigger", etc.).

from_dict(data) classmethod

Deserialize from a dict.

to_dict()

Serialize to a JSON-compatible dict.

ConversationBuffer

promptise.runtime.conversation.ConversationBuffer dataclass

Rolling buffer of conversation messages (short-term memory).

Maintains the last max_messages message exchanges across invocations within a single process lifecycle. Oldest messages are evicted when the buffer is full.

The buffer is safe for concurrent async access: the async helpers :meth:async_snapshot and :meth:async_replace use an internal :class:asyncio.Lock. The synchronous methods (:meth:append, :meth:get_messages, etc.) are lock-free and intended for single- threaded or test use.

Parameters:

Name Type Description Default
max_messages int

Maximum messages to retain. Oldest are evicted.

100
append(message)

Add a single message to the buffer.

Parameters:

Name Type Description Default
message dict[str, Any]

Message dict with role and content keys.

required
async_append(message) async

Thread-safe append (async).

Parameters:

Name Type Description Default
message dict[str, Any]

Message dict to append.

required
async_replace(messages) async

Thread-safe replacement of the entire buffer (async).

Parameters:

Name Type Description Default
messages list[dict[str, Any]]

New message list.

required
async_snapshot() async

Thread-safe snapshot of all messages (async).

Returns:

Type Description
list[dict[str, Any]]

Copy of the current message list.

clear()

Clear all messages from the buffer.

extend(messages)

Add multiple messages to the buffer.

Parameters:

Name Type Description Default
messages list[dict[str, Any]]

List of message dicts.

required
from_dict(data) classmethod

Reconstruct from a serialized dict.

Parameters:

Name Type Description Default
data dict[str, Any]

Dict produced by :meth:to_dict.

required

Returns:

Name Type Description
Restored ConversationBuffer

class:ConversationBuffer instance.

get_messages()

Return all buffered messages as a list.

Returns:

Type Description
list[dict[str, Any]]

Ordered list of message dicts (oldest first).

set_messages(messages)

Replace the buffer contents.

Used during hot-reload to preserve conversation across agent rebuilds.

Parameters:

Name Type Description Default
messages list[dict[str, Any]]

New message list to set.

required
to_dict()

Serialize for journal checkpointing.

Returns:

Type Description
dict[str, Any]

Dict with max_messages and messages keys.


Configuration

ProcessConfig

promptise.runtime.config.ProcessConfig

Bases: BaseModel

Configuration for a single agent process.

Composes all atomic configs and adds process-level settings.

Attributes:

Name Type Description
model str

LLM model identifier (e.g. openai:gpt-5-mini).

instructions str | None

System prompt for the agent.

servers dict[str, Any]

MCP server specifications (same format as .superagent).

triggers list[TriggerConfig]

Trigger configurations.

journal JournalConfig

Journal configuration.

context ContextConfig

AgentContext configuration.

concurrency int

Max concurrent trigger invocations.

heartbeat_interval float

Heartbeat period in seconds.

idle_timeout float

Seconds of inactivity before suspending (0 = never).

max_lifetime float

Max process lifetime in seconds (0 = unlimited).

max_consecutive_failures int

Consecutive failures before FAILED state.

restart_policy Literal['always', 'on_failure', 'never']

When to restart a failed process.

max_restarts int

Max restart attempts (for on_failure / always).

RuntimeConfig

promptise.runtime.config.RuntimeConfig

Bases: BaseModel

Top-level runtime configuration (composite).

Aggregates per-process configs and global settings.

Attributes:

Name Type Description
processes dict[str, ProcessConfig]

Named process configurations.

distributed DistributedConfig

Distributed coordination settings.

from_dict(data) classmethod

Deserialize from a dict.

to_dict()

Serialize to a JSON-compatible dict.

DistributedConfig

promptise.runtime.config.DistributedConfig

Bases: BaseModel

Distributed runtime coordination configuration.

Attributes:

Name Type Description
enabled bool

Enable distributed mode.

coordinator_url str | None

URL of the coordinator node.

transport_port int

Port for the management HTTP transport.

discovery_method Literal['registry', 'multicast']

How runtime nodes discover each other.

heartbeat_interval float

Health check interval for node monitoring.

ContextConfig

promptise.runtime.config.ContextConfig

Bases: BaseModel

AgentContext configuration.

Attributes:

Name Type Description
writable_keys list[str]

State keys the agent is allowed to write to. Empty list means all keys are writable.

memory_provider Literal['in_memory', 'chroma', 'mem0'] | None

Memory provider type (or None to disable).

memory_max int

Max memories to inject per invocation.

memory_min_score float

Min relevance score for memory injection.

memory_auto_store bool

Automatically store exchanges in long-term memory.

memory_collection str

Collection name for ChromaDB backend.

memory_persist_directory str | None

Persist directory for ChromaDB.

memory_user_id str

User ID for Mem0 scoping.

conversation_max_messages int

Max messages in conversation buffer (short-term memory).

file_mounts dict[str, str]

Mapping of logical name → filesystem path.

env_prefix str

Only expose environment variables with this prefix.

initial_state dict[str, Any]

Pre-populated key-value state.

ExecutionMode

promptise.runtime.config.ExecutionMode

Bases: str, Enum

Agent execution mode.

Attributes:

Name Type Description
STRICT

Agent cannot modify itself. Current behavior, default.

OPEN

Agent can adapt identity, memory, tools, and triggers at runtime — a self-evolving, autonomous agent.

OpenModeConfig

promptise.runtime.config.OpenModeConfig

Bases: BaseModel

Guardrails for open execution mode.

Controls what the agent is allowed to self-modify and how far. Only takes effect when ProcessConfig.execution_mode is :attr:ExecutionMode.OPEN.

Attributes:

Name Type Description
allow_identity_change bool

Agent can modify its own instructions.

allow_tool_creation bool

Agent can define new Python tools.

allow_mcp_connect bool

Agent can connect to new MCP servers.

allow_trigger_management bool

Agent can add/remove triggers.

allow_memory_management bool

Agent can explicitly store/search/forget.

max_custom_tools int

Max number of agent-created tools.

max_dynamic_triggers int

Max number of dynamically added triggers.

max_instruction_length int

Max character length for modified instructions.

max_rebuilds int | None

Max agent rebuilds per lifetime (None = unlimited).

allowed_mcp_urls list[str]

Whitelist of MCP server URLs (empty = any).

sandbox_custom_tools bool

Execute agent-written tools in sandbox.

allow_process_spawn bool

Agent can spawn new processes within the runtime. Defaults to False.

max_spawned_processes int

Max number of processes this agent can spawn.

EscalationTarget

promptise.runtime.config.EscalationTarget

Bases: BaseModel

Where to send escalation notifications.

Attributes:

Name Type Description
webhook_url str | None

HTTP endpoint to POST a JSON payload to.

event_type str | None

EventBus event type to emit.


Lifecycle

ProcessState

promptise.runtime.lifecycle.ProcessState

Bases: str, Enum

Agent process lifecycle states.

ProcessLifecycle

promptise.runtime.lifecycle.ProcessLifecycle

Thread-safe state machine for process lifecycle.

Tracks the current state, validates transitions against :data:VALID_TRANSITIONS, and records an audit trail of all transitions.

Parameters:

Name Type Description Default
initial ProcessState

Starting state (default :attr:ProcessState.CREATED).

CREATED

Example::

lc = ProcessLifecycle()
assert lc.state == ProcessState.CREATED

await lc.transition(ProcessState.STARTING, reason="user requested")
await lc.transition(ProcessState.RUNNING)

assert len(lc.history) == 2
history property

Chronological list of all transitions.

state property

Current process state.

can_transition(target)

Check whether transitioning to target is valid.

from_snapshot(data) classmethod

Reconstruct a :class:ProcessLifecycle from a snapshot dict.

snapshot()

Serializable snapshot of current state and full history.

transition(target, *, reason='', metadata=None) async

Attempt a state transition.

Parameters:

Name Type Description Default
target ProcessState

Desired next state.

required
reason str

Human-readable explanation.

''
metadata dict[str, Any] | None

Extra context to store with the transition.

None

Returns:

Type Description
ProcessTransition

The recorded :class:ProcessTransition.

Raises:

Type Description
StateError

If the transition is not valid from the current state.

ProcessTransition

promptise.runtime.lifecycle.ProcessTransition dataclass

Record of a single state transition.

Attributes:

Name Type Description
from_state ProcessState

Previous state.

to_state ProcessState

New state.

timestamp datetime

When the transition occurred (UTC).

reason str

Human-readable reason for the transition.

metadata dict[str, Any]

Additional context (e.g. error details).

from_dict(data) classmethod

Deserialize from a dict.

to_dict()

Serialize to a JSON-compatible dict.


Triggers

TriggerConfig

promptise.runtime.config.TriggerConfig

Bases: BaseModel

Configuration for a single trigger.

Each trigger type requires a different subset of fields. The :meth:_validate_trigger_fields model validator enforces this for built-in types. Custom trigger types use the custom_config dict.

Attributes:

Name Type Description
type str

Trigger type (built-in or custom-registered via :func:~promptise.runtime.triggers.register_trigger_type).

cron_expression str | None

Cron expression (cron type only).

webhook_path str

URL path for the webhook endpoint (webhook only).

webhook_port int

Listening port (webhook only).

watch_path str | None

Directory to watch (file_watch only).

watch_patterns list[str]

Glob patterns (file_watch only).

watch_events list[str]

Filesystem events to react to (file_watch only).

event_type str | None

EventBus event type string (event only).

event_source str | None

Optional source filter (event only).

topic str | None

MessageBroker topic (message only).

custom_config dict[str, Any]

Additional key-value configuration for custom trigger types.

filter_expression str | None

Cheap pre-filter (all types, optional).

BaseTrigger

promptise.runtime.triggers.base.BaseTrigger

Bases: Protocol

Protocol for all trigger implementations.

Triggers produce :class:TriggerEvent objects that cause the agent process to invoke the agent. They run as background tasks and must be safely cancellable.

start() async

Start listening for trigger conditions.

stop() async

Stop the trigger and release resources.

wait_for_next() async

Block until the next trigger event occurs.

Returns:

Type Description
TriggerEvent

The next :class:TriggerEvent.

Raises:

Type Description
CancelledError

If the trigger is stopped.

TriggerError

If the trigger encounters a fatal error.

TriggerEvent

promptise.runtime.triggers.base.TriggerEvent dataclass

Event produced by a trigger.

Attributes:

Name Type Description
trigger_id str

Which trigger produced this event.

trigger_type str

Type of the trigger (cron, webhook, etc.).

event_id str

Unique event identifier.

timestamp datetime

When the event was produced (UTC).

payload dict[str, Any]

Trigger-specific data (cron time, webhook body, file path, event data, message content).

metadata dict[str, Any]

Additional context.

from_dict(data) classmethod

Deserialize from a dict.

to_dict()

Serialize to a JSON-compatible dict.

CronTrigger

promptise.runtime.triggers.cron.CronTrigger

Fires at scheduled intervals defined by a cron expression.

Parameters:

Name Type Description Default
cron_expression str

Standard cron expression (e.g. */5 * * * *).

required
trigger_id str | None

Unique identifier (auto-generated if not provided).

None
start() async

Mark the trigger as active.

stop() async

Mark the trigger as inactive and unblock waiters.

wait_for_next() async

Block until the next scheduled time.

Uses self._event to allow :meth:stop to unblock the wait immediately instead of sleeping for the full delay.

Returns:

Name Type Description
A TriggerEvent

class:TriggerEvent with the scheduled time in the payload.

Raises:

Type Description
CancelledError

If stopped while waiting.

TriggerError

If the cron expression is invalid.

WebhookTrigger

promptise.runtime.triggers.webhook.WebhookTrigger

HTTP webhook trigger.

Starts an aiohttp server that listens for incoming POST requests and converts them to :class:TriggerEvent objects.

Parameters:

Name Type Description Default
path str

URL path to listen on (e.g. "/webhook").

'/webhook'
port int

TCP port to bind to.

9090
host str

Host/IP to bind to. Defaults to "127.0.0.1" (loopback only). Set to "0.0.0.0" for multi-node deployments where external services or other nodes need to reach this webhook.

'127.0.0.1'
hmac_secret str | None

Optional HMAC secret for signature verification. When set, incoming requests must include a valid X-Webhook-Signature header containing sha256=<hex-digest> computed over the raw request body. Verification uses timing-safe comparison. Strongly recommended when host is "0.0.0.0".

None
start() async

Start the webhook HTTP server.

stop() async

Stop the webhook HTTP server.

wait_for_next() async

Wait for the next webhook event.

Returns:

Name Type Description
A TriggerEvent

class:TriggerEvent with the request body as payload.

Raises:

Type Description
CancelledError

If the wait is cancelled.

FileWatchTrigger

promptise.runtime.triggers.file_watch.FileWatchTrigger

Filesystem watch trigger.

Monitors a directory for file changes and produces :class:TriggerEvent objects.

Parameters:

Name Type Description Default
watch_path str

Directory to monitor.

required
patterns list[str] | None

Glob patterns to match (e.g. ["*.csv", "*.json"]).

None
events list[str] | None

Event types to react to.

None
recursive bool

Watch subdirectories recursively.

True
debounce_seconds float

Debounce interval to avoid duplicate events.

0.5
poll_interval float

Polling interval in seconds (used when watchdog is not available).

1.0
start() async

Start watching for file changes.

stop() async

Stop watching for file changes.

wait_for_next() async

Wait for the next file change event.

Returns:

Name Type Description
A TriggerEvent

class:TriggerEvent with file change details.

Raises:

Type Description
CancelledError

If the wait is cancelled.

EventTrigger

promptise.runtime.triggers.event.EventTrigger

Trigger that fires when an EventBus event matches.

Wraps an EventBus subscription and converts matching events into :class:TriggerEvent objects via an internal queue.

Parameters:

Name Type Description Default
event_bus Any

An object with subscribe(event_type, handler) and unsubscribe(event_type, handler) methods.

required
event_type str

Event type string to listen for (e.g. "task.completed").

required
source_filter str | None

Optional source filter — only events from this source will fire the trigger.

None
trigger_id str | None

Unique identifier (auto-generated if not provided).

None
start() async

Subscribe to the EventBus.

stop() async

Unsubscribe and drain the queue.

wait_for_next() async

Block until a matching event arrives.

Raises:

Type Description
CancelledError

If the trigger is stopped.

MessageTrigger

promptise.runtime.triggers.event.MessageTrigger

Trigger that fires when a message arrives on a broker topic.

Wraps a message broker subscription.

Parameters:

Name Type Description Default
broker Any

An object with subscribe(topic, handler) and unsubscribe(topic, subscription_id) methods.

required
topic str

Topic to subscribe to (supports wildcards like reports.*).

required
trigger_id str | None

Unique identifier (auto-generated if not provided).

None
start() async

Subscribe to the MessageBroker topic.

stop() async

Unsubscribe from the topic.

wait_for_next() async

Block until a message arrives on the topic.

Raises:

Type Description
CancelledError

If the trigger is stopped.

create_trigger

promptise.runtime.triggers.create_trigger(config, *, event_bus=None, broker=None)

Factory: create a trigger from a :class:TriggerConfig.

Looks up the trigger type in the registry (built-in and custom types registered via :func:register_trigger_type).

Parameters:

Name Type Description Default
config TriggerConfig

Trigger configuration.

required
event_bus Any | None

Required for event type triggers.

None
broker Any | None

Required for message type triggers.

None

Returns:

Type Description
BaseTrigger

A trigger instance satisfying :class:BaseTrigger.

Raises:

Type Description
TriggerError

If the trigger type is unknown or dependencies are missing.

register_trigger_type

promptise.runtime.triggers.register_trigger_type(type_name, factory, *, overwrite=False)

Register a custom trigger type.

After registration, the type name can be used in :class:~promptise.runtime.config.TriggerConfig and :func:create_trigger.

Parameters:

Name Type Description Default
type_name str

The type string used in TriggerConfig.type.

required
factory TriggerFactory

Callable with signature (config, *, event_bus=None, broker=None) -> BaseTrigger.

required
overwrite bool

If True, allow overwriting an existing registration.

False

Raises:

Type Description
ValueError

If type_name is already registered and overwrite is False.

Example::

from promptise.runtime.triggers import register_trigger_type

def sqs_factory(config, *, event_bus=None, broker=None):
    return SQSTrigger(config.custom_config["queue_url"])

register_trigger_type("sqs", sqs_factory)

registered_trigger_types

promptise.runtime.triggers.registered_trigger_types()

Return the names of all registered trigger types.

Returns:

Type Description
list[str]

Sorted list of registered type names (built-in + custom).


Governance — Budget

BudgetConfig

promptise.runtime.config.BudgetConfig

Bases: BaseModel

Autonomy budget configuration.

Attributes:

Name Type Description
enabled bool

Enable budget tracking for this process.

max_tool_calls_per_run int | None

Max tool calls per invocation.

max_llm_turns_per_run int | None

Max LLM turns per invocation.

max_cost_per_run float | None

Max cost units per invocation.

max_irreversible_per_run int | None

Max irreversible actions per invocation.

max_tool_calls_per_day int | None

Max tool calls in a day.

max_runs_per_day int | None

Max invocations in a day.

max_cost_per_day float | None

Max cost units in a day.

tool_costs dict[str, ToolCostAnnotation]

Per-tool cost annotations.

on_exceeded Literal['pause', 'stop', 'escalate']

Action on budget violation.

escalation EscalationTarget | None

Escalation target for violations.

daily_reset_hour_utc int

UTC hour to reset daily counters.

inject_remaining bool

Inject remaining budget into agent context.

ToolCostAnnotation

promptise.runtime.config.ToolCostAnnotation

Bases: BaseModel

Cost and reversibility annotation for a tool.

Attributes:

Name Type Description
cost_weight float

Abstract cost units per invocation (default 1.0).

irreversible bool

Whether the tool performs an irreversible action.

BudgetState

promptise.runtime.budget.BudgetState

Thread-safe budget counters with per-run and daily tracking.

Parameters:

Name Type Description Default
config BudgetConfig

Budget configuration with limits and tool cost annotations.

required
budget_context()

Format remaining budget as a context string for injection.

check_daily_reset() async

Reset daily counters if the reset hour has passed.

Returns:

Type Description
bool

True if counters were reset, False otherwise.

from_dict(data, config) classmethod

Reconstruct from a journal checkpoint.

record_llm_turn() async

Record an LLM turn and check limits.

record_run_start() async

Record start of a new invocation and check daily run limit.

record_tool_call(tool_name) async

Record a tool call and check per-run + daily limits.

Returns the first :class:BudgetViolation found, or None.

remaining()

Return remaining budget per category.

None means unlimited. Non-negative values indicate remaining capacity. Negative values mean the limit was exceeded.

reset_run() async

Reset per-run counters at the start of each invocation.

to_dict()

Serialise for journal checkpointing.

BudgetEnforcer

promptise.runtime.budget.BudgetEnforcer

Handles budget violations according to the configured policy.

Parameters:

Name Type Description Default
config BudgetConfig

Budget configuration with on_exceeded policy.

required
handle_violation(violation, process) async

Execute the configured response to a budget violation.

Actions: - "pause": Suspend the process. - "stop": Stop the process entirely. - "escalate": Fire escalation notification, then suspend.

Parameters:

Name Type Description Default
violation BudgetViolation

The budget violation that occurred.

required
process Any

The owning AgentProcess.

required

BudgetViolation

promptise.runtime.budget.BudgetViolation dataclass

Describes a budget limit that was exceeded.

Attributes:

Name Type Description
limit_name str

Which limit was hit (e.g. "max_tool_calls_per_run").

limit_value float

The configured maximum.

current_value float

The current counter value that exceeded the limit.

tool_name str | None

Tool that triggered the violation (if applicable).

BudgetWarning

promptise.runtime.budget.BudgetWarning dataclass

Describes a budget limit approaching its threshold (default 80%).

Attributes:

Name Type Description
limit_name str

Which limit is approaching (e.g. "max_tool_calls_per_day").

limit_value int | float

The configured maximum.

current_value int | float

The current counter value.

percentage float

How close to the limit (0.0-1.0).


Governance — Health

HealthConfig

promptise.runtime.config.HealthConfig

Bases: BaseModel

Behavioral health monitoring configuration.

Attributes:

Name Type Description
enabled bool

Enable health monitoring for this process.

stuck_threshold int

Same tool+args N times = stuck.

loop_window int

Tool calls to examine for loop detection.

loop_min_repeats int

Min pattern repeats to trigger.

empty_threshold int

N consecutive short responses = anomaly.

empty_max_chars int

Below this = trivial response.

error_window int

Sliding window for error rate.

error_rate_threshold float

Error rate above this triggers anomaly.

on_anomaly Literal['log', 'pause', 'escalate']

Action when anomaly is detected.

cooldown float

Seconds between same anomaly type.

escalation EscalationTarget | None

Escalation target for anomalies.

HealthMonitor

promptise.runtime.health.HealthMonitor

Lightweight behavioral health analysis.

All detection methods are pure logic — no I/O, no LLM calls. Anomalies respect a configurable cooldown to avoid spam.

Parameters:

Name Type Description Default
config HealthConfig

Health monitoring configuration.

required
process_id str

Owning process ID.

required
anomalies property

All detected anomalies.

latest_anomaly property

Most recent anomaly, or None.

tool_history property

Recent tool call history [(name, args_hash)].

from_dict(data, config) classmethod

Reconstruct from a journal checkpoint.

health_status()

Return health summary for process.status().

record_error() async

Record a failed invocation and check error rate.

record_response(content) async

Record an agent response and check for empty response anomaly.

record_success() async

Record a successful invocation.

Returns:

Type Description
bool

True if this success clears a previous anomaly (recovery).

record_tool_call(tool_name, args) async

Record a tool call and check for stuck/loop anomalies.

to_dict()

Serialise for journal checkpointing.

Anomaly

promptise.runtime.health.Anomaly dataclass

A detected behavioral anomaly.

Attributes:

Name Type Description
anomaly_type AnomalyType

Kind of anomaly.

description str

Human-readable explanation.

timestamp datetime

When the anomaly was detected.

details dict[str, Any]

Additional evidence (tool sequences, counts, etc.).

AnomalyType

promptise.runtime.health.AnomalyType

Bases: str, Enum

Types of behavioral anomaly.


Governance — Mission

MissionConfig

promptise.runtime.config.MissionConfig

Bases: BaseModel

Mission-oriented process configuration.

Attributes:

Name Type Description
enabled bool

Enable mission tracking for this process.

objective str

What the agent is trying to achieve.

success_criteria str

How to judge completion.

eval_model str | None

LLM for evaluation (None = use process model).

eval_every int

Evaluate every N invocations.

confidence_threshold float

Below this = pause + escalate.

timeout_hours float

Max hours (0 = no timeout).

max_invocations int

Max invocations (0 = unlimited).

auto_complete bool

Stop process when mission achieved.

on_complete Literal['stop', 'continue', 'suspend']

Behavior when mission completes.

escalation EscalationTarget | None

Escalation target for low confidence.

MissionTracker

promptise.runtime.mission.MissionTracker

Tracks mission progress across invocations.

Handles evaluation scheduling, LLM-as-judge calls, programmatic checks, timeout/limit checking, and context prompt generation.

Parameters:

Name Type Description Default
config MissionConfig

Mission configuration.

required
process_id str

Owning process ID.

required
journal JournalProvider | None

Optional journal provider for logging evaluations.

None
success_check Callable[[MissionEvidence], bool | None] | None

Optional callable (MissionEvidence) -> bool | None. Return True for achieved, False for not achieved, None for inconclusive (fall through to LLM evaluator).

None
evaluations property

All evaluations performed so far.

invocation_count property

Number of invocations since mission started.

state property

Current mission state.

complete()

Transition mission to COMPLETED state.

context_summary()

Generate context for injection into the agent's system prompt.

evaluate(evidence, model) async

Run a mission evaluation.

If a success_check callable is configured, it runs first. If it returns True → mission achieved. False → not achieved. None → fall through to LLM evaluator.

If the LLM call fails, returns a safe default evaluation (achieved=False, confidence=0.5).

Parameters:

Name Type Description Default
evidence MissionEvidence

Evidence bundle with conversation, state, tool log.

required
model str

Fallback model ID if eval_model is not set.

required

Returns:

Type Description
MissionEvaluation

The evaluation result.

fail(reason)

Transition mission to FAILED state.

from_dict(data, config) classmethod

Reconstruct from a journal checkpoint.

increment_invocation()

Increment the invocation counter. Called by AgentProcess.

is_over_limit()

Check whether the invocation limit has been exceeded.

is_timed_out()

Check whether the mission has exceeded its timeout.

pause()

Transition mission to PAUSED state (awaiting human input).

resume()

Transition mission from PAUSED back to ACTIVE.

should_evaluate()

Check whether it's time to run an evaluation.

Returns True if the mission is active and the invocation count is a multiple of eval_every.

to_dict()

Serialise for journal checkpointing.

MissionEvidence

promptise.runtime.mission.MissionEvidence dataclass

Bundle of evidence passed to the evaluator.

Built automatically by AgentProcess._invoke_agent() from the conversation buffer, AgentContext state, health monitor tool history, and the trigger event.

Attributes:

Name Type Description
conversation list[dict[str, Any]]

Recent conversation messages.

state dict[str, Any]

AgentContext key-value state snapshot.

tool_calls list[tuple[str, str]]

Recent tool call log [(name, args_hash)].

trigger_event dict[str, Any]

The trigger event that caused this invocation.

invocation_count int

Total invocations so far.

MissionEvaluation

promptise.runtime.mission.MissionEvaluation dataclass

Result of an evaluation.

Attributes:

Name Type Description
achieved bool

Whether the mission is complete.

confidence float

Evaluator confidence (0.0–1.0).

reasoning str

Explanation of the evaluation.

progress_summary str

Brief summary of progress so far.

timestamp datetime

When the evaluation was performed.

invocation_number int

Which invocation triggered this evaluation.

source str

"llm" or "programmatic".

MissionState

promptise.runtime.mission.MissionState

Bases: str, Enum

Mission lifecycle states.


Governance — Secrets

SecretScopeConfig

promptise.runtime.config.SecretScopeConfig

Bases: BaseModel

Per-process secret scoping configuration.

Attributes:

Name Type Description
enabled bool

Enable secret scoping for this process.

secrets dict[str, str]

Secret name to value or ${ENV_VAR} reference.

default_ttl float | None

Default TTL in seconds (None = no expiry).

ttls dict[str, float]

Per-secret TTL overrides in seconds.

revoke_on_stop bool

Zero-fill and remove all secrets on stop.

SecretScope

promptise.runtime.secrets.SecretScope

Per-process secret vault with TTL, access logging, and rotation.

Parameters:

Name Type Description Default
config SecretScopeConfig

Secret scoping configuration.

required
process_id str

Owning process ID (for journal entries).

required
journal JournalProvider | None

Optional journal provider for access logging.

None
active_secret_count property

Number of non-expired secrets currently stored.

active_secret_names property

Names of non-expired secrets currently stored.

get(name)

Get a secret value by name.

Returns None if the secret doesn't exist or has expired. Every access is logged to the journal (name only, never value).

list_names()

Return names of all active (non-expired) secrets.

resolve_initial() async

Resolve all secrets from config on startup.

Each value in config.secrets is either a literal string or an env var reference like ${VAR_NAME}. References are resolved from os.environ.

Raises:

Type Description
KeyError

If an env var reference cannot be resolved.

revoke_all() async

Remove all secrets from the in-memory store.

Called on process stop. Overwrites values with null bytes as a best-effort measure (Python's immutable strings mean the original value may persist in the interpreter's memory pool until garbage-collected).

rotate(name, new_value) async

Replace a secret's value immediately.

The old value is overwritten in memory. The rotation event is logged to the journal (name only).

Raises:

Type Description
KeyError

If the secret name doesn't exist.

sanitize_text(text)

Replace any secret values found in text with [REDACTED].

Used to sanitise conversation buffers and status output.


Inbox (Human-in-the-loop)

InboxConfig

promptise.runtime.config.InboxConfig

Bases: BaseModel

Configuration for the human-to-agent message inbox.

Attributes:

Name Type Description
enabled bool

Enable the message inbox.

max_messages int

Maximum messages in the inbox at once.

max_message_length int

Maximum characters per message.

default_ttl float

Default time-to-live in seconds (0 = no expiry).

max_ttl float

Maximum allowed TTL in seconds.

rate_limit_per_sender int

Maximum messages per sender per hour.

scan_with_guardrails bool

Scan incoming messages for prompt injection.

MessageInbox

promptise.runtime.inbox.MessageInbox

Async-safe inbox for human-to-agent messages.

Provides a bounded, priority-sorted queue with TTL-based expiry, per-sender rate limiting, and question/response tracking.

Parameters:

Name Type Description Default
max_messages int

Maximum messages in the inbox. When full, lowest-priority messages are evicted.

50
max_message_length int

Maximum characters per message.

2000
default_ttl float

Default time-to-live in seconds (0 = no expiry).

3600
max_ttl float

Maximum allowed TTL in seconds.

86400
rate_limit_per_sender int

Maximum messages per sender per hour.

20
add(message) async

Add a message to the inbox.

Parameters:

Name Type Description Default
message InboxMessage

The message to add.

required

Returns:

Type Description
str

The message_id.

Raises:

Type Description
ValueError

If rate limit exceeded or content too long.

clear() async

Clear all messages and cancel pending question futures.

get_pending() async

Get all non-expired pending messages, sorted by priority.

Returns messages in order: critical → high → normal → low, then by timestamp (oldest first within same priority). Does NOT remove messages — call :meth:mark_processed after the agent has seen them.

get_questions() async

Get pending questions that haven't been answered yet.

mark_processed(message_id) async

Mark a message as processed (remove from inbox).

Questions are kept until their response is submitted.

purge_expired() async

Remove all expired messages. Returns count removed.

status() async

Get inbox status summary.

submit_response(question_id, response) async

Submit an agent's response to a question.

Resolves the future that the caller of :meth:wait_for_response is awaiting.

Parameters:

Name Type Description Default
question_id str

The message_id of the original question.

required
response InboxResponse

The agent's response.

required

Raises:

Type Description
KeyError

If no pending question with this ID.

wait_for_response(question_id, timeout=300) async

Wait for the agent to respond to a question.

Parameters:

Name Type Description Default
question_id str

The message_id of the question.

required
timeout float

Maximum seconds to wait.

300

Returns:

Type Description
InboxResponse

The agent's response.

Raises:

Type Description
KeyError

If no pending question with this ID.

TimeoutError

If timeout expires.

InboxMessage

promptise.runtime.inbox.InboxMessage dataclass

A message from a human to a running agent.

Attributes:

Name Type Description
message_id str

Unique cryptographic ID.

content str

The message text.

message_type MessageType

Type of message (directive, context, question, correction).

sender_id str | None

Who sent it (for audit trail).

priority str

Message priority (low, normal, high, critical).

created_at float

When the message was created.

expires_at float | None

When the message becomes stale (None = never).

metadata dict[str, Any]

Developer-provided custom data.

is_expired()

Check if the message has expired.

to_dict()

Serialize to a JSON-safe dict.

InboxResponse

promptise.runtime.inbox.InboxResponse dataclass

An agent's response to a question.

Attributes:

Name Type Description
question_id str

The message_id of the original question.

content str

The agent's answer text.

answered_at float

When the agent generated the answer.

invocation_id str | None

Which invocation produced the answer.

to_dict()

Serialize to a JSON-safe dict.

MessageType

promptise.runtime.inbox.MessageType

Bases: str, Enum

Types of human-to-agent messages.


Journal

JournalConfig

promptise.runtime.config.JournalConfig

Bases: BaseModel

Journal (durable audit log) configuration.

Attributes:

Name Type Description
level Literal['none', 'checkpoint', 'full']

Detail level — none (disabled), checkpoint (state snapshots per cycle), or full (every side effect).

backend Literal['file', 'memory']

Storage backend.

path str

Directory for the file backend.

JournalProvider

promptise.runtime.journal.JournalProvider

Bases: Protocol

Protocol for journal backends.

append(entry) async

Append an entry to the journal.

checkpoint(process_id, state) async

Store a full state checkpoint.

close() async

Release any resources.

last_checkpoint(process_id) async

Return the most recent checkpoint state, or None.

read(process_id, *, since=None, entry_type=None, limit=None) async

Read entries for a process with optional filters.

JournalEntry

promptise.runtime.journal.JournalEntry dataclass

Single journal record.

Attributes:

Name Type Description
entry_id str

Unique entry ID.

process_id str

Owning process.

timestamp datetime

When the entry was recorded (UTC).

entry_type str

Type of entry (state_transition, trigger_event, invocation_start, invocation_result, checkpoint, error).

data dict[str, Any]

Entry payload.

from_dict(data) classmethod

Deserialize from a dict.

to_dict()

Serialize to a JSON-compatible dict.

JournalLevel

promptise.runtime.journal.JournalLevel

Bases: str, Enum

Journal detail level.

FileJournal

promptise.runtime.journal.FileJournal

JSONL-based journal for durable process audit logging.

One JSONL file per process, append-only writes. Checkpoints are stored separately in a JSON file.

Parameters:

Name Type Description Default
base_path str

Directory for journal files (created if missing).

'.promptise/journal'

Example::

journal = FileJournal("/tmp/journals")
await journal.append(JournalEntry(
    process_id="p-1",
    entry_type="trigger_event",
    data={"trigger_type": "cron"},
))
entries = await journal.read("p-1")
append(entry) async

Append an entry to the process's JSONL file.

checkpoint(process_id, state) async

Store a checkpoint and record it in the journal.

close() async

No-op — file handles are opened/closed per operation.

last_checkpoint(process_id) async

Return the latest checkpoint from the checkpoint file.

read(process_id, *, since=None, entry_type=None, limit=None) async

Read entries from the process's JSONL file.

InMemoryJournal

promptise.runtime.journal.InMemoryJournal

In-memory journal for testing and development.

All entries are stored in a Python list — nothing is persisted.

Example::

journal = InMemoryJournal()
await journal.append(JournalEntry(process_id="p-1", entry_type="test"))
entries = await journal.read("p-1")
assert len(entries) == 1
append(entry) async

Append an entry.

checkpoint(process_id, state) async

Store a checkpoint.

close() async

No-op for in-memory backend.

last_checkpoint(process_id) async

Return the latest checkpoint.

read(process_id, *, since=None, entry_type=None, limit=None) async

Read entries with optional filters.

ReplayEngine

promptise.runtime.journal.ReplayEngine

Replays journal entries to reconstruct process state.

Parameters:

Name Type Description Default
journal JournalProvider

:class:JournalProvider to read from.

required

Example::

engine = ReplayEngine(journal)
recovered = await engine.recover("process-1")
# recovered = {
#     "context_state": {...},
#     "lifecycle_state": "running",
#     "last_entry_type": "invocation_result",
# }
recover(process_id) async

Recover process state from journal.

  1. Load the last checkpoint (if any).
  2. Read all entries after the checkpoint.
  3. Replay state transitions and context mutations.

Parameters:

Name Type Description Default
process_id str

Process to recover.

required

Returns:

Type Description
dict[str, Any]

Dict with context_state, lifecycle_state,

dict[str, Any]

last_entry_type, and entries_replayed.


Manifests

AgentManifestSchema

promptise.runtime.manifest.AgentManifestSchema

Bases: BaseModel

Root schema for .agent manifest files.

Attributes:

Name Type Description
version Literal['1.0']

Schema version (currently "1.0").

name str

Process name (unique identifier).

model str

LLM model ID.

instructions str | None

System prompt.

servers dict[str, Any]

MCP server specifications.

triggers list[dict[str, Any]]

Trigger configurations (list of dicts).

world dict[str, Any]

Initial world state (key-value).

memory dict[str, Any] | None

Memory provider configuration.

journal dict[str, Any] | None

Journal configuration.

config dict[str, Any]

Additional process config overrides.

entrypoint str | None

Optional Python module for custom hooks.

load_manifest

promptise.runtime.manifest.load_manifest(path)

Load and validate a .agent manifest file.

Parameters:

Name Type Description Default
path str | Path

Path to the .agent YAML file.

required

Returns:

Name Type Description
Validated AgentManifestSchema

class:AgentManifestSchema.

Raises:

Type Description
ManifestError

If the file cannot be read or parsed.

ManifestValidationError

If schema validation fails.

save_manifest

promptise.runtime.manifest.save_manifest(manifest, path)

Write a manifest to a YAML file.

Parameters:

Name Type Description Default
manifest AgentManifestSchema

The manifest to save.

required
path str | Path

Destination file path.

required

validate_manifest

promptise.runtime.manifest.validate_manifest(path)

Validate a manifest file, returning a list of warnings.

Parameters:

Name Type Description Default
path str | Path

Path to the manifest file.

required

Returns:

Type Description
list[str]

List of warning messages (empty if fully valid).

Raises:

Type Description
ManifestError

If the file cannot be loaded.

ManifestValidationError

If the schema is invalid.

manifest_to_process_config

promptise.runtime.manifest.manifest_to_process_config(manifest)

Convert a manifest into a :class:ProcessConfig.

Parameters:

Name Type Description Default
manifest AgentManifestSchema

Validated manifest schema.

required

Returns:

Name Type Description
A ProcessConfig

class:ProcessConfig ready to pass to :class:AgentProcess.


Orchestration API

Remote HTTP control of running processes.

OrchestrationAPI

promptise.runtime.api.OrchestrationAPI

REST API server for managing an AgentRuntime.

Parameters:

Name Type Description Default
runtime Any

The :class:AgentRuntime to manage.

required
host str

Bind address (default 127.0.0.1).

'127.0.0.1'
port int

Port to listen on (default 9100).

9100
auth_token str | None

Bearer token for authentication. Required when host is not localhost.

None
start() async

Start the API server.

stop() async

Stop the API server.

OrchestrationClient

promptise.runtime.api_client.OrchestrationClient

Async client for the Promptise Orchestration API.

Provides typed methods for every API endpoint. Uses httpx internally — the client is an async context manager that manages the HTTP connection pool.

Parameters:

Name Type Description Default
base_url str

Base URL of the Orchestration API (e.g. "http://localhost:9100").

required
token str | None

Bearer token for authentication.

None
timeout float

Default request timeout in seconds.

30.0

Example::

async with OrchestrationClient("http://localhost:9100", token="tok") as c:
    await c.start_process("monitor")
    status = await c.get_process("monitor")
add_trigger(name, trigger_config) async

Add a trigger to a process.

Example::

await client.add_trigger("monitor", {
    "type": "cron",
    "cron_expression": "0 9 * * 1-5",
})
ask(name, question, *, timeout=120, sender_id=None) async

Ask a question and wait for the agent's answer.

Long-polls until the agent responds or timeout expires.

Parameters:

Name Type Description Default
name str

Process name.

required
question str

Question text.

required
timeout float

Max seconds to wait for a response.

120
sender_id str | None

Identifier for the sender.

None

Returns:

Type Description
str

The agent's response text.

Raises:

Type Description
TimeoutException

If the agent doesn't respond in time.

clear_anomalies(name) async

Clear anomaly history after resolving the issue.

clear_inbox(name) async

Clear all messages in the inbox.

deploy(name, config, *, start=False) async

Deploy a new agent process.

Parameters:

Name Type Description Default
name str

Process name (unique).

required
config dict[str, Any]

ProcessConfig as a dict.

required
start bool

Whether to start immediately after deployment.

False
fail_mission(name, reason='Manually failed via client') async

Manually fail a mission.

get_anomalies(name) async

Get anomaly history for a process.

get_context(name) async

Get the process's AgentContext state.

get_inbox(name) async

Get inbox status for a process.

get_journal(name, *, limit=50, entry_type=None) async

Read journal entries for a process.

Parameters:

Name Type Description Default
name str

Process name.

required
limit int

Max entries to return (1-500).

50
entry_type str | None

Filter by entry type (e.g. "state_transition").

None
get_metrics(name) async

Get process metrics (invocation count, budget usage, etc.).

get_mission(name) async

Get mission state and evaluation history.

get_process(name) async

Get detailed status for a single process.

health() async

Health check (no auth required).

list_processes() async

List all registered processes.

list_secrets(name) async

List secret names and active status (never values).

list_triggers(name) async

List all triggers (static + dynamic) for a process.

pause_mission(name) async

Pause mission evaluation.

remove_process(name) async

Remove a process (stops it first if running).

remove_trigger(name, trigger_id) async

Remove a trigger by ID.

restart_process(name) async

Restart a process (stop then start).

resume_mission(name) async

Resume mission evaluation.

resume_process(name) async

Resume a suspended process.

revoke_secrets(name) async

Revoke all secrets (zero-fill, immediate).

rotate_secret(name, secret_name, value, *, ttl=None) async

Rotate a secret value (immediate effect).

Parameters:

Name Type Description Default
name str

Process name.

required
secret_name str

Name of the secret to rotate.

required
value str

New secret value.

required
ttl int | None

Optional TTL in seconds.

None
runtime_status() async

Get global runtime status.

send_message(name, content, *, message_type='context', priority='normal', sender_id=None, ttl=None) async

Send a message to a running agent.

Parameters:

Name Type Description Default
name str

Process name.

required
content str

Message content.

required
message_type str

One of "directive", "context", "question", "correction".

'context'
priority str

One of "low", "normal", "high", "critical".

'normal'
sender_id str | None

Identifier for the sender (for audit trail).

None
ttl int | None

Time-to-live in seconds (message expires after TTL).

None
start_all() async

Start all processes.

start_process(name) async

Start a process.

stop_all() async

Stop all processes.

stop_process(name) async

Stop a process.

suspend_process(name) async

Suspend a process (pause trigger processing).

update_budget(name, **fields) async

Update budget limits (effective immediately).

Example::

await client.update_budget("monitor", max_cost_per_day=500.0)
update_context(name, state) async

Update AgentContext key-value state.

Only writable keys are accepted. Others return 403.

Parameters:

Name Type Description Default
name str

Process name.

required
state dict[str, Any]

Dict of key-value pairs to update.

required
update_health(name, **fields) async

Update health monitoring thresholds.

update_instructions(name, instructions) async

Update the system prompt (effective next invocation).

update_mission(name, **fields) async

Update mission configuration.


Distributed coordination

RuntimeCoordinator

promptise.runtime.distributed.RuntimeCoordinator

Cluster coordinator for distributed runtime management.

Tracks nodes, monitors health, and aggregates cluster state.

Parameters:

Name Type Description Default
health_check_interval float

Seconds between health checks.

15.0
node_timeout float

Seconds before a node is considered unhealthy.

45.0
healthy_nodes property

List of currently healthy nodes.

nodes property

Read-only view of registered nodes.

check_health() async

Check health of all registered nodes.

Makes HTTP requests to each node's /health endpoint.

Returns:

Type Description
dict[str, dict[str, Any]]

Dict mapping node_id to health status.

cluster_status() async

Aggregate status across all nodes.

Returns:

Type Description
dict[str, Any]

Dict with cluster-wide summary and per-node details.

get_node(node_id)

Get information about a registered node.

Parameters:

Name Type Description Default
node_id str

Node identifier.

required

Returns:

Type Description
NodeInfo

class:NodeInfo for the node.

Raises:

Type Description
KeyError

If node doesn't exist.

get_node_status(node_id) async

Get detailed status from a specific node via HTTP.

Parameters:

Name Type Description Default
node_id str

Node to query.

required

Returns:

Type Description
dict[str, Any]

Full status from the node.

Raises:

Type Description
KeyError

If node doesn't exist.

RuntimeError

If aiohttp is not available.

inject_event_on_node(node_id, process_name, payload, trigger_type='remote') async

Inject a trigger event on a remote node.

Parameters:

Name Type Description Default
node_id str

Target node.

required
process_name str

Target process.

required
payload Any

Event payload.

required
trigger_type str

Type of trigger event.

'remote'

Returns:

Type Description
dict[str, Any]

Response from the node.

register_node(node_id, url, metadata=None)

Register a runtime node with the coordinator.

Parameters:

Name Type Description Default
node_id str

Unique node identifier.

required
url str

Base URL for the node's transport API.

required
metadata dict[str, Any] | None

Optional additional metadata.

None

Returns:

Type Description
NodeInfo

The registered :class:NodeInfo.

start_health_monitor() async

Start the background health monitoring loop.

start_process_on_node(node_id, process_name) async

Start a process on a specific node.

Parameters:

Name Type Description Default
node_id str

Target node.

required
process_name str

Process to start.

required

Returns:

Type Description
dict[str, Any]

Response from the node.

stop_health_monitor() async

Stop the health monitoring loop.

stop_process_on_node(node_id, process_name) async

Stop a process on a specific node.

Parameters:

Name Type Description Default
node_id str

Target node.

required
process_name str

Process to stop.

required

Returns:

Type Description
dict[str, Any]

Response from the node.

unregister_node(node_id)

Remove a node from the cluster.

Parameters:

Name Type Description Default
node_id str

Node to remove.

required

Raises:

Type Description
KeyError

If node doesn't exist.

NodeInfo

promptise.runtime.distributed.NodeInfo dataclass

Information about a runtime node in the cluster.

Attributes:

Name Type Description
node_id str

Unique node identifier.

url str

Base URL for the node's transport API.

last_heartbeat float

Timestamp of last successful health check.

status str

Current node status.

process_count int

Number of processes on this node.

metadata dict[str, Any]

Additional node metadata.

is_healthy property

Check if node has had a recent heartbeat.

to_dict()

Serialize to JSON-compatible dict.

DiscoveredNode

promptise.runtime.distributed.DiscoveredNode dataclass

A discovered runtime node.

Attributes:

Name Type Description
node_id str

Unique node identifier.

url str

Base URL for the node's transport API.

discovered_at float

Timestamp when the node was discovered.

metadata dict[str, Any]

Additional node metadata.

to_dict()

Serialize to JSON-compatible dict.

StaticDiscovery

promptise.runtime.distributed.StaticDiscovery

Static discovery from a known list of node URLs.

Suitable for fixed-topology deployments where all node addresses are known at configuration time.

Parameters:

Name Type Description Default
nodes dict[str, str] | None

Mapping of node_id → URL.

None
discover() async

Return all statically configured nodes.

register(node_id, url, metadata=None) async

Add a node to the static list.

unregister(node_id) async

Remove a node from the static list.

RegistryDiscovery

promptise.runtime.distributed.RegistryDiscovery

In-process registry for node discovery.

Nodes register themselves with the registry, and other nodes can discover them. This is suitable for single-process testing or when a shared registry object is accessible to all nodes.

For production deployments across machines, use with a central coordinator (the coordinator can hold the RegistryDiscovery instance and expose it via its HTTP API).

Parameters:

Name Type Description Default
ttl float

Time-to-live for registrations in seconds. After this period without re-registration, nodes are considered stale and removed from discovery.

60.0
discover() async

Return all registered nodes, pruning stale entries.

heartbeat(node_id) async

Update a node's registration timestamp.

Parameters:

Name Type Description Default
node_id str

Node to refresh.

required
register(node_id, url, metadata=None) async

Register a node for discovery.

unregister(node_id) async

Remove a node from the registry.


Dashboard

RuntimeDashboard

promptise.runtime.RuntimeDashboard

Full-screen tabbed terminal dashboard for the Agent Runtime.

Follows the same architecture as the MCP server's :class:~promptise.mcp.server._dashboard.Dashboard:

  • 7 tabs -- Overview, Processes, Triggers, Context, Logs, Events, Commands
  • Arrow-key / number-key navigation
  • Live refresh -- ~250ms in a daemon thread
  • Log capture -- Python logger output in Logs tab
  • Command input -- interactive process control

Parameters:

Name Type Description Default
state RuntimeDashboardState

:class:RuntimeDashboardState instance (shared with data collector).

required
runtime AgentRuntime | None

Optional :class:AgentRuntime for command execution.

None
start()

Start the live dashboard (takes over the terminal).

stop()

Stop the dashboard and restore the terminal.

RuntimeDashboardState

promptise.runtime.RuntimeDashboardState dataclass

Thread-safe shared state for the runtime dashboard.

Mirrors the MCP dashboard's :class:DashboardState pattern but tracks runtime-specific data: processes, triggers, world state, and journal entries.

record_command(result)

Record a command executed.

record_event(event)

Record a trigger event received.

record_invocation(log)

Record a completed agent invocation.

record_journal(entry)

Record a journal entry.

RuntimeDataCollector

promptise.runtime.RuntimeDataCollector

Collects real-time data from a running AgentRuntime into DashboardState.

Runs as a background task, periodically polling the runtime for process status, context state, and trigger info.

start()

Start the data collection thread.

stop()

Stop collecting.


Exceptions

RuntimeBaseError

promptise.runtime.exceptions.RuntimeBaseError

Bases: RuntimeError

Base exception for all runtime module errors.

ProcessStateError

promptise.runtime.exceptions.ProcessStateError

Bases: RuntimeBaseError

Raised when a lifecycle state transition is invalid.

Attributes:

Name Type Description
process_id

The process that attempted the transition.

current_state

The process's current state.

attempted_state

The target state that was rejected.

ManifestError

promptise.runtime.exceptions.ManifestError

Bases: RuntimeBaseError

Raised when .agent manifest loading fails (I/O or parse error).

ManifestValidationError

promptise.runtime.exceptions.ManifestValidationError

Bases: ManifestError

Raised when .agent manifest schema validation fails.

Attributes:

Name Type Description
errors

Pydantic validation error details (if available).

file_path

Path to the manifest file that failed validation.

TriggerError

promptise.runtime.exceptions.TriggerError

Bases: RuntimeBaseError

Raised when a trigger fails to start, stop, or produce events.

JournalError

promptise.runtime.exceptions.JournalError

Bases: RuntimeBaseError

Raised when journal read, write, or replay fails.