State, Nodes & Conditional Edges
The state schema is the most important design decision in a LangGraph system: it's the contract between every node, the thing the checkpointer persists, and — in multi-agent graphs — the communication channel between agents. Get it right and routing, fan-out, and debugging all get easier.
Design the state schema the way you'd design a database table: every field should have a clear owner (who writes it) and clear consumers (who reads it). When two nodes both scribble into a vague notes field, you've built shared mutable state with no contract — the classic multi-agent failure mode. A good Lab 05 schema looks like: question (written by the user, read by planner), plan (written by planner, read by searchers), findings (appended by searchers, read by writer), draft (written by writer, read by critic), critique (written by critic, read by writer).
```python
import operator
from typing import Annotated, TypedDict


class ResearchState(TypedDict):
    question: str    # input; nobody overwrites it
    plan: list[str]  # planner writes, searchers read
    # Annotated[type, reducer]: when a node returns {"findings": [x]},
    # the reducer APPENDS instead of replacing. This is how parallel
    # searchers write concurrently without clobbering each other.
    findings: Annotated[list[str], operator.add]
    draft: str           # writer writes, critic reads
    critique: str        # critic writes, writer reads
    revision_count: int  # loop guard


def planner(state: ResearchState) -> dict:
    # decompose the question into 2-3 search subtasks (model call)
    subtasks = ["subtask A", "subtask B"]  # stub
    return {"plan": subtasks}


def searcher(state: ResearchState) -> dict:
    # runs once per subtask when fanned out; returns ONE finding
    return {"findings": ["finding for one subtask"]}
```

The Annotated[list[str], operator.add] pattern is load-bearing: it declares a reducer that merges concurrent updates. Without it, two parallel searchers returning findings would conflict; with it, LangGraph appends both. Default behavior for un-annotated fields is last-write-wins replacement, which is what you want for draft and critique.

Two rules keep nodes healthy. First, nodes should be as pure as possible: read state, do work (model calls, tool calls), return an update. Side effects beyond that (writing files, mutating globals) break resumability: if the graph replays from a checkpoint, side effects replay too. Second, state must be serializable, because the checkpointer has to persist it. Strings, numbers, lists, dicts, Pydantic models: fine. Open file handles, DB connections, clients: keep those out of state (construct them inside nodes or pass via config).
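To make the merge rule and the serializability rule concrete, here is a minimal pure-Python sketch. It does not use LangGraph and is not how the library is implemented: `REDUCERS` and `apply_update` are hypothetical names for illustration, and a JSON round trip stands in for whatever serializer a checkpointer actually uses.

```python
import json
import operator

# Hypothetical reducer registry: field name -> function(old, new) -> merged.
# Only "findings" has a reducer, mirroring the Annotated field in ResearchState.
REDUCERS = {"findings": operator.add}


def apply_update(state: dict, update: dict) -> dict:
    """Return a new state with a node's partial update merged in."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # combine (list concat)
        else:
            merged[key] = value  # no reducer: plain replacement
    return merged


state = {"question": "q", "findings": [], "draft": "v1"}
# Two searchers each return one finding; both survive the merge.
state = apply_update(state, {"findings": ["finding A"]})
state = apply_update(state, {"findings": ["finding B"]})
# The writer's update replaces the old draft.
state = apply_update(state, {"draft": "v2"})

# State built from strings, lists and dicts survives a serialize/restore cycle.
restored = json.loads(json.dumps(state))
print(state["findings"], state["draft"], restored == state)
```

The same round trip fails with a TypeError as soon as a file handle or client object is placed in the dict, which is the practical reason to keep those out of state.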
Conditional edges: routing on state
```python
from langgraph.graph import StateGraph, START, END


def writer(state: ResearchState) -> dict:
    # stub so the snippet runs; the real writer is a model call that
    # drafts from findings and, on a revision pass, the critique
    return {"draft": "draft text"}


def critic(state: ResearchState) -> dict:
    # model call that returns a verdict + critique text (stubbed)
    verdict_ok = state["revision_count"] >= 1  # pretend logic
    return {
        "critique": "" if verdict_ok else "Tighten the citations.",
        "revision_count": state["revision_count"] + 1,
    }


def route_after_critic(state: ResearchState) -> str:
    # Pure function of state -> name of the next node. No side effects.
    if state["critique"] and state["revision_count"] <= 1:
        return "revise"  # one revision loop max (Lab 05 spec)
    return "done"


builder = StateGraph(ResearchState)
builder.add_node("writer", writer)
builder.add_node("critic", critic)
builder.add_edge(START, "writer")
builder.add_edge("writer", "critic")
builder.add_conditional_edges(
    "critic",
    route_after_critic,
    {"revise": "writer", "done": END},  # label -> destination map
)
graph = builder.compile()
# The initial input must include "revision_count": 0, since critic reads it.
```

Note the revision_count guard: every cycle in your graph needs a hard cap, exactly like the max-iteration guard in your Module 1 loop. A critic-writer cycle without one is an infinite loop with an API bill.

| Edge type | API | Use for |
|---|---|---|
| Fixed | add_edge("a", "b") | Unconditional sequencing: planner always precedes searchers |
| Conditional | add_conditional_edges("a", router, mapping) | Branching on state: critic verdict, error vs. success paths |
| Fan-out | Router returns multiple destinations / per-task dispatch (Send-style API) | Parallel workers, one per subtask from the planner |
| Terminal | Edge to END | Graph completion; final state is returned |
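Returning to the critic-writer cycle: because the router is a pure function of state, the cap can be checked with plain dicts before any graph is compiled. The sketch below is a stand-in under stated assumptions, not LangGraph: `run_cycle` is a hypothetical driver that hand-rolls the writer, critic, router loop, and `always_unhappy_critic` is a hypothetical worst-case critic that never approves.

```python
def route_after_critic(state: dict) -> str:
    # Same rule as the router above: revise only while under the cap.
    if state["critique"] and state["revision_count"] <= 1:
        return "revise"
    return "done"


def always_unhappy_critic(state: dict) -> dict:
    # Hypothetical worst case: the critic never approves the draft.
    return {
        "critique": "Still not good enough.",
        "revision_count": state["revision_count"] + 1,
    }


def run_cycle(state: dict, max_steps: int = 20) -> int:
    """Hand-rolled writer -> critic -> router loop; returns writer runs."""
    writer_runs = 0
    for _ in range(max_steps):  # safety net for the demo itself
        state = {**state, "draft": f"draft v{writer_runs + 1}"}  # writer stub
        writer_runs += 1
        state = {**state, **always_unhappy_critic(state)}
        if route_after_critic(state) == "done":
            break
    return writer_runs


runs = run_cycle({"critique": "", "revision_count": 0})
print(runs)
```

Even with a critic that never approves, the writer runs twice: the first draft plus one revision. Remove the revision_count condition from the router and only the demo's own max_steps stops the loop.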
For Lab 05's parallel searchers you need dynamic fan-out: the planner produces N subtasks at runtime, and you want N searcher executions with different inputs, running in the same step. LangGraph supports this via a dispatch mechanism in conditional edges (the Send API in current versions — check the docs for the exact signature in your installed version): the router returns one dispatch per subtask, each carrying its own slice of state, and the reducer on findings merges the results. If the API details shift, the concept doesn't: map over subtasks, reduce into shared state.
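The map-over-subtasks, reduce-into-state idea can be sketched without any framework. The following is a plain-Python illustration using a thread pool, not the LangGraph Send API; `search_one` and `fan_out` are hypothetical names, and the search itself is stubbed.

```python
import operator
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def search_one(subtask: str) -> dict:
    # Stub for one searcher run: it receives only its own subtask
    # and returns a partial update containing exactly one finding.
    return {"findings": [f"finding for {subtask}"]}


def fan_out(state: dict) -> dict:
    """Map: one searcher per subtask. Reduce: append findings into state."""
    with ThreadPoolExecutor() as pool:
        # pool.map yields results in input order, whatever order workers finish in
        updates = list(pool.map(search_one, state["plan"]))
    new_findings = reduce(operator.add, (u["findings"] for u in updates), [])
    return {**state, "findings": state["findings"] + new_findings}


state = {"plan": ["subtask A", "subtask B", "subtask C"], "findings": []}
state = fan_out(state)
print(state["findings"])
```

In the graph version, the router returning one dispatch per subtask plays the role of pool.map, and the operator.add reducer on findings plays the role of the reduce step.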
- State is the contract: every field needs one clear writer and known readers.
- Reducers (Annotated[list, operator.add]) merge concurrent updates; default is last-write-wins.
- Nodes: read state, work, return partial update. No hidden side effects; state stays serializable.
- Conditional edges = pure router function + label→destination map.
- Every cycle gets a hard cap in state (revision_count); frameworks don't repeal the infinite-loop rule.
- Dynamic fan-out: map subtasks to parallel node runs, reduce results with a reducer.