Client
MultiRunClient API for runs, steps, and lifecycle management.
Client
MultiRunClient is the main interface for interacting with the MultiRun runtime. It manages runs, steps, and their lifecycle.
Creating a client
multirun.init()
def init(
endpoint: str = "http://localhost:8989",
**kwargs,
) -> MultiRunClientInitialize the global default client. Keyword arguments are forwarded to MultiRunConfig.
import multirun
multirun.init("http://localhost:8989", api_key="your-key")multirun.init_local()
def init_local() -> MultiRunClientInitialize a local client with in-memory transport. No server required.
client = multirun.init_local()multirun.get_client()
def get_client() -> MultiRunClientGet the default client. Raises RuntimeError if init() has not been called.
MultiRunClient.connect()
@classmethod
def connect(
cls,
endpoint: str = "http://localhost:8989",
*,
api_key: str | None = None,
**kwargs,
) -> MultiRunClientCreate a client connected to a runtime. Raises ValueError if api_key is not provided for non-localhost endpoints.
MultiRunClient.local()
@classmethod
def local(cls) -> MultiRunClientCreate a client with in-memory LocalTransport.
Using the client
The client is an async context manager. Always use async with:
async with client:
run = await client.start_run("my_agent")
# ...Run operations
start_run()
async def start_run(
self,
name: str,
*,
budget: Budget | None = None,
policy: Policy | None = None,
parent_run_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> RunStart a new run. Returns the created Run with a server-assigned ID.
| Param | Description |
|---|---|
name | Name of the run (e.g. "research_agent") |
budget | Optional Budget constraints |
policy | Optional Policy rules |
parent_run_id | Parent run ID for sub-runs |
metadata | Arbitrary metadata dict |
get_run()
async def get_run(self, run_id: str) -> RunGet a run by ID. Raises TransportError if not found.
patch_run()
async def patch_run(
self,
run_id: str,
*,
status: RunStatus | None = None,
result: bytes | None = None,
error: str | None = None,
tokens_used: int | None = None,
cost_usd: float | None = None,
latest_checkpoint_id: str | None = None,
) -> RunLow-level partial update. Prefer the convenience methods below for common operations.
complete_run()
async def complete_run(
self,
run: Run,
result: bytes | None = None,
*,
tokens_used: int | None = None,
cost_usd: float | None = None,
) -> NoneMark a run as completed. Mutates the run object in place and persists to the runtime.
fail_run()
async def fail_run(self, run: Run, error: Exception) -> NoneMark a run as failed. Records str(error) on the run.
pause_run()
async def pause_run(
self,
run: Run,
*,
checkpoint_id: str | None = None,
) -> RunPause a run with an optional checkpoint reference. A paused run can be resumed via replay.
cancel_run()
async def cancel_run(self, run: Run) -> NoneCancel a run. Sets status to CANCELLED.
Step operations
start_step()
async def start_step(
self,
run: Run,
name: str,
*,
kind: StepKind = StepKind.FUNCTION,
input_hash: str = "",
input_data: bytes | None = None,
) -> StepStart a new step within a run. Returns the registered Step.
| Param | Description |
|---|---|
name | Step name |
kind | StepKind — function, llm_call, tool_call, sub_agent, decision, checkpoint |
input_hash | SHA-256 hash of input for replay detection |
input_data | Serialized input bytes |
complete_step()
async def complete_step(
self,
step: Step,
*,
output_data: bytes | None = None,
tokens_used: int = 0,
cost_usd: float = 0.0,
latency_ms: int = 0,
) -> NoneMark a step as completed with output and metrics.
fail_step()
async def fail_step(self, step: Step, error: Exception) -> NoneMark a step as failed.
Known issue
complete_step and fail_step use PATCH /api/v1/steps/{step_id} but the backend expects PATCH /api/v1/runs/{run_id}/steps/{step_id}. See Known Issues.