MultiRun

LangGraph Adapter

Durable graph execution with step tracking and checkpoints.

Integrates LangGraph with MultiRun for durable graph execution with step tracking, checkpoints, and replay.

pip install multirun[langgraph]
from multirun.adapters.langgraph import MultiRunGraph, MultiRunCheckpointer, wrap_langgraph_node

MultiRunGraph

High-level wrapper that creates a tracked run for each graph invocation.

class MultiRunGraph:
    def __init__(
        self,
        client: MultiRunClient,
        graph: StateGraph,
        *,
        auto_wrap_nodes: bool = True,
        checkpointer: MultiRunCheckpointer | None = None,
    )
Param           | Default      | Description
client          | required     | MultiRunClient instance
graph           | required     | LangGraph StateGraph
auto_wrap_nodes | True         | Wrap all nodes with step tracking
checkpointer    | auto-created | Custom MultiRunCheckpointer

ainvoke()

async def ainvoke(
    self,
    input: Any,
    *,
    run_name: str | None = None,
    budget: Budget | None = None,
    policy: Policy | None = None,
    config: dict[str, Any] | None = None,
    thread_id: str | None = None,
) -> Any

Invoke the graph with full tracking:

  1. Creates a new run with budget/policy
  2. Sets up RunContext for step tracking
  3. Invokes LangGraph with checkpointing
  4. Marks run as completed or failed
from langgraph.graph import StateGraph, END
from multirun import MultiRunClient, Budget
from multirun.adapters.langgraph import MultiRunGraph

client = MultiRunClient.connect("http://localhost:8989")
graph = StateGraph(dict)
graph.add_node("search", search_node)
graph.add_node("summarize", summarize_node)
graph.add_edge("search", "summarize")
graph.add_edge("summarize", END)
graph.set_entry_point("search")

mr_graph = MultiRunGraph(client, graph)

result = await mr_graph.ainvoke(
    {"query": "quantum computing"},
    budget=Budget(max_tokens=50000),
)

replay()

async def replay(
    self,
    run_id: str,
    *,
    from_checkpoint: str | None = None,
    input_override: Any | None = None,
    config: dict[str, Any] | None = None,
) -> Any

Replay a previous run from checkpoint:

  1. Loads the original run and checkpoint
  2. Creates a new run in replay mode (parent = original)
  3. Restores state and re-executes
result = await mr_graph.replay("run_abc123")

result = await mr_graph.replay(
    "run_abc123",
    from_checkpoint="ckpt_xyz",
    input_override={"query": "modified query"},
)

invoke()

Sync version of ainvoke. Not recommended for production.

MultiRunCheckpointer

Drop-in LangGraph checkpointer backed by MultiRun storage.

class MultiRunCheckpointer:
    def __init__(
        self,
        client: MultiRunClient,
        *,
        auto_track_steps: bool = True,
        serializer_name: str | None = None,
    )
Param            | Default       | Description
client           | required      | MultiRunClient instance
auto_track_steps | True          | Track each node as a step
serializer_name  | client config | Serializer override

Usage with graph.compile()

from multirun.adapters.langgraph import MultiRunCheckpointer

checkpointer = MultiRunCheckpointer(client)
app = graph.compile(checkpointer=checkpointer)

result = await app.ainvoke(
    {"query": "hello"},
    {"configurable": {"thread_id": "my-thread"}}
)

Methods

  • aget(config) -> dict | None — load checkpoint for a thread (async)
  • aput(config, checkpoint, metadata) -> dict — save checkpoint (async)
  • get(config) / put(config, checkpoint, metadata) — sync versions (not recommended)

wrap_langgraph_node

Decorator to add step tracking to individual LangGraph nodes.

def wrap_langgraph_node(
    node_fn=None,
    *,
    step_name: str | None = None,
    step_kind: StepKind = StepKind.LLM_CALL,
    checkpoint_after: bool = True,
)
from multirun.adapters.langgraph import wrap_langgraph_node

@wrap_langgraph_node(checkpoint_after=True)
async def search_node(state: dict) -> dict:
    results = await search_api(state["query"])
    return {"sources": results}

# Or wrap inline
graph.add_node("search", wrap_langgraph_node(search_func))

If no active RunContext exists, the function runs without tracking.

On this page