Client

MultiRunClient API for runs, steps, and lifecycle management.

MultiRunClient is the main interface for interacting with the MultiRun runtime. It manages runs, steps, and their lifecycle.

Creating a client

multirun.init()

def init(
    endpoint: str = "http://localhost:8989",
    **kwargs,
) -> MultiRunClient

Initialize the global default client. Keyword arguments are forwarded to MultiRunConfig.

import multirun
multirun.init("http://localhost:8989", api_key="your-key")

multirun.init_local()

def init_local() -> MultiRunClient

Initialize a local client with in-memory transport. No server required.

client = multirun.init_local()

multirun.get_client()

def get_client() -> MultiRunClient

Get the default client. Raises RuntimeError if init() has not been called.

MultiRunClient.connect()

@classmethod
def connect(
    cls,
    endpoint: str = "http://localhost:8989",
    *,
    api_key: str | None = None,
    **kwargs,
) -> MultiRunClient

Create a client connected to a runtime. Raises ValueError if api_key is not provided for non-localhost endpoints.
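The API-key rule above can be illustrated with a small standalone check. This is only a sketch of how "non-localhost" might be decided. The helper names is_local_endpoint and require_api_key, and the set of hostnames treated as local, are assumptions made for illustration and are not taken from MultiRun itself.

```python
from typing import Optional
from urllib.parse import urlparse

# Hostnames treated as local in this sketch (an assumption, not MultiRun's list).
LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1"}


def is_local_endpoint(endpoint: str) -> bool:
    """Return True if the endpoint's host looks like the local machine."""
    host = urlparse(endpoint).hostname
    return host in LOCAL_HOSTS


def require_api_key(endpoint: str, api_key: Optional[str]) -> None:
    """Raise ValueError when a remote endpoint is used without an API key."""
    if not is_local_endpoint(endpoint) and not api_key:
        raise ValueError(
            f"api_key is required for non-local endpoint {endpoint!r}"
        )
```

Under these assumptions, require_api_key("http://localhost:8989", None) passes silently, while the same call with a remote host raises ValueError.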

MultiRunClient.local()

@classmethod
def local(cls) -> MultiRunClient

Create a client with in-memory LocalTransport.

Using the client

The client is an async context manager. Always use async with:

async with client:
    run = await client.start_run("my_agent")
    # ...

Run operations

start_run()

async def start_run(
    self,
    name: str,
    *,
    budget: Budget | None = None,
    policy: Policy | None = None,
    parent_run_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Run

Start a new run. Returns the created Run with a server-assigned ID.

Param            Description
name             Name of the run (e.g. "research_agent")
budget           Optional Budget constraints
policy           Optional Policy rules
parent_run_id    Parent run ID for sub-runs
metadata         Arbitrary metadata dict

get_run()

async def get_run(self, run_id: str) -> Run

Get a run by ID. Raises TransportError if not found.

patch_run()

async def patch_run(
    self,
    run_id: str,
    *,
    status: RunStatus | None = None,
    result: bytes | None = None,
    error: str | None = None,
    tokens_used: int | None = None,
    cost_usd: float | None = None,
    latest_checkpoint_id: str | None = None,
) -> Run

Low-level partial update. Prefer the convenience methods below for common operations.
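Because every field of patch_run() defaults to None, a partial update presumably touches only the fields the caller sets. The sketch below shows that idea in isolation. The helper build_patch is hypothetical, and the assumption that None means "leave unchanged" is inferred from the signature rather than confirmed by the source.

```python
from typing import Any, Dict


def build_patch(**fields: Any) -> Dict[str, Any]:
    """Keep only the fields the caller actually set (drop None values)."""
    return {key: value for key, value in fields.items() if value is not None}


# Only tokens_used and cost_usd would be part of this update.
patch = build_patch(status=None, tokens_used=1200, cost_usd=0.03, error=None)
```

Note that falsy values such as 0 or an empty string are kept. Only None is treated as "not provided" in this sketch.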

complete_run()

async def complete_run(
    self,
    run: Run,
    result: bytes | None = None,
    *,
    tokens_used: int | None = None,
    cost_usd: float | None = None,
) -> None

Mark a run as completed. Mutates the run object in place and persists to the runtime.

fail_run()

async def fail_run(self, run: Run, error: Exception) -> None

Mark a run as failed. Records str(error) on the run.
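A common way to combine start_run(), complete_run(), and fail_run() is a wrapper that records the outcome whichever way the work ends. The following is a minimal sketch, not part of the MultiRun API. The helper run_with_lifecycle and the work callback are hypothetical names, and the client is typed loosely so that any object exposing the methods documented above will do.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_lifecycle(
    client: Any,
    name: str,
    work: Callable[[Any], Awaitable[bytes]],
) -> Any:
    """Start a run, execute `work`, and record success or failure."""
    async with client:
        run = await client.start_run(name)
        try:
            result = await work(run)
        except Exception as exc:
            # Record the failure on the run, then let the caller see the error.
            await client.fail_run(run, exc)
            raise
        await client.complete_run(run, result)
        return run


# Usage (hypothetical): asyncio.run(run_with_lifecycle(client, "my_agent", work))
```

The exception is re-raised after fail_run() so that the caller still sees the original error. Whether to swallow or propagate it is a design choice for the calling code.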

pause_run()

async def pause_run(
    self,
    run: Run,
    *,
    checkpoint_id: str | None = None,
) -> Run

Pause a run with an optional checkpoint reference. A paused run can be resumed via replay.

cancel_run()

async def cancel_run(self, run: Run) -> None

Cancel a run. Sets status to CANCELLED.

Step operations

start_step()

async def start_step(
    self,
    run: Run,
    name: str,
    *,
    kind: StepKind = StepKind.FUNCTION,
    input_hash: str = "",
    input_data: bytes | None = None,
) -> Step

Start a new step within a run. Returns the registered Step.

Param         Description
name          Step name
kind          StepKind: function, llm_call, tool_call, sub_agent, decision, checkpoint
input_hash    SHA-256 hash of input for replay detection
input_data    Serialized input bytes
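The input_hash and input_data parameters can be derived from the same payload. The sketch below shows one way to do that with the standard library. The helper names, the choice of JSON with sorted keys as the serialization, and the hex encoding of the digest are all assumptions, since the source does not specify the format MultiRun expects.

```python
import hashlib
import json
from typing import Any


def serialize_input(payload: Any) -> bytes:
    """Serialize a JSON-compatible payload deterministically (sorted keys)."""
    text = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return text.encode("utf-8")


def hash_input(data: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of the serialized input."""
    return hashlib.sha256(data).hexdigest()


input_data = serialize_input({"query": "weather in Paris", "top_k": 3})
input_hash = hash_input(input_data)
```

Sorting the keys matters for replay detection: two logically identical inputs should produce the same bytes, and therefore the same hash, regardless of dict ordering.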

complete_step()

async def complete_step(
    self,
    step: Step,
    *,
    output_data: bytes | None = None,
    tokens_used: int = 0,
    cost_usd: float = 0.0,
    latency_ms: int = 0,
) -> None

Mark a step as completed with output and metrics.

fail_step()

async def fail_step(self, step: Step, error: Exception) -> None

Mark a step as failed.
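The step methods above fit together in a small wrapper that measures latency and reports the outcome. This is a sketch under stated assumptions: run_timed_step and fn are hypothetical names, and the client is any object exposing start_step(), complete_step(), and fail_step() with the signatures documented above.

```python
import time
from typing import Any, Awaitable, Callable


async def run_timed_step(
    client: Any,
    run: Any,
    name: str,
    fn: Callable[[], Awaitable[bytes]],
    **step_kwargs: Any,
) -> bytes:
    """Run `fn` inside a step, recording latency on success or the error on failure."""
    step = await client.start_step(run, name, **step_kwargs)
    started = time.perf_counter()
    try:
        output = await fn()
    except Exception as exc:
        await client.fail_step(step, exc)
        raise
    # Convert elapsed seconds to whole milliseconds for latency_ms.
    latency_ms = int((time.perf_counter() - started) * 1000)
    await client.complete_step(step, output_data=output, latency_ms=latency_ms)
    return output
```

Extra keyword arguments such as kind, input_hash, or input_data are passed through to start_step() unchanged.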

Known issue

complete_step() and fail_step() send PATCH /api/v1/steps/{step_id}, but the backend expects PATCH /api/v1/runs/{run_id}/steps/{step_id}. See Known Issues.
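To make the mismatch concrete, the two path shapes can be written as small helpers. The function names are hypothetical and the code only builds strings. It does not patch the client or reproduce its internals.

```python
from urllib.parse import quote


def flat_step_path(step_id: str) -> str:
    """Path shape the client uses, per the known issue above."""
    return f"/api/v1/steps/{quote(step_id, safe='')}"


def nested_step_path(run_id: str, step_id: str) -> str:
    """Path shape the backend expects, per the known issue above."""
    return f"/api/v1/runs/{quote(run_id, safe='')}/steps/{quote(step_id, safe='')}"
```

The nested form needs the run ID in addition to the step ID, so any workaround has to have the owning run's ID available when a step is completed or failed.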
