MultiRun

Decorators

Reference for the @run, @step, @checkpoint, and @agent decorators.

All decorators support both bare (@decorator) and parameterized (@decorator(...)) syntax. They work with both async and sync functions.
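
One common way to support both call styles and both function kinds is sketched below. This is a generic Python pattern using only the standard library, not MultiRun's actual implementation; the `traced` decorator and the `calls` list are hypothetical names used for illustration.

```python
import asyncio
import functools
import inspect

calls = []  # records which wrapped functions ran (illustration only)

def traced(func=None, *, name=None):
    """Hypothetical decorator usable as @traced or @traced(name=...)."""

    def decorate(fn):
        label = name or fn.__name__  # default to the function name

        if inspect.iscoroutinefunction(fn):
            # Async functions get an async wrapper so callers can still await them.
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                calls.append(label)
                return await fn(*args, **kwargs)
            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args, **kwargs):
            calls.append(label)
            return fn(*args, **kwargs)
        return sync_wrapper

    # Bare usage: Python passes the decorated function as `func`.
    if func is not None:
        return decorate(func)
    # Parameterized usage: return the real decorator for Python to apply.
    return decorate

@traced
def add(a, b):
    return a + b

@traced(name="fetch")
async def fetch(x):
    return x * 2
```

The `func=None, *` signature shape matches the signatures documented on this page: the keyword-only marker keeps configuration arguments from being mistaken for the decorated function.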

@run

Mark a function as a MultiRun run with budget and policy enforcement.

def run(
    func=None,
    *,
    name: str | None = None,
    budget: Budget | None = None,
    policy: Policy | None = None,
    checkpoint_on_error: bool = True,
    metadata: dict[str, Any] | None = None,
)

| Param | Default | Description |
|---|---|---|
| name | function name | Run name in the runtime |
| budget | None | Budget constraints |
| policy | None | Policy rules |
| checkpoint_on_error | True | Create a checkpoint when the run fails |
| metadata | None | Arbitrary metadata dict |

Behavior:

  1. Gets or creates a MultiRunClient from the current context
  2. Calls client.start_run() to register the run
  3. Sets up a RunContext so inner @step / @checkpoint calls are tracked
  4. On success: calls RunContext.complete(result)
  5. On BudgetExceededError: marks the run as failed and re-raises
  6. On any other exception: creates a checkpoint if checkpoint_on_error is True, marks the run as failed, and re-raises

from multirun import run
from multirun.models import Budget

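# Parameterized usage with a token and cost budget.
# Report is an application-defined return type (not shown).
@run(budget=Budget(max_tokens=100_000, max_cost_usd=1.0))
async def research_agent(query: str) -> Report: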
    ...

# Bare usage
@run
async def simple_agent(query: str) -> str:
    ...
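
The success and failure paths in the behavior list can be sketched with stand-in objects. Everything here is hypothetical except the names taken from the list (`BudgetExceededError`, `complete`, `checkpoint_on_error`): `FakeRunContext`, its `fail` and `checkpoint` methods, and `execute_run` are assumptions made so the control flow can run without the library.

```python
import asyncio

class BudgetExceededError(Exception):
    """Stand-in for the error named in the behavior list."""

class FakeRunContext:
    """Hypothetical stand-in that records what a run context would be told."""
    def __init__(self):
        self.events = []
    def complete(self, result):
        self.events.append(("complete", result))
    def checkpoint(self, label):  # assumed method name
        self.events.append(("checkpoint", label))
    def fail(self, exc):  # assumed method name
        self.events.append(("fail", type(exc).__name__))

async def execute_run(func, ctx, *args, checkpoint_on_error=True):
    try:
        result = await func(*args)
    except BudgetExceededError as exc:
        ctx.fail(exc)  # budget errors: fail the run, no checkpoint
        raise
    except Exception as exc:
        if checkpoint_on_error:
            ctx.checkpoint("on_error")  # checkpoint first, then fail
        ctx.fail(exc)
        raise
    ctx.complete(result)
    return result

async def ok():
    return "done"

async def over_budget():
    raise BudgetExceededError("too many tokens")

async def broken():
    raise ValueError("boom")

def outcome(func, **kwargs):
    """Run func under a fresh fake context and return the recorded events."""
    ctx = FakeRunContext()
    try:
        asyncio.run(execute_run(func, ctx, **kwargs))
    except Exception:
        pass  # the sketch re-raises; swallow here to inspect the events
    return ctx.events
```

Calling `outcome(broken)` records a checkpoint followed by a failure, while `outcome(over_budget)` records only the failure, which mirrors the distinction between items 5 and 6.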

@step

Mark a function as a transactional step with retry logic.

def step(
    func=None,
    *,
    name: str | None = None,
    kind: StepKind = StepKind.FUNCTION,
    retry: int = 0,
    retry_delay: float = 1.0,
    retry_max_delay: float = 60.0,
    retry_strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL,
    retry_on: tuple[type[Exception], ...] | None = None,
    checkpoint: bool = False,
    idempotent: bool = True,
    check_budget: bool = True,
)

| Param | Default | Description |
|---|---|---|
| name | function name | Step name in the runtime |
| kind | FUNCTION | StepKind for tracing |
| retry | 0 | Number of retry attempts (0 = no retries) |
| retry_delay | 1.0 | Base delay between retries (seconds) |
| retry_max_delay | 60.0 | Max delay ceiling |
| retry_strategy | EXPONENTIAL | constant, linear, or exponential |
| retry_on | None | Exception types to retry on (None = all) |
| checkpoint | False | Create checkpoint after success |
| idempotent | True | Return cached result during replay |
| check_budget | True | Check budget before execution |

Behavior:

  1. If outside a run context, executes the function without tracking
  2. During replay with idempotent=True, returns the cached result if the input hash matches
  3. If check_budget is True, checks the budget before execution, raising BudgetExceededError if the check fails
  4. Registers the step with the runtime
  5. Executes the function in a retry loop using the configured backoff
  6. If the final attempt fails and retry > 0, wraps the exception in StepError

from multirun import step
from multirun.models import StepKind

@step(
    kind=StepKind.LLM_CALL,
    retry=3,
    retry_delay=1.0,
    checkpoint=True,
)
async def call_openai(prompt: str) -> str:
    ...

@step(retry=2, retry_on=(ConnectionError, TimeoutError))
async def fetch_url(url: str) -> str:
    ...
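
How the retry parameters might interact can be sketched as follows. The delay formulas are assumptions (the page names the three strategies but does not give their formulas), and `BackoffStrategy`, `StepError`, `backoff_delay`, `call_with_retry`, `make_flaky`, and `no_sleep` are stand-ins defined here, not imports from the library.

```python
import asyncio
from enum import Enum

class BackoffStrategy(Enum):  # stand-in for the library's enum
    CONSTANT = "constant"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"

class StepError(Exception):
    """Stand-in for the wrapper error named in the behavior list."""

def backoff_delay(attempt, base=1.0, max_delay=60.0,
                  strategy=BackoffStrategy.EXPONENTIAL):
    """Delay before retry number `attempt` (1-based). Formulas are assumed."""
    if strategy is BackoffStrategy.CONSTANT:
        delay = base
    elif strategy is BackoffStrategy.LINEAR:
        delay = base * attempt
    else:
        delay = base * 2 ** (attempt - 1)
    return min(delay, max_delay)  # retry_max_delay acts as a ceiling

async def call_with_retry(func, *, retry=0, retry_on=None,
                          sleep=asyncio.sleep, **backoff):
    retry_on = retry_on or (Exception,)  # None means retry on everything
    for attempt in range(retry + 1):
        try:
            return await func()
        except retry_on as exc:
            if attempt == retry:  # no attempts left
                if retry > 0:
                    raise StepError(f"failed after {retry} retries") from exc
                raise
            await sleep(backoff_delay(attempt + 1, **backoff))

def make_flaky(failures):
    """Build a coroutine function that fails `failures` times, then succeeds."""
    state = {"calls": 0}
    async def flaky():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("transient")
        return state["calls"]
    return flaky

async def no_sleep(_seconds):
    """Replacement for asyncio.sleep so the demo does not wait."""
```

Under these assumed formulas, the defaults give delays of 1, 2, 4, 8 seconds and so on, capped at 60. Exceptions not listed in `retry_on` propagate immediately without a retry.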

@checkpoint

Create a durable checkpoint after function execution.

def checkpoint(
    func=None,
    *,
    name: str | None = None,
    capture_result: bool = True,
    result_key: str = "result",
    extra_state: dict[str, Any] | None = None,
)

| Param | Default | Description |
|---|---|---|
| name | after_{func_name} | Checkpoint name |
| capture_result | True | Include return value in checkpoint state |
| result_key | "result" | Key for the return value in state dict |
| extra_state | None | Additional state to include |

Behavior:

  • Executes the function first, then creates a checkpoint
  • If outside a run context, skips checkpointing silently
  • If checkpointing fails, logs a warning but does not fail the function

from multirun import checkpoint

# Source and search_api are application-defined (not shown).
@checkpoint(name="search_complete")
async def search_sources(query: str) -> list[Source]:
    return await search_api.search(query)
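
The run-then-checkpoint ordering and the failure tolerance described above can be sketched with a stand-in decorator. `checkpoint_sketch` and its `save` callback are hypothetical, and the way `extra_state` and the captured result are merged into one dict is an assumption; only the parameter names and defaults come from the table.

```python
import asyncio
import functools
import logging

logger = logging.getLogger("checkpoint_sketch")

def checkpoint_sketch(save, *, name=None, capture_result=True,
                      result_key="result", extra_state=None):
    """Hypothetical stand-in; `save(label, state)` plays the durable store."""
    def decorate(fn):
        label = name or f"after_{fn.__name__}"  # documented default name

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            result = await fn(*args, **kwargs)  # the function runs first
            state = dict(extra_state or {})
            if capture_result:
                state[result_key] = result  # assumed merge order
            try:
                save(label, state)
            except Exception:
                # A failed checkpoint is logged but never fails the function.
                logger.warning("checkpoint %r failed", label)
            return result
        return wrapper
    return decorate

saved = {}

@checkpoint_sketch(saved.__setitem__, extra_state={"stage": "search"})
async def search_sources(query):
    return [f"{query}-1", f"{query}-2"]

def broken_store(label, state):
    raise OSError("disk full")

@checkpoint_sketch(broken_store, name="search_complete")
async def search_anyway(query):
    return [query]
```

Here `search_anyway` still returns its result even though the store raises, matching the third bullet.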

@agent

Mark a function as a named agent within a run. Steps inside this function are scoped to the agent.

def agent(
    func=None,
    *,
    name: str | None = None,
    agent_config_id: str | None = None,
    metadata: dict[str, Any] | None = None,
)

| Param | Default | Description |
|---|---|---|
| name | function name | Agent name in the runtime |
| agent_config_id | None | Reference to a workspace-level AgentConfig |
| metadata | None | Arbitrary metadata dict |

Behavior:

  1. Gets the current RunContext
  2. Creates an AgentContext via run_ctx.start_agent()
  3. Executes the function inside the agent context
  4. On success: marks the agent as completed
  5. On failure: marks the agent as failed and re-raises

from multirun import agent, step

# search_step and summarize_step are assumed to be @step functions defined elsewhere.
@agent(name="researcher")
async def research(query: str) -> list[str]:
    results = await search_step(query)
    return await summarize_step(results)

@agent(name="planner", agent_config_id="ac_123")
async def planner(query: str) -> Plan:
    ...

When no @agent is used, a default agent is created implicitly by RunContext.
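
One way such scoping could work is with a context variable that holds the active agent name and falls back to a default. This is a minimal sketch under that assumption, not the library's mechanism; `agent_sketch`, `record_step`, `step_log`, and the literal name "default" are all hypothetical.

```python
import asyncio
import contextvars
import functools

# Holds the active agent name; "default" stands in for the implicit agent.
_current_agent = contextvars.ContextVar("current_agent", default="default")
step_log = []  # (agent name, step name) pairs, for illustration

def agent_sketch(name):
    def decorate(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            token = _current_agent.set(name)  # enter the agent scope
            try:
                return await fn(*args, **kwargs)
            finally:
                _current_agent.reset(token)  # restore the previous scope
        return wrapper
    return decorate

async def record_step(step_name):
    """Attribute a step to whichever agent is active right now."""
    step_log.append((_current_agent.get(), step_name))

@agent_sketch("researcher")
async def research():
    await record_step("search")
    await record_step("summarize")

async def main():
    await record_step("setup")  # no agent active: falls to the default
    await research()
    await record_step("teardown")
```

Running `asyncio.run(main())` attributes the two inner steps to "researcher" and the surrounding steps to the default agent.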
