MultiRun

Context

RunContext, AgentContext, StepContext and checkpointing.

The context system provides async-safe tracking of runs, agents, and steps using Python contextvars. Decorated functions access the current context implicitly, so it never has to be passed as a parameter.

Hierarchy: RunContext → AgentContext → StepContext

Context accessors

from multirun.context import (
    get_current_run_context,       # Returns RunContext; raises RuntimeError if none
    get_current_agent_context,     # Returns AgentContext; raises RuntimeError if none
    get_current_client,            # Returns MultiRunClient; raises RuntimeError if none
    has_active_run,                # bool
    has_active_agent,              # bool
    has_active_client,             # bool
)

These are async-safe (backed by contextvars) and work correctly in concurrent tasks.
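To illustrate why contextvars-backed accessors stay correct under concurrency, here is a minimal standard-library sketch. It does not use MultiRun itself: `_current_run`, `get_current_run`, and `worker` are hypothetical stand-ins, and the real accessors may be implemented differently.

```python
import asyncio
import contextvars

# Hypothetical stand-in for a library-internal variable (name is an assumption).
_current_run = contextvars.ContextVar("current_run")


def get_current_run():
    """Return the active run label, or raise RuntimeError if none is set."""
    try:
        return _current_run.get()
    except LookupError:
        raise RuntimeError("no active run") from None


def has_active_run():
    """Report whether a run label is set in the current context."""
    return _current_run.get(None) is not None


async def worker(label):
    _current_run.set(label)   # visible only inside this task's context
    await asyncio.sleep(0)    # yield so the two tasks interleave
    return get_current_run()  # each task still sees its own value


async def main():
    # gather() wraps each coroutine in a task with its own copy of the context
    return await asyncio.gather(worker("run-a"), worker("run-b"))


results = asyncio.run(main())
```

Each task reads back the value it set, and nothing leaks into the calling context once the tasks finish.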

RunContext

Top-level context for a running execution. Created by the @run decorator or manually.

Fields

| Field | Type | Description |
| --- | --- | --- |
| client | MultiRunClient | The client managing this run |
| run | Run | The run model |
| is_replaying | bool | True when executing from a restored checkpoint |
| agents | list[AgentContext] | All agents in this run |

State management

ctx = get_current_run_context()

ctx.set_state("results", my_results)
ctx.set_state("iteration", 5)

results = ctx.get_state("results")
iteration = ctx.get_state("iteration", default=0)

State set via set_state() is included in checkpoints automatically.
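The relationship between keyed state and checkpoints can be pictured with a small in-memory sketch. `StateStore` and its `snapshot()` method are hypothetical and only mirror the `set_state()` / `get_state()` call shapes shown above. How MultiRun actually stores and serializes state is not described here.

```python
import copy


class StateStore:
    """Hypothetical key-value store; not MultiRun's actual implementation."""

    def __init__(self):
        self._state = {}

    def set_state(self, key, value):
        self._state[key] = value

    def get_state(self, key, default=None):
        return self._state.get(key, default)

    def snapshot(self):
        # Deep copy so that later mutations do not alter the saved checkpoint.
        return copy.deepcopy(self._state)


store = StateStore()
store.set_state("results", ["a"])
store.set_state("iteration", 5)

checkpoint = store.snapshot()            # captures everything set so far
store.get_state("results").append("b")   # later change; checkpoint is unaffected
```

In this sketch, taking a copy at checkpoint time is what lets a restore return to the exact state that existed when the checkpoint was created.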

Agent management

agent_ctx = await ctx.start_agent(
    name="researcher",
    agent_config_id="ac_123",   # optional
    metadata={"model": "gpt-4"},  # optional
)

async with agent_ctx:
    # steps inside here are scoped to this agent
    ...

When no explicit @agent is used, RunContext lazily creates a default agent.

Lifecycle

await ctx.complete(result)        # Complete all agents, mark run completed
await ctx.fail(error)             # Fail active agents, mark run failed

Async context manager

async with RunContext(client=client, run=run) as ctx:
    # ctx is set as the current run context
    ...
# context is cleared on exit
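The set-on-enter, clear-on-exit behaviour can be sketched with a context variable and its reset token. `Scope` and `_current` are hypothetical names used for illustration. This sketch assumes a token-based reset, which may differ from what RunContext does internally.

```python
import asyncio
import contextvars

# Hypothetical variable holding the active scope; None means "no active scope".
_current = contextvars.ContextVar("current_scope", default=None)


class Scope:
    """Hypothetical async context manager that publishes itself while active."""

    def __init__(self, name):
        self.name = name
        self._token = None

    async def __aenter__(self):
        self._token = _current.set(self)  # remember the previous value
        return self

    async def __aexit__(self, exc_type, exc, tb):
        _current.reset(self._token)       # restore the previous value
        return False                      # do not swallow exceptions


async def demo():
    before = _current.get()
    async with Scope("run-1") as scope:
        inside = _current.get() is scope
    after = _current.get()
    return before, inside, after


before, inside, after = asyncio.run(demo())
```

Resetting with the token, rather than setting the variable to None, also restores an outer scope correctly when scopes are nested.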

AgentContext

Tracks a running agent and its steps within a run.

Fields

| Field | Type | Description |
| --- | --- | --- |
| run_context | RunContext | Parent run context |
| agent | Agent | The agent model |
| is_default | bool | True if this is the implicit default agent |
| is_replaying | bool | Inherited from RunContext |
| steps | list[Step] | Steps executed by this agent |

Starting steps

step_ctx = await agent_ctx.start_step(
    name="call_llm",
    kind=StepKind.LLM_CALL,
    input_args=(prompt,),
    input_kwargs={},
)

Returns a StepContext. Inputs are serialized and hashed for replay detection.
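One plausible way to serialize and hash step inputs is canonical JSON followed by SHA-256, sketched below. The function `input_hash`, the JSON encoding, and the choice of SHA-256 are all assumptions made for illustration. The serialization format and hash that MultiRun uses are not specified here.

```python
import hashlib
import json


def input_hash(name, args, kwargs):
    """Hypothetical helper: stable digest of a step name plus its inputs."""
    payload = json.dumps(
        {"name": name, "args": list(args), "kwargs": kwargs},
        sort_keys=True,  # keyword order must not change the digest
        default=repr,    # fall back to repr() for values JSON cannot encode
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


first = input_hash("call_llm", ("hello",), {"temperature": 0, "top_p": 1})
same = input_hash("call_llm", ("hello",), {"top_p": 1, "temperature": 0})
other = input_hash("call_llm", ("goodbye",), {"temperature": 0, "top_p": 1})
```

Identical inputs yield identical digests, so a replayed step can be matched to its earlier execution, while any change to the arguments produces a different digest.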

Replay cache

found, result = await agent_ctx.get_step_result("call_llm", args, kwargs)
if found:
    return result  # cached from previous execution
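Returning a `(found, result)` pair lets the caller tell a cached `None` apart from a cache miss. The sketch below shows that pattern with a hypothetical in-memory `ReplayCache`. Its key scheme and method names are assumptions, not MultiRun's API.

```python
class ReplayCache:
    """Hypothetical in-memory cache keyed by step name and inputs."""

    def __init__(self):
        self._results = {}

    @staticmethod
    def _key(name, args, kwargs):
        # repr() keeps the key hashable even when inputs contain lists or dicts.
        return (name, repr(args), repr(sorted(kwargs.items())))

    def record(self, name, args, kwargs, result):
        self._results[self._key(name, args, kwargs)] = result

    def lookup(self, name, args, kwargs):
        key = self._key(name, args, kwargs)
        if key in self._results:
            return True, self._results[key]
        return False, None


cache = ReplayCache()
cache.record("call_llm", ("hello",), {}, None)  # a step that legitimately returned None

hit = cache.lookup("call_llm", ("hello",), {})
miss = cache.lookup("call_llm", ("other",), {})
```

Both lookups carry `None` as the result, and only the `found` flag distinguishes the cached step from the one that still has to run.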

State & checkpoints

agent_ctx.set_state("key", value)
value = agent_ctx.get_state("key", default=None)

ckpt = await agent_ctx.checkpoint(
    name="after_search",
    extra_state={"custom": "data"},
)

Lifecycle

await agent_ctx.complete(result)  # aggregates step metrics
await agent_ctx.fail(error)

StepContext

Context for a single step execution. Created by AgentContext.start_step().

Fields

| Field | Type | Description |
| --- | --- | --- |
| agent_context | AgentContext | Parent agent context |
| step | Step | The step model |

Completing a step

await step_ctx.complete(
    result,
    tokens_used=500,
    cost_usd=0.01,
)

Automatically computes latency_ms, serializes the result, and updates run-level totals.
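The bookkeeping described above, latency measured from step start plus totals rolled up to the run, can be sketched as follows. `Totals` and `StepTimer` are hypothetical, and the use of a monotonic clock is an assumption about how latency might be measured.

```python
import time
from dataclasses import dataclass


@dataclass
class Totals:
    """Hypothetical run-level accumulator."""
    tokens_used: int = 0
    cost_usd: float = 0.0
    steps: int = 0


class StepTimer:
    """Hypothetical step tracker that reports into shared totals."""

    def __init__(self, totals):
        self.totals = totals
        self.latency_ms = None
        self._started = time.monotonic()  # monotonic clock is immune to wall-clock jumps

    def complete(self, tokens_used=0, cost_usd=0.0):
        self.latency_ms = (time.monotonic() - self._started) * 1000.0
        self.totals.tokens_used += tokens_used
        self.totals.cost_usd += cost_usd
        self.totals.steps += 1


totals = Totals()

first_step = StepTimer(totals)
first_step.complete(tokens_used=500, cost_usd=0.01)

second_step = StepTimer(totals)
second_step.complete(tokens_used=300, cost_usd=0.02)
```

After both steps complete, `totals` holds the combined token count and cost, and each step keeps its own `latency_ms`.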

Failing a step

await step_ctx.fail(error)

Checkpoint & Replay

Creating checkpoints

from multirun.context import get_current_run_context

ctx = get_current_run_context()

ctx.set_state("results", results)
ctx.set_state("iteration", 5)

ckpt = await ctx.checkpoint(name="mid_execution")

Restoring from checkpoint

await ctx.restore_from_checkpoint(checkpoint_id)

# State is now restored
results = ctx.get_state("results")
iteration = ctx.get_state("iteration")
# ctx.is_replaying is now True

restore_from_checkpoint() raises CheckpointError if the checkpoint cannot be loaded and ReplayError if its contents cannot be deserialized.
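The two failure modes can be kept apart by wrapping the load and the decode separately, as in this sketch. The exception classes here are local stand-ins that only borrow the names above, `restore` is a hypothetical helper, and JSON is an assumed storage format.

```python
import json


class CheckpointError(Exception):
    """Local stand-in: the checkpoint could not be loaded."""


class ReplayError(Exception):
    """Local stand-in: the checkpoint loaded but could not be decoded."""


def restore(storage, checkpoint_id):
    """Hypothetical restore from a dict that maps checkpoint IDs to JSON text."""
    try:
        raw = storage[checkpoint_id]
    except KeyError as exc:
        raise CheckpointError(f"cannot load checkpoint {checkpoint_id!r}") from exc
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ReplayError(f"cannot decode checkpoint {checkpoint_id!r}") from exc


storage = {
    "ckpt_ok": '{"iteration": 5, "results": ["a"]}',
    "ckpt_bad": "{not valid json",
}

state = restore(storage, "ckpt_ok")
```

A caller can then retry or fall back to a fresh run on a load failure, while treating a decode failure as a corrupt or incompatible checkpoint.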

Note

Checkpoint endpoints are not yet available in the backend. Checkpoints work fully with multirun.init_local(). See Known Issues.
