Context
RunContext, AgentContext, StepContext and checkpointing.
Context
The context system provides async-safe tracking of runs, agents, and steps using Python contextvars. Decorated functions access context implicitly — no parameter passing required.
Hierarchy: RunContext → AgentContext → StepContext
Context accessors
from multirun.context import (
get_current_run_context, # Returns RunContext; raises RuntimeError if none
get_current_agent_context, # Returns AgentContext; raises RuntimeError if none
get_current_client, # Returns MultiRunClient; raises RuntimeError if none
has_active_run, # bool
has_active_agent, # bool
has_active_client, # bool
)These are async-safe (backed by contextvars) and work correctly in concurrent tasks.
RunContext
Top-level context for a running execution. Created by the @run decorator or manually.
Fields
| Field | Type | Description |
|---|---|---|
client | MultiRunClient | The client managing this run |
run | Run | The run model |
is_replaying | bool | True when executing from a restored checkpoint |
agents | list[AgentContext] | All agents in this run |
State management
ctx = get_current_run_context()
ctx.set_state("results", my_results)
ctx.set_state("iteration", 5)
results = ctx.get_state("results")
iteration = ctx.get_state("iteration", default=0)State set via set_state() is included in checkpoints automatically.
Agent management
agent_ctx = await ctx.start_agent(
name="researcher",
agent_config_id="ac_123", # optional
metadata={"model": "gpt-4"}, # optional
)
async with agent_ctx:
# steps inside here are scoped to this agent
...When no explicit @agent is used, RunContext lazily creates a default agent.
Lifecycle
await ctx.complete(result) # Complete all agents, mark run completed
await ctx.fail(error) # Fail active agents, mark run failedAsync context manager
async with RunContext(client=client, run=run) as ctx:
# ctx is set as the current run context
...
# context is cleared on exitAgentContext
Tracks a running agent and its steps within a run.
Fields
| Field | Type | Description |
|---|---|---|
run_context | RunContext | Parent run context |
agent | Agent | The agent model |
is_default | bool | True if this is the implicit default agent |
is_replaying | bool | Inherited from RunContext |
steps | list[Step] | Steps executed by this agent |
Starting steps
step_ctx = await agent_ctx.start_step(
name="call_llm",
kind=StepKind.LLM_CALL,
input_args=(prompt,),
input_kwargs={},
)Returns a StepContext. Inputs are serialized and hashed for replay detection.
Replay cache
found, result = await agent_ctx.get_step_result("call_llm", args, kwargs)
if found:
return result # cached from previous executionState & checkpoints
agent_ctx.set_state("key", value)
value = agent_ctx.get_state("key", default=None)
ckpt = await agent_ctx.checkpoint(
name="after_search",
extra_state={"custom": "data"},
)Lifecycle
await agent_ctx.complete(result) # aggregates step metrics
await agent_ctx.fail(error)StepContext
Context for a single step execution. Created by AgentContext.start_step().
Fields
| Field | Type | Description |
|---|---|---|
agent_context | AgentContext | Parent agent context |
step | Step | The step model |
Completing a step
await step_ctx.complete(
result,
tokens_used=500,
cost_usd=0.01,
)Automatically computes latency_ms, serializes the result, and updates run-level totals.
Failing a step
await step_ctx.fail(error)Checkpoint & Replay
Creating checkpoints
from multirun.context import get_current_run_context
ctx = get_current_run_context()
ctx.set_state("results", results)
ctx.set_state("iteration", 5)
ckpt = await ctx.checkpoint(name="mid_execution")Restoring from checkpoint
await ctx.restore_from_checkpoint(checkpoint_id)
# State is now restored
results = ctx.get_state("results")
iteration = ctx.get_state("iteration")
# ctx.is_replaying is now TrueRaises CheckpointError if loading fails, ReplayError if deserialization fails.
Note
Checkpoint endpoints are not yet available in the backend. Checkpoints work fully with multirun.init_local(). See Known Issues.