# LangGraph Adapter

Durable graph execution with step tracking and checkpoints.

Integrates LangGraph with MultiRun for durable graph execution with step tracking, checkpoints, and replay.

Installation:

    pip install multirun[langgraph]

Imports:

    from multirun.adapters.langgraph import MultiRunGraph, MultiRunCheckpointer, wrap_langgraph_node

## MultiRunGraph

High-level wrapper that creates a tracked run for each graph invocation.
class MultiRunGraph:
def __init__(
self,
client: MultiRunClient,
graph: StateGraph,
*,
auto_wrap_nodes: bool = True,
checkpointer: MultiRunCheckpointer | None = None,
)

| Param | Default | Description |
|---|---|---|
| client | — | MultiRunClient instance |
| graph | — | LangGraph StateGraph |
| auto_wrap_nodes | True | Wrap all nodes with step tracking |
| checkpointer | auto-created | Custom MultiRunCheckpointer |
### ainvoke()
async def ainvoke(
self,
input: Any,
*,
run_name: str | None = None,
budget: Budget | None = None,
policy: Policy | None = None,
config: dict[str, Any] | None = None,
thread_id: str | None = None,
) -> Any

Invoke the graph with full tracking:
- Creates a new run with budget/policy
- Sets up RunContext for step tracking
- Invokes LangGraph with checkpointing
- Marks run as completed or failed
from langgraph.graph import StateGraph, END
from multirun import MultiRunClient, Budget
from multirun.adapters.langgraph import MultiRunGraph
client = MultiRunClient.connect("http://localhost:8989")
graph = StateGraph(dict)
graph.add_node("search", search_node)
graph.add_node("summarize", summarize_node)
graph.add_edge("search", "summarize")
graph.add_edge("summarize", END)
graph.set_entry_point("search")
mr_graph = MultiRunGraph(client, graph)
result = await mr_graph.ainvoke(
{"query": "quantum computing"},
budget=Budget(max_tokens=50000),
)

### replay()
async def replay(
self,
run_id: str,
*,
from_checkpoint: str | None = None,
input_override: Any | None = None,
config: dict[str, Any] | None = None,
) -> Any

Replay a previous run from checkpoint:
- Loads the original run and checkpoint
- Creates a new run in replay mode (parent = original)
- Restores state and re-executes
result = await mr_graph.replay("run_abc123")
result = await mr_graph.replay(
"run_abc123",
from_checkpoint="ckpt_xyz",
input_override={"query": "modified query"},
)

### invoke()

Sync version of `ainvoke()`. Not recommended for production.
## MultiRunCheckpointer
Drop-in LangGraph checkpointer backed by MultiRun storage.
class MultiRunCheckpointer:
def __init__(
self,
client: MultiRunClient,
*,
auto_track_steps: bool = True,
serializer_name: str | None = None,
)

| Param | Default | Description |
|---|---|---|
| client | — | MultiRunClient instance |
| auto_track_steps | True | Track each node as a step |
| serializer_name | client config | Serializer override |
### Usage with `graph.compile()`
from multirun.adapters.langgraph import MultiRunCheckpointer
checkpointer = MultiRunCheckpointer(client)
app = graph.compile(checkpointer=checkpointer)
result = await app.ainvoke(
{"query": "hello"},
{"configurable": {"thread_id": "my-thread"}}
)

### Methods

- `aget(config) -> dict | None` — load checkpoint for a thread (async)
- `aput(config, checkpoint, metadata) -> dict` — save checkpoint (async)
- `get(config)` / `put(config, checkpoint, metadata)` — sync versions (not recommended)
## wrap_langgraph_node
Decorator to add step tracking to individual LangGraph nodes.
def wrap_langgraph_node(
node_fn=None,
*,
step_name: str | None = None,
step_kind: StepKind = StepKind.LLM_CALL,
checkpoint_after: bool = True,
)

Example:

from multirun.adapters.langgraph import wrap_langgraph_node
@wrap_langgraph_node(checkpoint_after=True)
async def search_node(state: dict) -> dict:
results = await search_api(state["query"])
return {"sources": results}
# Or wrap inline
graph.add_node("search", wrap_langgraph_node(search_func))

If no active RunContext exists, the function runs without tracking.