LangGraph Agent: How to Build a Deterministic Plan-Execute with Memory
Build a production-ready LangGraph agent that plans, executes, validates tools, persists state, remembers context, and serves a deterministic JSON /agent endpoint.
When you separate planning from execution, you get to lock down an explicit sequence of steps before any tool actually fires. This cuts down on all that unpredictable back-and-forth you usually see with LLMs. What you'll do is ask the model to output a structured, multi-step plan first. Then your executor runs tools deterministically against that plan, complete with validation and guardrails. The result? Behavior you can actually repeat. If you want to dig deeper into how prompt structure and information placement affect model performance, check out our analysis of position bias in long prompts.

Here's what this pattern gives you:
Determinism. Your plan is fixed before execution starts, so you can log it, audit it, and replay it whenever you need.
Safety. Every single tool call gets validated with Pydantic schemas both before and after execution.
Recovery. When a step fails (and they will), the agent can replan and keep going instead of just crashing.
Memory. LangGraph checkpointers persist state across turns, which makes multi-turn workflows actually possible.
You're going to build a FastAPI /agent endpoint backed by a LangGraph state graph that plans, executes, and replans on error when needed. What you end up with is a working, production-ready agent that you can extend with new tools, memory backends, and observability features. If you're just getting started with LangChain or want something hands-on, our guide to building reliable LangChain LLM workflows walks through the whole setup, from prompt-driven chains to structured outputs that actually work in production.
How It Works
Let me walk you through the high-level flow:
User sends thread_id and query to /agent
Planner node invokes the LLM with structured output to generate a PlanModel (basically a list of steps)
Executor node runs each step:
For tool steps: validate input, call the tool, validate output, store result
For respond step: synthesize final answer from step results using the LLM
On error: route to Replan node, which generates a revised plan and re-enters execution
Checkpointer persists state per thread_id for conversation memory
API returns plan, step results, final answer, error (if any), and trace
This architecture keeps planning and execution separate. That's what makes the system auditable, testable, and honestly, pretty easy to extend.
Setup & Installation
You can run this in a Colab notebook or any local Python 3.10+ environment. First, install your dependencies:
!pip install -q langgraph langchain-openai pydantic httpx fastapi uvicorn
Set your OpenAI API key. If you're in Colab, store it in Secrets as OPENAI_API_KEY. For a local environment, just export it:
import os

try:
    from google.colab import userdata
    os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
except ImportError:
    pass  # Not in Colab; ensure OPENAI_API_KEY is set in your shell
Quick verification that everything's working:
assert "OPENAI_API_KEY" in os.environ, "Set OPENAI_API_KEY in environment or Colab Secrets"
print("✓ Environment ready")
Step-by-Step Implementation
Define the agent's shared state and plan models
We're using TypedDict for the state and Pydantic for structured plan output. This gives us type safety and validation at every single step.
from typing import TypedDict, List, Dict, Any, Optional
from pydantic import BaseModel, Field

class PlanStep(BaseModel):
    """Represents a single step in the agent's plan."""
    id: int = Field(..., description="Step index starting at 1")
    action: str = Field(..., description="Either 'tool' or 'respond'")
    name: Optional[str] = Field(None, description="Tool name if action is 'tool'")
    args: Optional[Dict[str, Any]] = Field(None, description="Arguments for the tool")
    description: str = Field(..., description="Short description of the step")

class PlanModel(BaseModel):
    """Represents the overall plan, including rationale and steps."""
    rationale: str
    steps: List[PlanStep]

class AgentState(TypedDict, total=False):
    """Shared state for the agent, passed between nodes."""
    user_input: str
    plan: List[PlanStep]
    step_results: List[Dict[str, Any]]
    final_answer: Optional[str]
    error: Optional[str]
    trace: List[Dict[str, Any]]
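To see the validation working before wiring anything else up, here's a quick, illustrative check (not part of the build; the values are made up, and "sum_numbers" refers to the tool you'll register in the next section):

# Pydantic accepts a well-formed plan...
plan = PlanModel(
    rationale="Add the numbers, then answer.",
    steps=[
        PlanStep(id=1, action="tool", name="sum_numbers",
                 args={"numbers": [10, 20, 30]}, description="Sum the numbers"),
        PlanStep(id=2, action="respond", description="Answer with the total"),
    ],
)
print(plan.steps[0].model_dump())

# ...and rejects a malformed step (missing the required 'description' field).
from pydantic import ValidationError
try:
    PlanStep(id=1, action="tool", name="sum_numbers")
except ValidationError as e:
    print("Rejected:", e.errors()[0]["loc"])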
Define safe tools with explicit schemas
Each tool gets Pydantic input/output models for validation. This is crucial. It prevents malformed data from propagating through your system. If your agents need robust retrieval capabilities, take a look at our comprehensive guide on implementing vector store retrieval for RAG systems. It covers semantic search, chunking, and how to reduce those pesky hallucinations in LLM outputs.
from pydantic import BaseModel, Field
import httpx
import math
class SumInput(BaseModel):
    numbers: list[float] = Field(..., min_length=1)

class SumOutput(BaseModel):
    total: float

def sum_numbers_tool(inp: SumInput) -> SumOutput:
    """Sums a list of numbers."""
    total = float(math.fsum(inp.numbers))
    return SumOutput(total=total)
class KBQueryInput(BaseModel):
    topic: str = Field(..., min_length=1)

class KBQueryOutput(BaseModel):
    topic: str
    content: str

KB = {
    "refund_policy": "Refunds available within 30 days with receipt.",
    "sla": "Standard support SLA is 24 hours response time.",
}

def kb_retrieve_tool(inp: KBQueryInput) -> KBQueryOutput:
    """Retrieves a KB article by topic."""
    topic = inp.topic.strip().lower()
    if topic not in KB:
        raise ValueError(f"Topic '{topic}' not found")
    return KBQueryOutput(topic=topic, content=KB[topic])
class HttpGetInput(BaseModel):
    url: str = Field(..., pattern=r"^https://httpbin\.org/.*")

class HttpGetOutput(BaseModel):
    status_code: int
    json: dict

def http_get_json_tool(inp: HttpGetInput) -> HttpGetOutput:
    """Fetches JSON from a safe endpoint."""
    with httpx.Client(timeout=10.0) as client:
        resp = client.get(inp.url)
        data = resp.json() if "application/json" in resp.headers.get("content-type", "") else {}
    return HttpGetOutput(status_code=resp.status_code, json=data)
Wrap tools in a registry with validation
The registry validates inputs and outputs, catching errors before they can spread. This is absolutely critical for determinism and safety.
from typing import Callable, Type, Any
from pydantic import ValidationError

class ToolError(Exception):
    """Custom exception for tool validation or execution errors."""
    pass

class Tool:
    """Registry entry for a tool, including validation and execution."""
    def __init__(self, name: str, description: str, input_model: Type[BaseModel],
                 output_model: Type[BaseModel], fn: Callable[[Any], Any]):
        self.name = name
        self.description = description
        self.input_model = input_model
        self.output_model = output_model
        self.fn = fn

    def run(self, args: dict) -> dict:
        """Validates input, runs the tool, and validates output."""
        try:
            validated_in = self.input_model(**args)
        except ValidationError as ve:
            raise ToolError(f"Input validation failed for {self.name}: {ve}") from ve
        try:
            raw_out = self.fn(validated_in)
        except Exception as e:
            raise ToolError(f"Tool {self.name} execution failed: {e}") from e
        try:
            validated_out = self.output_model.model_validate(raw_out)
        except ValidationError as ve:
            raise ToolError(f"Output validation failed for {self.name}: {ve}") from ve
        return validated_out.model_dump()
TOOL_REGISTRY: dict[str, Tool] = {
    "sum_numbers": Tool(
        name="sum_numbers",
        description="Return the sum of an array of numbers",
        input_model=SumInput,
        output_model=SumOutput,
        fn=sum_numbers_tool,
    ),
    "kb_retrieve": Tool(
        name="kb_retrieve",
        description="Retrieve a short KB article by topic",
        input_model=KBQueryInput,
        output_model=KBQueryOutput,
        fn=kb_retrieve_tool,
    ),
    "http_get_json": Tool(
        name="http_get_json",
        description="GET JSON from https://httpbin.org endpoints only",
        input_model=HttpGetInput,
        output_model=HttpGetOutput,
        fn=http_get_json_tool,
    ),
}
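Before handing the registry to the agent, you can exercise it directly. This quick check isn't part of the build, but it shows the happy path and how validation failures surface as ToolError:

# Happy path: input is validated, the tool runs, output is validated and dumped.
print(TOOL_REGISTRY["sum_numbers"].run({"numbers": [2, 3.5]}))  # {'total': 5.5}

# Bad input: an empty list violates the schema, so the registry raises ToolError
# before the tool function ever runs.
try:
    TOOL_REGISTRY["sum_numbers"].run({"numbers": []})
except ToolError as e:
    print("Caught:", e)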
Prepare the LLM with structured output
We use LangChain's OpenAI wrapper with temperature=0 for deterministic planning. If you want to further improve the reliability of your agent's responses, our article on prompt engineering strategies for reliable LLM outputs goes deep into step-by-step prompt design and output formatting. The with_structured_output method ensures the LLM returns a valid PlanModel. And if you're still deciding which language model to use for your agent, our guide on how to pick an LLM for your application breaks down all the tradeoffs.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.0)
planner_llm = llm.with_structured_output(PlanModel)

PLAN_PROMPT = ChatPromptTemplate.from_messages([
    ("system", "You are a planning assistant. You must output a feasible, minimal plan."),
    ("system", "Available tools:\n{tool_summaries}\nOnly call tools listed above."),
    ("user", "User request: {user_input}\nProduce a plan with one or more steps. Use 'respond' as the last step."),
])
Implement the planner node
The planner generates a structured plan from the user's input. It validates that all tool steps actually refer to known tools.
def tool_summaries() -> str:
    """Returns a summary of available tools and their input schemas."""
    lines = []
    for t in TOOL_REGISTRY.values():
        lines.append(f"- {t.name}: {t.description}; input={t.input_model.model_json_schema()['properties']}")
    return "\n".join(lines)

def plan_node(state: AgentState) -> AgentState:
    """Planner node: generates a plan from user input using the LLM."""
    ui = state["user_input"]
    result = planner_llm.invoke(PLAN_PROMPT.format_messages(
        tool_summaries=tool_summaries(),
        user_input=ui,
    ))
    plan: PlanModel = result
    steps = []
    for s in plan.steps:
        if s.action == "tool" and (not s.name or s.name not in TOOL_REGISTRY):
            raise ValueError(f"Planner proposed unknown tool: {s.name}")
        steps.append(s)
    return {
        "plan": steps,
        "trace": (state.get("trace") or []) + [{"event": "plan", "plan": [s.model_dump() for s in steps]}],
    }
Implement the executor node
The executor runs each step in the plan. Tool steps get validated and executed via the registry. The final respond step synthesizes an answer from all the step results.
ANSWER_PROMPT = ChatPromptTemplate.from_messages([
    ("system", "You are a precise assistant. Use the provided step results to answer."),
    ("user", "Question: {user_input}\nStep results:\n{step_results}\nCompose a concise answer."),
])

answer_llm = llm
def execute_node(state: AgentState) -> AgentState:
    """Executor node: runs the plan step by step, validates tool calls, and builds the final answer."""
    plan = state.get("plan") or []
    step_results = state.get("step_results") or []
    trace = state.get("trace") or []
    error = None
    for step in plan:
        if step.action == "tool":
            if step.name not in TOOL_REGISTRY:
                error = f"Unknown tool {step.name}"
                trace.append({"event": "tool_error", "step_id": step.id, "error": error})
                break
            tool = TOOL_REGISTRY[step.name]
            try:
                result = tool.run(step.args or {})
                step_results.append({"step_id": step.id, "tool": step.name, "args": step.args, "output": result})
                trace.append({"event": "tool_ok", "step_id": step.id, "tool": step.name, "output": result})
            except ToolError as te:
                error = str(te)
                trace.append({"event": "tool_error", "step_id": step.id, "tool": step.name, "error": error})
                break
        elif step.action == "respond":
            sr_str = "\n".join([f"- Step {r['step_id']} ({r['tool']}): {r['output']}" for r in step_results])
            msg = ANSWER_PROMPT.format_messages(user_input=state["user_input"], step_results=sr_str)
            final = answer_llm.invoke(msg).content
            trace.append({"event": "respond", "text": final})
            return {"step_results": step_results, "final_answer": final, "trace": trace}
        else:
            error = f"Unknown action {step.action}"
            trace.append({"event": "plan_error", "error": error})
            break
    if error:
        return {"step_results": step_results, "error": error, "trace": trace}
    return {"step_results": step_results, "error": "Plan missing 'respond' step", "trace": trace}
Add a replan node for recovery
When execution fails, the replan node generates a revised plan based on what's been completed and what went wrong. This is how you get graceful recovery.
REPLAN_PROMPT = ChatPromptTemplate.from_messages([
    ("system", "You are a repair planner. Create a minimal revised plan to complete the task."),
    ("system", "Available tools:\n{tool_summaries}"),
    ("user", "Original request: {user_input}\nCompleted steps:\n{done}\nError: {error}\nPropose a revised plan (include 'respond' as last step)."),
])

replanner_llm = llm.with_structured_output(PlanModel)
def replan_node(state: AgentState) -> AgentState:
    """Replanner node: generates a revised plan after an error."""
    done_lines = []
    for r in state.get("step_results") or []:
        done_lines.append(f"Step {r['step_id']} {r['tool']} -> OK")
    msgs = REPLAN_PROMPT.format_messages(
        tool_summaries=tool_summaries(),
        user_input=state["user_input"],
        done="\n".join(done_lines) or "None",
        error=state.get("error") or "Unknown error",
    )
    revised: PlanModel = replanner_llm.invoke(msgs)
    steps = []
    for s in revised.steps:
        if s.action == "tool" and (not s.name or s.name not in TOOL_REGISTRY):
            raise ValueError(f"Replanner proposed unknown tool: {s.name}")
        steps.append(s)
    trace = (state.get("trace") or []) + [{"event": "replan", "plan": [s.model_dump() for s in steps]}]
    return {"plan": steps, "error": None, "trace": trace}
Wire the graph with conditional edges
LangGraph's StateGraph connects all the nodes. After execution, we route to replan on error or end on success. Pretty straightforward.
from langgraph.graph import StateGraph, END

graph = StateGraph(AgentState)
graph.add_node("plan", plan_node)
graph.add_node("execute", execute_node)
graph.add_node("replan", replan_node)
graph.add_edge("plan", "execute")

def route_after_execute(state: AgentState) -> str:
    """Determines the next node after execution."""
    return "replan" if state.get("error") else END

graph.add_conditional_edges("execute", route_after_execute, {"replan": "replan", END: END})
graph.add_edge("replan", "execute")
graph.set_entry_point("plan")
Persist memory with checkpointers
LangGraph checkpointers persist state across turns per thread. This is what gives you conversation memory and multi-call workflows. For quick starts, the in-memory saver works fine; for production, you'll want Postgres. And if you're curious about why LLM memory isn't infinite and how to manage accumulated context, our guide on context rot and LLM memory limitations has some practical strategies.
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
app_graph = graph.compile(checkpointer=memory)
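If you want checkpoints that survive a process restart without standing up Postgres, a SQLite-backed saver is a reasonable middle ground. This is a sketch, assuming the optional langgraph-checkpoint-sqlite package is installed; the checkpointer API has shifted across LangGraph releases, and durable_graph is just an illustrative name, so check the current docs before relying on it:

# Sketch: durable checkpoints via SQLite (assumes `pip install langgraph-checkpoint-sqlite`).
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver  # optional package

sqlite_conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
durable_graph = graph.compile(checkpointer=SqliteSaver(sqlite_conn))  # state survives restarts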
Visualize your graph
LangGraph has built-in utilities for visualizing and inspecting your graph. Super helpful for debugging.
from IPython.display import Image, display

display(Image(app_graph.get_graph().draw_mermaid_png()))
Serve a production-friendly JSON API
FastAPI gives you a lightweight, typed API for the agent. The /agent endpoint accepts a thread_id and query, invokes the graph, and returns the full state.
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional

api = FastAPI(title="Plan-Execute Agent")

class AgentRequest(BaseModel):
    thread_id: str
    query: str

class AgentResponse(BaseModel):
    thread_id: str
    plan: list[dict]
    step_results: list[dict]
    final_answer: Optional[str]
    error: Optional[str]
    trace: list[dict]

@api.post("/agent", response_model=AgentResponse)
def agent_endpoint(req: AgentRequest):
    """FastAPI endpoint for agent queries."""
    state = app_graph.invoke({"user_input": req.query}, config={"configurable": {"thread_id": req.thread_id}})
    plan = [s.model_dump() if hasattr(s, "model_dump") else s for s in state.get("plan", [])]
    return AgentResponse(
        thread_id=req.thread_id,
        plan=plan,
        step_results=state.get("step_results", []),
        final_answer=state.get("final_answer"),
        error=state.get("error"),
        trace=state.get("trace", []),
    )
Run and Validate
Test the graph directly
Let's invoke the graph with a sample query to verify planning and execution work:
result = app_graph.invoke(
    {"user_input": "What is 10 + 20 + 30?"},
    config={"configurable": {"thread_id": "test-thread-1"}},
)
print("Plan:", result.get("plan"))
print("Final Answer:", result.get("final_answer"))
print("Trace:", result.get("trace"))
Expected output: The planner generates a plan with a sum_numbers tool step and a respond step. The executor runs the tool and synthesizes the answer.
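Because the graph was compiled with a checkpointer, you can also confirm that state was persisted for this thread. app_graph.get_state returns the latest checkpoint for a given thread_id, and invoking the graph again on the same thread continues from that saved state, so the trace keeps growing (a quick check, not part of the core build):

# Inspect the checkpoint saved for this thread.
config = {"configurable": {"thread_id": "test-thread-1"}}
snapshot = app_graph.get_state(config)
print("Persisted keys:", list(snapshot.values.keys()))
print("Trace events so far:", len(snapshot.values.get("trace", [])))

# A second turn on the same thread resumes from the saved state.
followup = app_graph.invoke({"user_input": "Now what is 5 + 5?"}, config=config)
print("Trace events after follow-up:", len(followup.get("trace", [])))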
Test error handling and replanning
Now let's trigger an error by requesting a non-existent KB topic:
result = app_graph.invoke(
    {"user_input": "What is the warranty policy?"},
    config={"configurable": {"thread_id": "test-thread-2"}},
)
print("Error:", result.get("error"))
print("Trace:", result.get("trace"))
Expected output: The executor fails on the kb_retrieve step (the KB has no warranty topic), routes to replan, and re-enters execution with a revised plan, typically one that responds directly and explains the topic isn't available. If revised plans keep failing, LangGraph's recursion limit eventually stops the execute/replan loop.
Run the FastAPI server
Start the server in a notebook or local environment:
import uvicorn
import nest_asyncio
nest_asyncio.apply() # Required for running uvicorn in Jupyter/Colab
uvicorn.run(api, host="0.0.0.0", port=8000, log_level="info")
In a separate terminal or notebook cell, test the endpoint:
import requests
response = requests.post("http://localhost:8000/agent", json={
    "thread_id": "user-123",
    "query": "What is 5 + 10?",
})
print(response.json())
Expected output: A JSON response with plan, step_results, final_answer, and trace.
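If you'd rather not keep a uvicorn server running inside the notebook, FastAPI's TestClient can exercise the same endpoint in-process. This isn't part of the original setup, but it only uses packages already installed above:

# In-process alternative: hit the endpoint without starting uvicorn.
from fastapi.testclient import TestClient

client = TestClient(api)
resp = client.post("/agent", json={"thread_id": "user-456", "query": "What is 7 + 8?"})
print(resp.status_code)             # 200
print(resp.json()["final_answer"])  # exact wording depends on the model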
Conclusion
So you've built a deterministic, plan-execute agent with LangGraph, Pydantic-validated tools, and a FastAPI endpoint. The system plans before it acts, validates every tool call, and recovers from errors through replanning. Memory persists across turns using checkpointers, which enables multi-turn workflows.
Key decisions we made:
LangGraph for deterministic routing and state management
Pydantic for strict input/output validation
FastAPI for a lightweight, typed API
Temperature=0 for reproducible planning
Next steps to consider:
Swap MemorySaver for PostgresSaver for production persistence
Add retries with exponential backoff for transient tool failures (a minimal sketch follows this list)
Extend the tool registry with new tools. Think database queries, external APIs. For advanced use cases, consider standardizing and reusing AI tools across applications by building an MCP server. It's a great way to enable scalable and maintainable AI infrastructure.
Add observability with structured logging or tracing. LangSmith is a good option here.
Harden prompts with explicit constraints and few-shot examples
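As a starting point for the retry idea above, here's a minimal sketch. run_with_retries is a hypothetical helper, not something the registry provides; in practice you'd retry only transient failures (timeouts, flaky HTTP calls) rather than validation errors:

import time

def run_with_retries(tool: Tool, args: dict, max_attempts: int = 3, base_delay: float = 0.5) -> dict:
    """Retry a tool call with exponential backoff between attempts (hypothetical helper)."""
    for attempt in range(1, max_attempts + 1):
        try:
            return tool.run(args)
        except ToolError:
            if attempt == max_attempts:
                raise  # out of attempts; let the executor's error handling take over
            time.sleep(base_delay * 2 ** (attempt - 1))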
If you're planning to adapt your agent to specialized tasks, our guide to parameter-efficient fine-tuning techniques like LoRA shows how to customize LLMs efficiently for production.
This architecture scales from prototypes to production. Start with the core build, validate it end-to-end, then layer in production features as you need them.