Faire tourner un agent LangGraph dans un notebook Jupyter est simple. Le faire tourner en production à l'échelle — avec état persistant, streaming et contrôle des coûts — c'est là que la plupart des équipes se heurtent à un mur. Ce guide couvre les patterns avancés qui séparent les agents de démonstration des systèmes de qualité production.
Pourquoi LangGraph pour les agents en production ?
LangGraph modélise la logique d'agent sous forme de graphe orienté avec état où les noeuds sont des fonctions Python et les arêtes sont des règles de routage conditionnelles. Contrairement à une simple chaîne LLM, un graphe peut boucler, se ramifier et mémoriser — exactement ce dont vous avez besoin pour des agents qui planifient, exécutent des outils, observent les résultats et s'adaptent.
En avril 2026, LangGraph 0.2.x embarque le support natif pour le streaming async, des checkpointers configurables (SQLite, Postgres, DynamoDB) et un studio visuel (langgraph-studio) pour déboguer les transitions d'état en local.
1. Gestion d'état persistant
L'erreur de production la plus courante est de traiter chaque appel API comme sans état. Dans LangGraph, l'état est un dictionnaire typé qui traverse chaque noeud. Le persister vous permet de reprendre des workflows interrompus, d'implémenter des approbations humaines-dans-la-boucle, et de déboguer les échecs en production en rejouant les checkpoints.
Définir un schéma d'état typé
# state.py
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
# add_messages fusionne les listes au lieu d'écraser (requis pour l'historique)
messages: Annotated[list, add_messages]
# Champs personnalisés pour votre domaine
task_id: str
tool_calls_remaining: int
cost_usd: float # suivre la dépense par exécution de grapheCheckpointer DynamoDB (grade production)
SQLite fonctionne en développement local. Pour Lambda, utilisez le checkpointer DynamoDB depuis langgraph-checkpoint-dynamodb (package communautaire, compatible avec LangGraph 0.2.x) :
import boto3
from langgraph.checkpoint.dynamodb import DynamoDBSaver
from langgraph.graph import StateGraph, END
from state import AgentState
# Une table, deux GSI : thread_id + checkpoint_id
dynamodb = boto3.resource("dynamodb", region_name="eu-west-1")
checkpointer = DynamoDBSaver(
table_name="langgraph-checkpoints",
dynamodb_resource=dynamodb,
)
builder = StateGraph(AgentState)
builder.add_node("planificateur", planner_node)
builder.add_node("executeur", executor_node)
builder.add_node("critique", critic_node)
builder.set_entry_point("planificateur")
builder.add_conditional_edges(
"executeur",
router_apres_execution, # retourne "critique" ou END
{"critique": "critique", END: END},
)
builder.add_edge("critique", "planificateur") # boucle de rétroaction
graph = builder.compile(checkpointer=checkpointer)
# Reprendre une conversation : passer le même thread_id suffit
config = {"configurable": {"thread_id": "session-utilisateur-abc123"}}
result = await graph.ainvoke({"messages": [message_utilisateur]}, config=config)2. Streaming et gestion de fenêtre de contexte
Les utilisateurs abandonnent les interfaces qui semblent figées. Le streaming token par token n'est plus optionnel pour les agents conversationnels. LangGraph fournit deux API de streaming : astream() pour les snapshots d'état et astream_events() pour les événements granulaires au niveau token.
Streamer une réponse de graphe via FastAPI
# api.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
@app.post("/chat/{thread_id}")
async def chat(thread_id: str, body: dict):
config = {"configurable": {"thread_id": thread_id}}
message = {"role": "user", "content": body["content"]}
async def flux_tokens():
async for event in graph.astream_events(
{"messages": [message]},
config=config,
version="v2",
):
kind = event["event"]
# Streamer les tokens depuis n'importe quel noeud LLM
if kind == "on_chat_model_stream":
chunk = event["data"]["chunk"]
if chunk.content:
# Format Server-Sent Events
yield f"data: {json.dumps({'token': chunk.content})}\n\n"
# Signaler les transitions de noeuds pour les indicateurs de progression UI
elif kind == "on_chain_start":
noeud = event.get("name", "")
if noeud in ("planificateur", "executeur", "critique"):
yield f"data: {json.dumps({'noeud': noeud})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(flux_tokens(), media_type="text/event-stream")Budgéter la fenêtre de contexte
Les agents à longue durée de vie accumulent l'historique des messages jusqu'à dépasser la fenêtre de contexte du modèle — et les coûts explosent. Implémentez un noeud de fenêtre glissante qui taille l'historique avant chaque appel LLM :
from langchain_core.messages import trim_messages
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(model="claude-sonnet-4-5", max_tokens=4096)
def noeud_trim_contexte(state: AgentState) -> AgentState:
"""Garder le message système + les N derniers tokens d'historique."""
trimmed = trim_messages(
state["messages"],
strategy="last",
token_counter=model,
max_tokens=60_000, # laisser 4k de marge pour la complétion
start_on="human", # toujours commencer par un tour humain
include_system=True,
)
return {"messages": trimmed}
# Insérer avant chaque noeud LLM
builder.add_node("trim_contexte", noeud_trim_contexte)
builder.add_edge("trim_contexte", "planificateur")3. Optimisation des coûts : routage LLM étagé
Tous les noeuds du graphe n'ont pas besoin de Claude ou GPT-4o. Une stratégie de routage à trois niveaux réduit systématiquement les coûts par requête de 50 à 70% :
| Type de tâche | Modèle | Coût / 1M tokens | Latence |
|---|---|---|---|
| Classification, décisions de routage | Ollama qwen3:8b (local) | 0 EUR | ~200ms |
| Sélection d'outils, extraction structurée | claude-haiku-4-5 | EUR 0,75 / EUR 3,75 | ~600ms |
| Synthèse finale, raisonnement complexe | claude-sonnet-4-5 | EUR 2,80 / EUR 14 | ~1,5s |
Implémenter le routage étagé dans LangGraph
from langchain_anthropic import ChatAnthropic
from langchain_ollama import ChatOllama
# Modèles instanciés une fois au niveau module (réutilisation container Lambda)
modele_economique = ChatOllama(
model="qwen3:8b",
base_url="http://100.x.x.x:11434", # IP Tailscale du GPU local
)
modele_moyen = ChatAnthropic(model="claude-haiku-4-5-20251001")
modele_fort = ChatAnthropic(model="claude-sonnet-4-5-20251001")
def router_par_complexite(state: AgentState) -> str:
"""Utiliser le modèle économique pour classifier, puis router vers le bon niveau."""
dernier_msg = state["messages"][-1].content
classification = modele_economique.invoke(
f"Classifie cette tâche (un mot) : SIMPLE | MODEREE | COMPLEXE\n{dernier_msg}"
).content.strip()
if "SIMPLE" in classification:
return "planificateur_economique"
elif "MODEREE" in classification:
return "planificateur_moyen"
else:
return "planificateur_fort"
builder.add_conditional_edges("routeur", router_par_complexite, {
"planificateur_economique": "planificateur_economique",
"planificateur_moyen": "planificateur_moyen",
"planificateur_fort": "planificateur_fort",
})ollama-python et un fichier GGUF embarqué) — mais attendez-vous à une image container de 2 à 3 Go.4. Déploiement sur AWS Lambda via Tailscale
La configuration de production canonique : une fonction Lambda exécute la logique du graphe tandis qu'un démon réseau Tailscale en mode userspace la connecte à vos serveurs d'inférence privés sans exposer aucun endpoint public. Cela supprime le besoin d'un VPC, d'une NAT gateway, et des coûts associés (~45 EUR/mois économisés par endpoint VPC).
Dockerfile (arm64, Lambda Web Adapter)
FROM public.ecr.aws/lambda/python:3.12-arm64
# Lambda Web Adapter pour le support streaming
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.8.4 /lambda-adapter /opt/extensions/lambda-adapter
WORKDIR /var/task
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Binaire Tailscale userspace (aucun module kernel requis)
RUN curl -fsSL https://pkgs.tailscale.com/stable/tailscale_1.78.0_arm64.tgz | tar -xz
RUN mv tailscale_1.78.0_arm64/tailscale tailscale_1.78.0_arm64/tailscaled /usr/local/bin/
COPY app/ .
COPY entrypoint.sh .
RUN chmod +x entrypoint.sh
# Lambda Web Adapter attend la variable d'env PORT
ENV PORT=8080
EXPOSE 8080
CMD ["./entrypoint.sh"]entrypoint.sh — démarrage Tailscale
#!/bin/bash
set -e
# Démarrer le daemon Tailscale en mode userspace (pas de device TUN dans Lambda)
tailscaled --state=/tmp/tailscaled.state --tun=userspace-networking &
TS_PID=$!
# S'authentifier (clé auth depuis Secrets Manager, injectée via variable d'env)
tailscale up \
--authkey="${TS_AUTHKEY}" \
--hostname="lambda-agent-${AWS_LAMBDA_FUNCTION_NAME}" \
--advertise-tags=tag:lambda \
--accept-routes \
--timeout=15s
echo "Tailscale connecté : $(tailscale ip)"
# Démarrer FastAPI (accessible depuis Lambda Web Adapter)
exec uvicorn api:app --host 0.0.0.0 --port 8080Fragment serverless.yml
functions:
graph-agent:
image:
uri: 874735685088.dkr.ecr.eu-west-1.amazonaws.com/langgraph-agent:latest
architecture: arm64
memorySize: 1024 # Tailscale + Python tient dans 1 Go
timeout: 120 # autoriser les longues exécutions de graphe
environment:
TS_AUTHKEY: ${ssm:/talki/tailscale/lambda-authkey}
LANGGRAPH_CHECKPOINT_TABLE: langgraph-checkpoints
events:
- http:
path: /chat/{thread_id}
method: post
cors: true
provisionedConcurrency: 1 # garder un warm pour cold start < 2sCas d'usage : 60% de réduction de coûts chez une SaaS B2B
Une startup SaaS B2B utilisait un agent LangGraph pour alimenter son automatisation du support client. Configuration initiale : chaque message allait vers claude-sonnet-4-5, l'état était stocké dans un cluster ElastiCache compatible Redis, et la stack tournait sur une instance EC2 dédiée.
Facture mensuelle avant : EUR 2 200/mois (EC2 + ElastiCache + appels API Anthropic moyennant 12 000 tokens d'entrée par conversation).
Après migration vers LangGraph sur Lambda avec routage étagé et checkpoints DynamoDB :
- 73% des requêtes routent maintenant vers Ollama qwen3:8b sur un serveur local (connecté via Tailscale) — coût : 0 EUR.
- Le taillage de contexte a réduit les tokens d'entrée moyens de 12 000 à 4 800 par conversation (réduction de 60%).
- Lambda arm64 + Graviton2 a réduit le coût compute de 34% vs x86.
- DynamoDB On-Demand a remplacé ElastiCache — EUR 12/mois vs EUR 170/mois.
Facture mensuelle après : EUR 860/mois. Une réduction de 61% sans dégradation de la qualité des réponses (mesurée par les scores CSAT avant et après).
5. Débogage de graphe en production
Quand un agent produit une mauvaise réponse en production, vous devez reproduire l'état exact au moment de l'échec. Le checkpointer de LangGraph rend cela direct :
import asyncio
from votre_graphe import graph
async def rejouer_session_echouee(thread_id: str, checkpoint_id: str | None = None):
"""Rejouer un échec de production en local pour le débogage."""
config = {
"configurable": {
"thread_id": thread_id,
# Omettre checkpoint_id pour partir du dernier, ou épingler un spécifique
**({"checkpoint_id": checkpoint_id} if checkpoint_id else {}),
}
}
# Obtenir le snapshot d'état à ce checkpoint
state = await graph.aget_state(config)
print("État au checkpoint :")
print(f" messages : {len(state.values['messages'])} items")
print(f" noeuds suivants : {state.next}")
print(f" cout jusqu'ici : {state.values.get('cost_usd', 0):.4f} EUR")
# Relancer depuis ce checkpoint
result = await graph.ainvoke(None, config) # None = reprendre, ne pas ajouter de message
return result
asyncio.run(rejouer_session_echouee("session-utilisateur-abc123"))Checklist d'implémentation
- Schéma d'état : typé avec
TypedDict, inclure un champ de suivi des coûts dès le premier jour. - Checkpointer : DynamoDB pour les déploiements Lambda (serverless, pas de connection pooling), Postgres pour les serveurs dédiés.
- Taillage de contexte : insérer un noeud trim avant chaque appel LLM, ciblant 80% de la fenêtre de contexte du modèle.
- Routage étagé : classifier la complexité de la tâche avec un modèle local avant d'invoquer des modèles frontier coûteux.
- Streaming : exposer
astream_events()via SSE pour tout endpoint orienté utilisateur. - Réseau Tailscale : utiliser le mode userspace dans Lambda — pas de VPC, pas de NAT, pas d'IP publique pour votre serveur d'inférence.
- Outillage de rejeu : toujours vérifier que vous pouvez rejouer un checkpoint avant de déployer en production.
FAQ
LangChain est une boîte à outils pour construire des applications LLM avec des chaînes et de la récupération. LangGraph est une couche au-dessus qui modélise la logique d'agent sous forme de graphe orienté et avec état, permettant les cycles, les branchements et les checkpoints persistants — indispensable pour les agents multi-étapes complexes.
Oui. LangGraph expose astream() et astream_events() pour le streaming au niveau des tokens. Vous pouvez streamer à la fois la réponse finale et les sorties intermédiaires des noeuds, ce qui est critique pour des expériences utilisateur à faible latence.
Utilisez le checkpointer intégré de LangGraph avec un backend DynamoDB ou PostgreSQL. Chaque invocation charge le checkpoint par thread_id, exécute le prochain noeud du graphe, et réécrit l'état mis à jour avant de retourner.
Routez les tâches de raisonnement court vers un modèle Ollama local (via Tailscale), réservez Claude ou GPT-4o uniquement pour la synthèse finale, et mettez en cache les résultats répétés de sous-graphes. Les équipes utilisant cette approche étagée réduisent typiquement leurs coûts de tokens de 50 à 70%.
Oui. Packagés en container Docker (arm64), connectés à votre serveur d'inférence local via le réseau userspace Tailscale, et exposés derrière API Gateway. Le cold-start est typiquement inférieur à 2 secondes avec une concurrence provisionnée pré-chauffée.