Decorators
Reference for the @run, @step, @checkpoint, and @agent decorators.
All decorators support both bare (@decorator) and parameterized (@decorator(...)) syntax. They work with both async and sync functions.
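One common way to support both syntaxes, and both sync and async functions, is an optional first positional `func` argument plus a check with `inspect.iscoroutinefunction`. The sketch below illustrates that general pattern only. The `traced` decorator and the `calls` list are hypothetical names, and this is not MultiRun's actual implementation.

```python
import asyncio
import functools
import inspect

calls = []  # records which wrapped functions ran, for demonstration only


def traced(func=None, *, name=None):
    """Hypothetical decorator usable as @traced or @traced(name=...)."""

    def decorate(fn):
        label = name or fn.__name__

        if inspect.iscoroutinefunction(fn):
            # Async functions get an async wrapper so callers can still await.
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                calls.append(label)
                return await fn(*args, **kwargs)

            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args, **kwargs):
            calls.append(label)
            return fn(*args, **kwargs)

        return sync_wrapper

    # Bare usage: Python passes the decorated function directly as `func`.
    if func is not None:
        return decorate(func)
    # Parameterized usage: return the real decorator for Python to apply.
    return decorate


@traced
def add(a, b):
    return a + b


@traced(name="fetch")
async def fetch(x):
    return x * 2
```

Making every option keyword-only (the `*` in the signature) is what keeps the two call forms unambiguous: the only positional argument the decorator can ever receive is the function itself.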
@run
Mark a function as a MultiRun run with budget and policy enforcement.
```python
def run(
    func=None,
    *,
    name: str | None = None,
    budget: Budget | None = None,
    policy: Policy | None = None,
    checkpoint_on_error: bool = True,
    metadata: dict[str, Any] | None = None,
)
```

| Param | Default | Description |
|---|---|---|
| name | function name | Run name in the runtime |
| budget | None | Budget constraints |
| policy | None | Policy rules |
| checkpoint_on_error | True | Create a checkpoint when the run fails |
| metadata | None | Arbitrary metadata dict |

Behavior:
- Gets or creates a `MultiRunClient` from the current context
- Calls `client.start_run()` to register the run
- Sets up a `RunContext` so inner `@step`/`@checkpoint` calls are tracked
- On success: calls `RunContext.complete(result)`
- On `BudgetExceededError`: fails the run, re-raises
- On other exceptions: creates a checkpoint if `checkpoint_on_error` is set, fails the run, re-raises
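The success and failure branches in the list above can be sketched as a plain try/except. Everything here is a stand-in: `FakeRunContext`, its `checkpoint` and `fail` methods, and `execute_run` are hypothetical names used only to show the ordering, and the local `BudgetExceededError` class merely mimics the one the library raises.

```python
class BudgetExceededError(Exception):
    """Stand-in for the error named in the behavior list."""


class FakeRunContext:
    """Hypothetical stand-in that only records lifecycle events."""

    def __init__(self):
        self.events = []

    def complete(self, result):
        self.events.append(("complete", result))

    def checkpoint(self, reason):
        self.events.append(("checkpoint", reason))

    def fail(self, error):
        self.events.append(("fail", type(error).__name__))


def execute_run(ctx, func, *args, checkpoint_on_error=True):
    try:
        result = func(*args)
    except BudgetExceededError as exc:
        # The list mentions no checkpoint for budget failures: fail, re-raise.
        ctx.fail(exc)
        raise
    except Exception as exc:
        # Any other error: checkpoint first (if enabled), then fail, re-raise.
        if checkpoint_on_error:
            ctx.checkpoint("error")
        ctx.fail(exc)
        raise
    ctx.complete(result)
    return result
```

The budget branch has to come first because `BudgetExceededError` is itself an `Exception`; with the order reversed, the generic handler would catch it.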
```python
from multirun import run
from multirun.models import Budget

@run(budget=Budget(max_tokens=100_000, max_cost_usd=1.0))
async def research_agent(query: str) -> Report:
    ...

# Bare usage
@run
async def simple_agent(query: str) -> str:
    ...
```

@step
Mark a function as a transactional step with retry logic.
```python
def step(
    func=None,
    *,
    name: str | None = None,
    kind: StepKind = StepKind.FUNCTION,
    retry: int = 0,
    retry_delay: float = 1.0,
    retry_max_delay: float = 60.0,
    retry_strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL,
    retry_on: tuple[type[Exception], ...] | None = None,
    checkpoint: bool = False,
    idempotent: bool = True,
    check_budget: bool = True,
)
```

| Param | Default | Description |
|---|---|---|
| name | function name | Step name in the runtime |
| kind | FUNCTION | StepKind for tracing |
| retry | 0 | Number of retry attempts (0 = no retries) |
| retry_delay | 1.0 | Base delay between retries (seconds) |
| retry_max_delay | 60.0 | Max delay ceiling (seconds) |
| retry_strategy | EXPONENTIAL | constant, linear, or exponential |
| retry_on | None | Exception types to retry on (None = all) |
| checkpoint | False | Create checkpoint after success |
| idempotent | True | Return cached result during replay |
| check_budget | True | Check budget before execution |

Behavior:
- If outside a run context, executes the function without tracking
- During replay with `idempotent=True`, returns cached result if input hash matches
- Checks budget before execution (raises `BudgetExceededError`)
- Registers the step with the runtime
- Executes with retry loop using configured backoff
- On final failure with `retry > 0`, wraps in `StepError`
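To make the retry parameters concrete, here is a minimal sketch of a retry loop with capped backoff. The exact formulas MultiRun uses are not stated in this reference, so the ones below are assumptions (linear as `base * (attempt + 1)`, exponential as `base * 2 ** attempt`). `backoff_delay` and `call_with_retry` are hypothetical helpers, and the `StepError` wrapping is left out for brevity.

```python
import time


def backoff_delay(attempt, base=1.0, max_delay=60.0, strategy="exponential"):
    """Delay before retry number `attempt` (0-based), capped at max_delay."""
    if strategy == "constant":
        delay = base
    elif strategy == "linear":
        delay = base * (attempt + 1)  # assumed formula
    elif strategy == "exponential":
        delay = base * (2 ** attempt)  # assumed formula
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return min(delay, max_delay)


def call_with_retry(fn, *, retry=0, retry_on=None, sleep=time.sleep, **backoff):
    """Call fn(); on a matching exception, retry up to `retry` more times."""
    catch = retry_on or (Exception,)  # None means retry on any exception
    for attempt in range(retry + 1):
        try:
            return fn()
        except catch:
            if attempt == retry:
                raise  # out of attempts: let the last error propagate
            sleep(backoff_delay(attempt, **backoff))
```

Under these assumptions, `retry=3` with the defaults would wait 1, 2, and 4 seconds between the four attempts. Passing `sleep` as a parameter keeps the loop testable without real delays.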
```python
from multirun import step
from multirun.models import StepKind

@step(
    kind=StepKind.LLM_CALL,
    retry=3,
    retry_delay=1.0,
    checkpoint=True,
)
async def call_openai(prompt: str) -> str:
    ...

@step(retry=2, retry_on=(ConnectionError, TimeoutError))
async def fetch_url(url: str) -> str:
    ...
```

@checkpoint
Create a durable checkpoint after function execution.
```python
def checkpoint(
    func=None,
    *,
    name: str | None = None,
    capture_result: bool = True,
    result_key: str = "result",
    extra_state: dict[str, Any] | None = None,
)
```

| Param | Default | Description |
|---|---|---|
| name | after_{func_name} | Checkpoint name |
| capture_result | True | Include return value in checkpoint state |
| result_key | "result" | Key for the return value in state dict |
| extra_state | None | Additional state to include |

Behavior:
- Executes the function first, then creates a checkpoint
- If outside a run context, skips checkpointing silently
- If checkpointing fails, logs a warning but does not fail the function
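The interplay of `capture_result`, `result_key`, and `extra_state`, together with the "log but do not fail" rule, can be sketched as follows. `build_state`, `checkpoint_after`, and the `save` callback are hypothetical names for illustration. How the library merges `extra_state` with the result when keys collide is not documented here, so the ordering below (result written last) is an assumption.

```python
import functools
import logging

logger = logging.getLogger("checkpoint_sketch")


def build_state(result, *, capture_result=True, result_key="result", extra_state=None):
    """Assemble a checkpoint state dict from the documented parameters."""
    state = dict(extra_state or {})
    if capture_result:
        state[result_key] = result  # assumed to win over a colliding extra key
    return state


def checkpoint_after(save, *, name=None, **state_opts):
    """Hypothetical decorator: run the function, then call save(name, state)."""

    def decorate(fn):
        cp_name = name or f"after_{fn.__name__}"

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            result = fn(*args, **kwargs)  # the function always runs first
            try:
                save(cp_name, build_state(result, **state_opts))
            except Exception:
                # A failed checkpoint is logged, never raised to the caller.
                logger.warning("checkpoint %r failed", cp_name, exc_info=True)
            return result

        return wrapper

    return decorate
```

Because the checkpoint is written only after the function returns, an exception inside the function itself skips the checkpoint entirely and propagates unchanged.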
```python
from multirun import checkpoint

@checkpoint(name="search_complete")
async def search_sources(query: str) -> list[Source]:
    return await search_api.search(query)
```

@agent
Mark a function as a named agent within a run. Steps inside this function are scoped to the agent.
```python
def agent(
    func=None,
    *,
    name: str | None = None,
    agent_config_id: str | None = None,
    metadata: dict[str, Any] | None = None,
)
```

| Param | Default | Description |
|---|---|---|
| name | function name | Agent name in the runtime |
| agent_config_id | None | Reference to a workspace-level AgentConfig |
| metadata | None | Arbitrary metadata dict |

Behavior:
- Gets the current `RunContext`
- Creates an `AgentContext` via `run_ctx.start_agent()`
- Executes the function inside the agent context
- On success: marks agent completed
- On failure: marks agent failed, re-raises
```python
from multirun import agent, step

# search_step and summarize_step are assumed to be @step functions defined elsewhere.
@agent(name="researcher")
async def research(query: str) -> list[str]:
    results = await search_step(query)
    return await summarize_step(results)

@agent(name="planner", agent_config_id="ac_123")
async def planner(query: str) -> Plan:
    ...
```

When no @agent is used, a default agent is created implicitly by RunContext.
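Scoping steps to an agent, with a fallback to a default agent, is the kind of behavior that Python's `contextvars` module handles well. The sketch below shows the idea only, without claiming that MultiRun is built this way. `agent_scope`, `record_step`, and the `"default"` agent name are hypothetical.

```python
import contextvars
from contextlib import contextmanager

# Name of the agent currently executing; "default" when none was started.
_current_agent = contextvars.ContextVar("current_agent", default="default")

recorded = []  # (agent name, step name) pairs, for demonstration only


@contextmanager
def agent_scope(name):
    """Hypothetical scope: steps recorded inside are attributed to `name`."""
    token = _current_agent.set(name)
    try:
        yield
    finally:
        # Restore the previous agent even if the body raised.
        _current_agent.reset(token)


def record_step(step_name):
    """Attribute a step to whichever agent is active in this context."""
    recorded.append((_current_agent.get(), step_name))
```

Calling `record_step("load")` outside any `agent_scope` block attributes the step to `"default"`, while calls inside `with agent_scope("researcher"):` are attributed to `"researcher"`. Context variables are also tracked per asyncio task, which is why they suit decorators that must work with async functions.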