Part 11. Deployment, recursion limits, error handling, debugging strategies, and what changes when you move to prod.
Compilation is not free
Build the graph once at module scope, not per request:
# Right: compile once at module level
agent = builder.compile(checkpointer=PostgresSaver(conn))

async def handle_message(request: Request) -> Response:
    result = await agent.ainvoke(
        {"messages": [HumanMessage(content=request.body)]},
        config={"configurable": {"thread_id": request.session_id}},
    )
    return result

# Wrong: compiles on every request
async def handle_message(request: Request) -> Response:
    agent = builder.compile()  # compiles every time!
    result = await agent.ainvoke(...)

compile() creates the internal execution engine. Doing it per
request adds latency and memory pressure.
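If compiling at import time is awkward (for example, the database connection is not ready yet), one variation is to compile lazily and cache the result. The sketch below is only an illustration of that idea: FakeBuilder, get_agent, and handle_message are hypothetical stand-ins, not LangGraph APIs, and the counter exists purely to show that compile() runs once.

```python
from functools import lru_cache


class FakeBuilder:
    """Hypothetical stand-in for a graph builder; counts compile() calls."""

    def __init__(self) -> None:
        self.compile_calls = 0

    def compile(self) -> str:
        self.compile_calls += 1
        return f"compiled-graph-{self.compile_calls}"


builder = FakeBuilder()


@lru_cache(maxsize=1)
def get_agent() -> str:
    # The first call compiles; every later call returns the cached object.
    return builder.compile()


def handle_message(text: str) -> str:
    agent = get_agent()  # cheap after the first request
    return f"{agent} handled {text!r}"
```

Every request goes through get_agent(), so the compile cost is paid on the first request only.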
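recursion_limit: preventing infinite loops
The default recursion limit is 25. A run that needs more than 25
steps hits it and raises GraphRecursionError. The limit is a runtime
config value, not a compile() argument:
# Raise the limit for complex agents
graph = builder.compile()
result = graph.invoke(input_state, config={"recursion_limit": 100})

Each step of the graph (one node execution in a sequential loop like
this one) counts toward the limit. A graph with
call_model → run_tools → call_model → run_tools → ... uses
two steps per loop iteration, so the default of 25 allows roughly 12
tool rounds. With the limit at 100, the run stops after 100 steps.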
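Because each node execution counts as a step, one call_model → run_tools round costs two steps, plus one final call_model that answers without requesting tools. The helpers below are a rough budgeting sketch under that assumption; the function names are made up for illustration, and the exact boundary behaviour at the limit may differ between LangGraph versions.

```python
def steps_for_tool_rounds(tool_rounds: int, nodes_per_round: int = 2) -> int:
    """Estimate the steps used by a call_model -> run_tools loop.

    Each round runs call_model and run_tools (two steps); one final
    call_model produces the answer without requesting tools.
    """
    if tool_rounds < 0:
        raise ValueError("tool_rounds must be non-negative")
    return tool_rounds * nodes_per_round + 1


def max_tool_rounds(recursion_limit: int, nodes_per_round: int = 2) -> int:
    """Largest number of tool rounds whose estimate fits within the limit."""
    return max((recursion_limit - 1) // nodes_per_round, 0)
```

Under this estimate, the default limit of 25 covers about 12 tool rounds and a limit of 100 covers about 49.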
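Design for termination. If your agent can theoretically run forever (e.g., a tool-calling loop with no exit condition), add a step counter to the state and increment it on every model call:
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    step_count: int

def call_model(state: AgentState) -> dict:
    steps = state.get("step_count", 0)
    if steps >= 50:
        # This message carries no tool calls, so the loop's exit condition can end the run
        return {"messages": [AIMessage(content="I've reached the maximum number of steps.")]}
    response = llm.bind_tools(tools).invoke(state["messages"])
    # Without this increment the counter never changes and the guard never fires
    return {"messages": [response], "step_count": steps + 1}

Error handling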
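interrupt(): intentional pause
interrupt() pauses the graph by raising GraphInterrupt internally (NodeInterrupt in langgraph.errors is a subclass of it). The runtime catches that exception and saves a checkpoint, so invoke() returns normally instead of raising to the caller. Check the result for a pending interrupt and resume with Command. This requires a checkpointer and a thread_id in config:
from langgraph.types import Command

result = graph.invoke(input_state, config=config)
# Recent LangGraph versions report a pause under the "__interrupt__" key;
# on older versions, inspect graph.get_state(config).tasks instead.
interrupts = result.get("__interrupt__")
if interrupts:
    pending = interrupts[0].value  # the payload that was passed to interrupt()
    approved = ask_user(pending)
    result = graph.invoke(Command(resume={"approval": approved}), config=config)

GraphRecursionError: recursion limit hit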
from langgraph.errors import GraphRecursionError

def run_agent(input_state: dict) -> dict:
    try:
        return graph.invoke(input_state)
    except GraphRecursionError:
        # Return a graceful fallback instead of surfacing the exception
        return {"messages": [AIMessage(content="I need more steps to complete this task.")]}

Tool exceptions
ToolNode with handle_tool_errors=True (default) catches tool
exceptions and returns them as ToolMessage with status="error".
The agent loop continues — the model sees the error and can react.
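The catch-and-report pattern described here can be sketched without LangGraph. ToolResult and run_tool_safely below are hypothetical stand-ins (they are not the ToolNode implementation); they only illustrate how a tool failure becomes a message the model can read instead of an exception that ends the run.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ToolResult:
    """Hypothetical stand-in for a ToolMessage."""

    content: str
    status: str  # "success" or "error"


def run_tool_safely(tool: Callable[..., object], **kwargs: object) -> ToolResult:
    try:
        return ToolResult(content=str(tool(**kwargs)), status="success")
    except Exception as exc:
        # Report the failure to the model rather than crashing the agent loop.
        return ToolResult(content=f"Error: {exc!r}", status="error")


def divide(a: float, b: float) -> float:
    return a / b
```

Calling run_tool_safely(divide, a=1, b=0) yields a result with status "error", which the model can respond to on its next turn.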
To change the error message:
tool_node = ToolNode(tools, handle_tool_errors="Tool unavailable. Try again later.")

Debugging strategies
Inspect state with get_state_history
history = graph.get_state_history(config={"configurable": {"thread_id": "abc"}})
for checkpoint in history:  # newest checkpoint first
    print(f"Step {checkpoint.metadata['step']}")
    messages = checkpoint.values.get("messages", [])  # empty on the initial checkpoint
    if messages:
        print(f"  Last message: {str(messages[-1].content)[:100]}")

Walk the conversation backwards to find where things went wrong.
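The backwards walk can be automated with a small predicate search. This is a sketch that assumes the history is ordered newest first and that each snapshot exposes .values and .metadata["step"]; first_bad_step and the sample snapshots are hypothetical and stand in for real checkpoints.

```python
from types import SimpleNamespace
from typing import Callable, Iterable, Optional


def first_bad_step(history: Iterable, is_bad: Callable[[dict], bool]) -> Optional[int]:
    """Return the earliest step whose state fails the check, or None.

    Assumes `history` is ordered newest first, so the last match seen
    while iterating is the earliest one chronologically.
    """
    earliest = None
    for snapshot in history:
        if is_bad(snapshot.values):
            earliest = snapshot.metadata["step"]
    return earliest


# Stand-in snapshots, newest first, mimicking the shape of state history entries.
sample_history = [
    SimpleNamespace(values={"messages": ["hi", "ERROR", "sorry"]}, metadata={"step": 2}),
    SimpleNamespace(values={"messages": ["hi", "ERROR"]}, metadata={"step": 1}),
    SimpleNamespace(values={"messages": ["hi"]}, metadata={"step": 0}),
]
```

With the sample data, a predicate that looks for "ERROR" in the messages points at step 1, the checkpoint where the bad message first appeared.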
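stream_mode="updates": per-node logging
async for chunk in graph.astream(input_state, stream_mode="updates"):
    # Each chunk is a dict of {node_name: update}
    for node_name, output in chunk.items():
        keys = list(output.keys()) if isinstance(output, dict) else output
        print(f"[{node_name}] returned: {keys}")

Each step shows which node ran and what it returned.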
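For structured logs, the per-chunk formatting can live in a plain function that is easy to unit test. The sketch assumes each chunk is a dict mapping a node name to that node's update; describe_update is a hypothetical helper, and non-dict updates (such as an interrupt payload) are logged with an empty key list.

```python
from typing import Any


def describe_update(chunk: dict[str, Any]) -> list[str]:
    """Turn one "updates" chunk ({node_name: output}) into log lines."""
    lines = []
    for node_name, output in chunk.items():
        # Only dict updates have state keys worth listing.
        keys = sorted(output) if isinstance(output, dict) else []
        lines.append(f"[{node_name}] returned: {keys}")
    return lines
```

Inside the streaming loop, each chunk would be passed to describe_update and the resulting lines sent to the logger.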
astream_events: full trace
async for event in graph.astream_events(input_state, config=config, version="v2"):
    print(f"{event['event']}: {event.get('name', '')}")

The full trace: every model call, tool call, and state change. Use this to reproduce issues locally.
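A full trace is verbose, so it often helps to collapse it into counts before reading it line by line. The sketch below assumes events are dicts with an "event" key and an optional "name" key, as in the loop above; summarize_events and the sample events are illustrative only.

```python
from collections import Counter
from typing import Iterable


def summarize_events(events: Iterable[dict]) -> Counter:
    """Count events by (event type, name) for a compact view of a trace."""
    return Counter((e["event"], e.get("name", "")) for e in events)


# Hand-written sample events in the same shape as the trace above.
sample_events = [
    {"event": "on_chat_model_start", "name": "llm"},
    {"event": "on_tool_start", "name": "search"},
    {"event": "on_tool_end", "name": "search"},
    {"event": "on_tool_start", "name": "search"},
]
```

A tool that starts more often than it ends, as in the sample, is a quick hint about where a run stalled.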
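update_state: manual corrections
# Correct the model's last response: reuse its id so add_messages replaces it
last = graph.get_state(config).values["messages"][-1]
graph.update_state(
    config,
    {"messages": [AIMessage(content="sorry, let me reconsider", id=last.id)]},
)

update_state writes a new checkpoint on top of the current one, passing
the values through the state's reducers. With add_messages, a message
whose id matches an existing one replaces it; a message with a new id is
appended. The next invoke continues from the modified state. Useful for
corrections without restarting the conversation.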
LangGraph Platform
LangGraph Platform (the managed offering) handles:
- Deployment (Docker, Kubernetes)
- Scaling (multiple replicas)
- Persistence (built-in Postgres checkpointer)
- UI (LangGraph Studio for visualization)
If you’re self-hosting, the open-source library gives you the graph execution. You handle the deployment, scaling, and persistence yourself.
Self-hosting checklist
- Compile the graph once at startup, not per request
- PostgresSaver for the checkpointer (not MemorySaver)
- PostgresStore for cross-thread memory
- recursion_limit set appropriately for your agent’s complexity
- interrupt() handling for human-in-the-loop
- GraphRecursionError handling for runaway loops
- Health check endpoint that calls graph.invoke with a test input
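The health check item can be sketched as a function that runs one test invocation under a timeout. Everything here is an assumption for illustration: check_health, the stub graphs, and the status strings are hypothetical, and the only requirement on the graph is an invoke(input) method.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def check_health(graph, test_input: dict, timeout_s: float = 5.0) -> dict:
    """Run one test invocation and report success, failure, or timeout."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        future = pool.submit(graph.invoke, test_input)
        future.result(timeout=timeout_s)
        return {"status": "ok"}
    except FutureTimeout:
        return {"status": "timeout"}
    except Exception as exc:
        return {"status": "error", "detail": type(exc).__name__}
    finally:
        pool.shutdown(wait=False)  # do not make the probe wait on a hung call


class OkGraph:
    """Stub graph that answers immediately."""

    def invoke(self, state: dict) -> dict:
        return {"messages": ["pong"]}


class BrokenGraph:
    """Stub graph that fails, e.g. because the checkpointer is unreachable."""

    def invoke(self, state: dict) -> dict:
        raise RuntimeError("db down")
```

An HTTP health endpoint would call check_health with the real compiled graph and map any status other than "ok" to a non-200 response.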
invoke vs ainvoke in FastAPI
import asyncio

from fastapi import FastAPI, Request

app = FastAPI()

# Option 1: sync invoke, run in an executor so it does not block the event loop.
# Two routes cannot share a path, so this variant uses a different (arbitrary) one.
@app.post("/chat-sync")
async def chat_sync(request: Request):
    body = await request.json()  # request.json() is a coroutine and must be awaited
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,
        lambda: graph.invoke({"messages": [HumanMessage(content=body["message"])]}),
    )
    return result

# Option 2: ainvoke (fully async)
@app.post("/chat")
async def chat(request: Request):
    body = await request.json()
    # Assumption: the client sends thread_id in the body; use whatever session key your app has
    result = await graph.ainvoke(
        {"messages": [HumanMessage(content=body["message"])]},
        config={"configurable": {"thread_id": body["thread_id"]}},
    )
    return result

ainvoke is the cleanest for async frameworks. It releases the
event loop while waiting for the model.
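To see why a blocking call inside an async handler matters, the demo below counts how often the event loop gets to run while a slow synchronous invoke is in flight. SlowSyncGraph, count_ticks, and run_demo are hypothetical names for this sketch; the 0.2 second sleep stands in for a model call.

```python
import asyncio
import time


class SlowSyncGraph:
    """Hypothetical stand-in whose invoke() blocks, like a sync model call."""

    def invoke(self, state: dict) -> dict:
        time.sleep(0.2)
        return {"messages": state["messages"] + ["done"]}


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how often the event loop gets to run while work is in flight."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def run_demo(offload: bool) -> tuple[dict, int]:
    graph = SlowSyncGraph()
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # let the ticker start before the work begins
    state = {"messages": ["hi"]}
    if offload:
        # The worker thread blocks; the event loop stays free.
        result = await asyncio.to_thread(graph.invoke, state)
    else:
        # The event loop itself is blocked for the whole call.
        result = graph.invoke(state)
    stop.set()
    return result, await ticker
```

With offload=False the ticker runs only once before the loop is blocked; with offload=True it keeps ticking, which is the behaviour other requests on the same worker depend on.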
Common pitfalls
- Re-compiling on every request. Move builder.compile() to module scope.
- recursion_limit too low. 25 is conservative. Complex agents with many tool calls need 50–100.
- MemorySaver in production. State is lost on restart. Use PostgresSaver.
- interrupt() not handled. The caller must detect the pause and resume with Command(resume=...). If the caller treats the paused result as final, the graph run is abandoned.
- stream in an async handler. Blocks the event loop. Use astream instead.
See also
- 01-mental-model: the agent loop
- 08-checkpointers: PostgresSaver for prod
- 07-streaming: streaming in prod
- 12-testing: testing before prod