Talki Academy
Technique22 min de lecture

LangChain & LangGraph : Patterns Avancés pour la Production

Plongée dans les patterns LangGraph avancés en production : gestion d'état persistant, streaming avec fenêtres de contexte, optimisation des coûts et déploiement d'agents graph sur AWS Lambda via Tailscale. Code réel et cas d'usage avec réduction des coûts de 60%.

Par Talki Academy·Mis a jour le 28 avril 2026

Faire tourner un agent LangGraph dans un notebook Jupyter est simple. Le faire tourner en production à l'échelle — avec état persistant, streaming et contrôle des coûts — c'est là que la plupart des équipes se heurtent à un mur. Ce guide couvre les patterns avancés qui séparent les agents de démonstration des systèmes de qualité production.

Pourquoi LangGraph pour les agents en production ?

LangGraph modélise la logique d'agent sous forme de graphe orienté avec état où les noeuds sont des fonctions Python et les arêtes sont des règles de routage conditionnelles. Contrairement à une simple chaîne LLM, un graphe peut boucler, se ramifier et mémoriser — exactement ce dont vous avez besoin pour des agents qui planifient, exécutent des outils, observent les résultats et s'adaptent.

En avril 2026, LangGraph 0.2.x embarque le support natif pour le streaming async, des checkpointers configurables (SQLite, Postgres, DynamoDB) et un studio visuel (langgraph-studio) pour déboguer les transitions d'état en local.

1. Gestion d'état persistant

L'erreur de production la plus courante est de traiter chaque appel API comme sans état. Dans LangGraph, l'état est un dictionnaire typé qui traverse chaque noeud. Le persister vous permet de reprendre des workflows interrompus, d'implémenter des approbations humaines-dans-la-boucle, et de déboguer les échecs en production en rejouant les checkpoints.

Définir un schéma d'état typé

# state.py
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    # add_messages fusionne les listes au lieu d'écraser (requis pour l'historique)
    messages: Annotated[list, add_messages]
    # Champs personnalisés pour votre domaine
    task_id: str
    tool_calls_remaining: int
    cost_usd: float  # suivre la dépense par exécution de graphe

Checkpointer DynamoDB (grade production)

SQLite fonctionne en développement local. Pour Lambda, utilisez le checkpointer DynamoDB depuis langgraph-checkpoint-dynamodb (package communautaire, compatible avec LangGraph 0.2.x) :

import boto3
from langgraph.checkpoint.dynamodb import DynamoDBSaver
from langgraph.graph import StateGraph, END
from state import AgentState

# Une table, deux GSI : thread_id + checkpoint_id
dynamodb = boto3.resource("dynamodb", region_name="eu-west-1")
checkpointer = DynamoDBSaver(
    table_name="langgraph-checkpoints",
    dynamodb_resource=dynamodb,
)

builder = StateGraph(AgentState)
builder.add_node("planificateur", planner_node)
builder.add_node("executeur", executor_node)
builder.add_node("critique", critic_node)
builder.set_entry_point("planificateur")
builder.add_conditional_edges(
    "executeur",
    router_apres_execution,       # retourne "critique" ou END
    {"critique": "critique", END: END},
)
builder.add_edge("critique", "planificateur")  # boucle de rétroaction

graph = builder.compile(checkpointer=checkpointer)

# Reprendre une conversation : passer le même thread_id suffit
config = {"configurable": {"thread_id": "session-utilisateur-abc123"}}
result = await graph.ainvoke({"messages": [message_utilisateur]}, config=config)
Astuce pro : Définissez un attribut TTL sur votre table DynamoDB de checkpoints (ex. 7 jours) pour éviter des coûts de stockage incontrôlés. Le checkpointer écrit un item par invocation de graphe par thread.

2. Streaming et gestion de fenêtre de contexte

Les utilisateurs abandonnent les interfaces qui semblent figées. Le streaming token par token n'est plus optionnel pour les agents conversationnels. LangGraph fournit deux API de streaming : astream() pour les snapshots d'état et astream_events() pour les événements granulaires au niveau token.

Streamer une réponse de graphe via FastAPI

# api.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/chat/{thread_id}")
async def chat(thread_id: str, body: dict):
    config = {"configurable": {"thread_id": thread_id}}
    message = {"role": "user", "content": body["content"]}

    async def flux_tokens():
        async for event in graph.astream_events(
            {"messages": [message]},
            config=config,
            version="v2",
        ):
            kind = event["event"]
            # Streamer les tokens depuis n'importe quel noeud LLM
            if kind == "on_chat_model_stream":
                chunk = event["data"]["chunk"]
                if chunk.content:
                    # Format Server-Sent Events
                    yield f"data: {json.dumps({'token': chunk.content})}\n\n"
            # Signaler les transitions de noeuds pour les indicateurs de progression UI
            elif kind == "on_chain_start":
                noeud = event.get("name", "")
                if noeud in ("planificateur", "executeur", "critique"):
                    yield f"data: {json.dumps({'noeud': noeud})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(flux_tokens(), media_type="text/event-stream")

Budgéter la fenêtre de contexte

Les agents à longue durée de vie accumulent l'historique des messages jusqu'à dépasser la fenêtre de contexte du modèle — et les coûts explosent. Implémentez un noeud de fenêtre glissante qui taille l'historique avant chaque appel LLM :

from langchain_core.messages import trim_messages
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-sonnet-4-5", max_tokens=4096)

def noeud_trim_contexte(state: AgentState) -> AgentState:
    """Garder le message système + les N derniers tokens d'historique."""
    trimmed = trim_messages(
        state["messages"],
        strategy="last",
        token_counter=model,
        max_tokens=60_000,   # laisser 4k de marge pour la complétion
        start_on="human",    # toujours commencer par un tour humain
        include_system=True,
    )
    return {"messages": trimmed}

# Insérer avant chaque noeud LLM
builder.add_node("trim_contexte", noeud_trim_contexte)
builder.add_edge("trim_contexte", "planificateur")

3. Optimisation des coûts : routage LLM étagé

Tous les noeuds du graphe n'ont pas besoin de Claude ou GPT-4o. Une stratégie de routage à trois niveaux réduit systématiquement les coûts par requête de 50 à 70% :

Type de tâcheModèleCoût / 1M tokensLatence
Classification, décisions de routageOllama qwen3:8b (local)0 EUR~200ms
Sélection d'outils, extraction structuréeclaude-haiku-4-5EUR 0,75 / EUR 3,75~600ms
Synthèse finale, raisonnement complexeclaude-sonnet-4-5EUR 2,80 / EUR 14~1,5s

Implémenter le routage étagé dans LangGraph

from langchain_anthropic import ChatAnthropic
from langchain_ollama import ChatOllama

# Modèles instanciés une fois au niveau module (réutilisation container Lambda)
modele_economique = ChatOllama(
    model="qwen3:8b",
    base_url="http://100.x.x.x:11434",  # IP Tailscale du GPU local
)
modele_moyen = ChatAnthropic(model="claude-haiku-4-5-20251001")
modele_fort = ChatAnthropic(model="claude-sonnet-4-5-20251001")

def router_par_complexite(state: AgentState) -> str:
    """Utiliser le modèle économique pour classifier, puis router vers le bon niveau."""
    dernier_msg = state["messages"][-1].content
    classification = modele_economique.invoke(
        f"Classifie cette tâche (un mot) : SIMPLE | MODEREE | COMPLEXE\n{dernier_msg}"
    ).content.strip()

    if "SIMPLE" in classification:
        return "planificateur_economique"
    elif "MODEREE" in classification:
        return "planificateur_moyen"
    else:
        return "planificateur_fort"

builder.add_conditional_edges("routeur", router_par_complexite, {
    "planificateur_economique": "planificateur_economique",
    "planificateur_moyen": "planificateur_moyen",
    "planificateur_fort": "planificateur_fort",
})
Attention : Ollama via Tailscale ajoute ~5 à 15ms de latence réseau par rapport à localhost. Pour des SLA inférieurs à 100ms, faites tourner le petit modèle directement dans le container Lambda (avec ollama-python et un fichier GGUF embarqué) — mais attendez-vous à une image container de 2 à 3 Go.

4. Déploiement sur AWS Lambda via Tailscale

La configuration de production canonique : une fonction Lambda exécute la logique du graphe tandis qu'un démon réseau Tailscale en mode userspace la connecte à vos serveurs d'inférence privés sans exposer aucun endpoint public. Cela supprime le besoin d'un VPC, d'une NAT gateway, et des coûts associés (~45 EUR/mois économisés par endpoint VPC).

Dockerfile (arm64, Lambda Web Adapter)

FROM public.ecr.aws/lambda/python:3.12-arm64

# Lambda Web Adapter pour le support streaming
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.8.4      /lambda-adapter /opt/extensions/lambda-adapter

WORKDIR /var/task

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Binaire Tailscale userspace (aucun module kernel requis)
RUN curl -fsSL https://pkgs.tailscale.com/stable/tailscale_1.78.0_arm64.tgz | tar -xz
RUN mv tailscale_1.78.0_arm64/tailscale tailscale_1.78.0_arm64/tailscaled /usr/local/bin/

COPY app/ .
COPY entrypoint.sh .
RUN chmod +x entrypoint.sh

# Lambda Web Adapter attend la variable d'env PORT
ENV PORT=8080
EXPOSE 8080

CMD ["./entrypoint.sh"]

entrypoint.sh — démarrage Tailscale

#!/bin/bash
set -e

# Démarrer le daemon Tailscale en mode userspace (pas de device TUN dans Lambda)
tailscaled --state=/tmp/tailscaled.state --tun=userspace-networking &
TS_PID=$!

# S'authentifier (clé auth depuis Secrets Manager, injectée via variable d'env)
tailscale up \
  --authkey="${TS_AUTHKEY}" \
  --hostname="lambda-agent-${AWS_LAMBDA_FUNCTION_NAME}" \
  --advertise-tags=tag:lambda \
  --accept-routes \
  --timeout=15s

echo "Tailscale connecté : $(tailscale ip)"

# Démarrer FastAPI (accessible depuis Lambda Web Adapter)
exec uvicorn api:app --host 0.0.0.0 --port 8080

Fragment serverless.yml

functions:
  graph-agent:
    image:
      uri: 874735685088.dkr.ecr.eu-west-1.amazonaws.com/langgraph-agent:latest
    architecture: arm64
    memorySize: 1024    # Tailscale + Python tient dans 1 Go
    timeout: 120        # autoriser les longues exécutions de graphe
    environment:
      TS_AUTHKEY: ${ssm:/talki/tailscale/lambda-authkey}
      LANGGRAPH_CHECKPOINT_TABLE: langgraph-checkpoints
    events:
      - http:
          path: /chat/{thread_id}
          method: post
          cors: true
    provisionedConcurrency: 1   # garder un warm pour cold start < 2s

Cas d'usage : 60% de réduction de coûts chez une SaaS B2B

Une startup SaaS B2B utilisait un agent LangGraph pour alimenter son automatisation du support client. Configuration initiale : chaque message allait vers claude-sonnet-4-5, l'état était stocké dans un cluster ElastiCache compatible Redis, et la stack tournait sur une instance EC2 dédiée.

Facture mensuelle avant : EUR 2 200/mois (EC2 + ElastiCache + appels API Anthropic moyennant 12 000 tokens d'entrée par conversation).

Après migration vers LangGraph sur Lambda avec routage étagé et checkpoints DynamoDB :

  • 73% des requêtes routent maintenant vers Ollama qwen3:8b sur un serveur local (connecté via Tailscale) — coût : 0 EUR.
  • Le taillage de contexte a réduit les tokens d'entrée moyens de 12 000 à 4 800 par conversation (réduction de 60%).
  • Lambda arm64 + Graviton2 a réduit le coût compute de 34% vs x86.
  • DynamoDB On-Demand a remplacé ElastiCache — EUR 12/mois vs EUR 170/mois.

Facture mensuelle après : EUR 860/mois. Une réduction de 61% sans dégradation de la qualité des réponses (mesurée par les scores CSAT avant et après).

5. Débogage de graphe en production

Quand un agent produit une mauvaise réponse en production, vous devez reproduire l'état exact au moment de l'échec. Le checkpointer de LangGraph rend cela direct :

import asyncio
from votre_graphe import graph

async def rejouer_session_echouee(thread_id: str, checkpoint_id: str | None = None):
    """Rejouer un échec de production en local pour le débogage."""
    config = {
        "configurable": {
            "thread_id": thread_id,
            # Omettre checkpoint_id pour partir du dernier, ou épingler un spécifique
            **({"checkpoint_id": checkpoint_id} if checkpoint_id else {}),
        }
    }

    # Obtenir le snapshot d'état à ce checkpoint
    state = await graph.aget_state(config)
    print("État au checkpoint :")
    print(f"  messages : {len(state.values['messages'])} items")
    print(f"  noeuds suivants : {state.next}")
    print(f"  cout jusqu'ici : {state.values.get('cost_usd', 0):.4f} EUR")

    # Relancer depuis ce checkpoint
    result = await graph.ainvoke(None, config)  # None = reprendre, ne pas ajouter de message
    return result

asyncio.run(rejouer_session_echouee("session-utilisateur-abc123"))

Checklist d'implémentation

  • Schéma d'état : typé avec TypedDict, inclure un champ de suivi des coûts dès le premier jour.
  • Checkpointer : DynamoDB pour les déploiements Lambda (serverless, pas de connection pooling), Postgres pour les serveurs dédiés.
  • Taillage de contexte : insérer un noeud trim avant chaque appel LLM, ciblant 80% de la fenêtre de contexte du modèle.
  • Routage étagé : classifier la complexité de la tâche avec un modèle local avant d'invoquer des modèles frontier coûteux.
  • Streaming : exposer astream_events() via SSE pour tout endpoint orienté utilisateur.
  • Réseau Tailscale : utiliser le mode userspace dans Lambda — pas de VPC, pas de NAT, pas d'IP publique pour votre serveur d'inférence.
  • Outillage de rejeu : toujours vérifier que vous pouvez rejouer un checkpoint avant de déployer en production.

FAQ

Quelle est la différence entre LangChain et LangGraph ?

LangChain est une boîte à outils pour construire des applications LLM avec des chaînes et de la récupération. LangGraph est une couche au-dessus qui modélise la logique d'agent sous forme de graphe orienté et avec état, permettant les cycles, les branchements et les checkpoints persistants — indispensable pour les agents multi-étapes complexes.

LangGraph supporte-t-il le streaming des réponses ?

Oui. LangGraph expose astream() et astream_events() pour le streaming au niveau des tokens. Vous pouvez streamer à la fois la réponse finale et les sorties intermédiaires des noeuds, ce qui est critique pour des expériences utilisateur à faible latence.

Comment persister l'état du graphe entre les invocations Lambda ?

Utilisez le checkpointer intégré de LangGraph avec un backend DynamoDB ou PostgreSQL. Chaque invocation charge le checkpoint par thread_id, exécute le prochain noeud du graphe, et réécrit l'état mis à jour avant de retourner.

Quelle est la façon la moins chère de faire tourner des agents LangGraph en production ?

Routez les tâches de raisonnement court vers un modèle Ollama local (via Tailscale), réservez Claude ou GPT-4o uniquement pour la synthèse finale, et mettez en cache les résultats répétés de sous-graphes. Les équipes utilisant cette approche étagée réduisent typiquement leurs coûts de tokens de 50 à 70%.

Les agents LangGraph peuvent-ils tourner sur AWS Lambda ?

Oui. Packagés en container Docker (arm64), connectés à votre serveur d'inférence local via le réseau userspace Tailscale, et exposés derrière API Gateway. Le cold-start est typiquement inférieur à 2 secondes avec une concurrence provisionnée pré-chauffée.

Formez votre equipe a l'IA

Nos formations sont financables OPCO — reste a charge potentiel : 0€.

Voir les formationsVerifier eligibilite OPCO