🔝 Retour au Sommaire
L'une des grandes forces de FastAPI est sa capacité à gérer facilement la programmation asynchrone. Ce concept peut sembler intimidant au début, mais il est en réalité assez simple à comprendre et extrêmement puissant pour créer des APIs performantes.
Dans cette section, nous allons découvrir ce qu'est la programmation asynchrone, pourquoi elle est importante, et comment l'utiliser avec FastAPI pour créer des applications ultra-rapides.
Imaginez un restaurant avec un seul serveur :
Mode synchrone (traditionnel) :
- Le serveur prend la commande du client 1
- Le serveur attend que la cuisine prépare le plat (il ne fait rien d'autre)
- Le serveur apporte le plat au client 1
- Seulement maintenant, il peut prendre la commande du client 2
C'est inefficace ! Le serveur perd du temps à attendre.
Mode asynchrone :
- Le serveur prend la commande du client 1
- Il envoie la commande en cuisine
- Pendant que la cuisine prépare, il prend la commande du client 2
- Il envoie la commande 2 en cuisine
- Il prend la commande du client 3
- Quand un plat est prêt, il le livre
- Il continue à prendre d'autres commandes
Le serveur est beaucoup plus productif ! C'est exactement le principe de la programmation asynchrone.
Code synchrone : Chaque opération attend que la précédente soit terminée avant de commencer.
Code asynchrone : Pendant qu'une opération attend (une réponse d'une base de données, d'une API externe, etc.), le programme peut faire autre chose.
La programmation asynchrone est particulièrement utile quand votre application passe beaucoup de temps à attendre :
- ⏳ Requêtes vers des bases de données
- ⏳ Appels vers des APIs externes
- ⏳ Lecture/écriture de fichiers
- ⏳ Opérations réseau
C'est ce qu'on appelle des opérations I/O (Input/Output).
from fastapi import FastAPI
import time
app = FastAPI()
@app.get("/utilisateur/{id}")
def lire_utilisateur(id: int):
# Simuler une requête base de données (prend 2 secondes)
time.sleep(2)
return {"id": id, "nom": f"Utilisateur {id}"}Si 3 clients font une requête en même temps :
- Client 1 : attend 2 secondes
- Client 2 : attend 4 secondes (2 + 2)
- Client 3 : attend 6 secondes (2 + 2 + 2)
Temps total : 6 secondes
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/utilisateur/{id}")
async def lire_utilisateur(id: int):
# Simuler une requête base de données (prend 2 secondes)
await asyncio.sleep(2)
return {"id": id, "nom": f"Utilisateur {id}"}Si 3 clients font une requête en même temps :
- Les 3 requêtes sont traitées en parallèle
- Tous reçoivent leur réponse après environ 2 secondes
Temps total : ~2 secondes
C'est beaucoup plus rapide ! 🚀
La programmation asynchrone en Python utilise deux mots-clés principaux :
Le mot-clé async définit une fonction asynchrone (aussi appelée coroutine) :
async def ma_fonction():
return "Bonjour"Une fonction asynchrone ne s'exécute pas immédiatement quand elle est appelée. Elle retourne un objet qui peut être "attendu".
Le mot-clé await est utilisé pour attendre le résultat d'une opération asynchrone :
async def ma_fonction():
resultat = await operation_asynchrone()
return resultatImportant : Vous ne pouvez utiliser await que dans une fonction async.
# ✅ Correct
async def fonction_async():
resultat = await autre_fonction_async()
return resultat
# ❌ Incorrect - await sans async
def fonction_normale():
resultat = await autre_fonction_async() # Erreur !
return resultat
# ✅ Correct - async sans await (mais moins utile)
async def fonction_async_simple():
return "Hello" # Pas d'attente, mais c'est quand même asyncFastAPI rend la création d'endpoints asynchrones extrêmement simple. Il suffit d'ajouter async devant votre fonction !
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def lire_racine():
return {"message": "Hello World"}from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def lire_racine():
return {"message": "Hello World"}La seule différence ? Le mot async ! C'est tout.
Voici un guide simple pour savoir quand utiliser async :
- Des requêtes vers une base de données (avec un driver asynchrone)
- Des appels vers des APIs externes
- De la lecture/écriture de fichiers (avec des bibliothèques async)
- Des opérations réseau
- N'importe quelle opération qui attend quelque chose
- Des calculs intensifs (mathématiques, traitement de données)
- Des opérations purement CPU sans attente
- Des opérations très rapides qui ne bloquent pas
@app.get("/calcul")
def calculer_fibonacci(n: int):
# Calcul intensif - mieux en synchrone
def fib(x):
if x <= 1:
return x
return fib(x-1) + fib(x-2)
return {"resultat": fib(n)}import httpx
@app.get("/meteo")
async def obtenir_meteo(ville: str):
async with httpx.AsyncClient() as client:
# Requête HTTP vers une API externe
response = await client.get(f"https://api.meteo.com/{ville}")
return response.json()Pour profiter pleinement de l'asynchrone, vous devez utiliser des bibliothèques qui le supportent.
httpx est l'équivalent asynchrone de requests :
pip install httpxUtilisation synchrone (requests) :
import requests
@app.get("/utilisateurs")
def lire_utilisateurs():
response = requests.get("https://api.example.com/users")
return response.json()Utilisation asynchrone (httpx) :
import httpx
@app.get("/utilisateurs")
async def lire_utilisateurs():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/users")
return response.json()SQLAlchemy avec support async :
pip install sqlalchemy[asyncio]Motor (MongoDB async) :
pip install motorDatabases (multi-DB async) :
pip install databasesaiofiles (lecture/écriture async) :
pip install aiofilesimport aiofiles
@app.get("/lire-fichier")
async def lire_fichier():
async with aiofiles.open('mon_fichier.txt', mode='r', encoding='utf-8') as f:
contenu = await f.read()
return {"contenu": contenu}Créons une API qui récupère des informations depuis plusieurs sources externes :
from fastapi import FastAPI
import httpx
import asyncio
app = FastAPI()
@app.get("/informations/{nom_utilisateur}")
async def obtenir_informations(nom_utilisateur: str):
"""
Récupère les informations d'un utilisateur depuis plusieurs APIs
"""
async with httpx.AsyncClient() as client:
# Lancer toutes les requêtes en parallèle
taches = [
client.get(f"https://api.github.com/users/{nom_utilisateur}"),
client.get(f"https://api.twitter.com/users/{nom_utilisateur}"),
client.get(f"https://api.linkedin.com/users/{nom_utilisateur}")
]
# Attendre que toutes les requêtes se terminent
reponses = await asyncio.gather(*taches, return_exceptions=True)
resultats = {
"github": reponses[0].json() if not isinstance(reponses[0], Exception) else None,
"twitter": reponses[1].json() if not isinstance(reponses[1], Exception) else None,
"linkedin": reponses[2].json() if not isinstance(reponses[2], Exception) else None
}
return resultatsasync with: Context manager asynchroneasyncio.gather(): Exécute plusieurs opérations asynchrones en parallèlereturn_exceptions=True: Continue même si certaines requêtes échouent
Avantage : Les 3 requêtes se font en parallèle au lieu de l'une après l'autre !
Temps synchrone : Si chaque requête prend 1 seconde = 3 secondes total
Temps asynchrone : Toutes en parallèle = ~1 seconde total
asyncio.gather() est un outil puissant pour exécuter plusieurs tâches asynchrones en même temps.
import asyncio
@app.get("/multi-fetch")
async def recuperer_multiple():
async def fetch_data(id: int):
await asyncio.sleep(1) # Simule une opération longue
return {"id": id, "data": f"Données {id}"}
# Exécuter 5 requêtes en parallèle
resultats = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3),
fetch_data(4),
fetch_data(5)
)
return resultatsAu lieu de prendre 5 secondes (1 seconde × 5), cela ne prend qu'environ 1 seconde !
import asyncio
# Méthode 1 : gather (simple et direct)
async def avec_gather():
resultats = await asyncio.gather(
operation1(),
operation2(),
operation3()
)
return resultats
# Méthode 2 : create_task (plus de contrôle)
async def avec_tasks():
task1 = asyncio.create_task(operation1())
task2 = asyncio.create_task(operation2())
task3 = asyncio.create_task(operation3())
resultat1 = await task1
resultat2 = await task2
resultat3 = await task3
return [resultat1, resultat2, resultat3]gather est plus simple, create_task offre plus de flexibilité.
Vous pouvez définir des limites de temps pour vos opérations asynchrones :
import asyncio
@app.get("/avec-timeout")
async def endpoint_avec_timeout():
try:
# Timeout de 5 secondes
resultat = await asyncio.wait_for(
operation_longue(),
timeout=5.0
)
return resultat
except asyncio.TimeoutError:
return {"erreur": "L'opération a pris trop de temps"}
async def operation_longue():
await asyncio.sleep(10) # Prend 10 secondes
return {"data": "Terminé"}Il est important de comprendre la différence :
Plusieurs tâches progressent pendant des périodes de temps qui se chevauchent, mais pas exactement en même temps.
Tâche A: ===|wait|========|wait|===
Tâche B: |wait|====|wait|========
Tâche C: |wait|====|wait|===
C'est ce que fait async/await - pendant qu'une tâche attend, une autre peut travailler.
Plusieurs tâches s'exécutent littéralement en même temps sur plusieurs cœurs CPU.
Core 1: Tâche A ====================
Core 2: Tâche B ====================
Core 3: Tâche C ====================
FastAPI avec async/await fait de la concurrence, pas du parallélisme.
Comparons les performances avec un exemple concret :
import time
@app.get("/sync/utilisateurs")
def obtenir_utilisateurs_sync():
resultats = []
for i in range(10):
time.sleep(0.5) # Simule une requête base de données
resultats.append({"id": i, "nom": f"Utilisateur {i}"})
return resultatsTemps d'exécution : 10 × 0.5 = 5 secondes
import asyncio
@app.get("/async/utilisateurs")
async def obtenir_utilisateurs_async():
async def fetch_user(i):
await asyncio.sleep(0.5) # Simule une requête base de données
return {"id": i, "nom": f"Utilisateur {i}"}
resultats = await asyncio.gather(*[fetch_user(i) for i in range(10)])
return resultatsTemps d'exécution : 0.5 secondes (toutes en parallèle !)
Amélioration : 10x plus rapide ! 🚀
❌ Mauvais :
import time
@app.get("/mauvais")
async def mauvais_endpoint():
time.sleep(2) # Bloque tout !
return {"message": "Terminé"}✅ Bon :
import asyncio
@app.get("/bon")
async def bon_endpoint():
await asyncio.sleep(2) # Permet à d'autres tâches de s'exécuter
return {"message": "Terminé"}❌ Mauvais :
@app.get("/oubli-await")
async def oubli_await():
resultat = operation_async() # Manque await !
return resultat # Retourne un objet coroutine, pas le résultat✅ Bon :
@app.get("/bon-await")
async def bon_await():
resultat = await operation_async() # Avec await
return resultat❌ Mauvais :
import requests
@app.get("/mauvais-requete")
async def mauvais_requete():
# requests est synchrone, bloque l'event loop
response = requests.get("https://api.example.com")
return response.json()✅ Bon :
import httpx
@app.get("/bonne-requete")
async def bonne_requete():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com")
return response.json()Attention à ne pas créer des milliers de tâches en même temps. Utilisez des limiteurs de concurrence :
from asyncio import Semaphore
@app.get("/avec-limite")
async def avec_limite():
semaphore = Semaphore(10) # Maximum 10 tâches simultanées
async def fetch_with_limit(id: int):
async with semaphore:
await asyncio.sleep(1)
return {"id": id}
resultats = await asyncio.gather(*[fetch_with_limit(i) for i in range(100)])
return resultatsParfois, vous voulez déclencher une tâche mais répondre immédiatement au client sans attendre :
from fastapi import BackgroundTasks
def envoyer_email(destinataire: str):
# Fonction qui prend du temps
time.sleep(5)
print(f"Email envoyé à {destinataire}")
@app.post("/inscription")
async def inscription(email: str, background_tasks: BackgroundTasks):
# Ajouter une tâche en arrière-plan
background_tasks.add_task(envoyer_email, email)
# Réponse immédiate sans attendre l'email
return {"message": "Inscription réussie, email de confirmation en cours d'envoi"}Le client reçoit une réponse immédiate, et l'email est envoyé en arrière-plan.
async def envoyer_email_async(destinataire: str):
await asyncio.sleep(5) # Opération asynchrone
print(f"Email envoyé à {destinataire}")
@app.post("/inscription-async")
async def inscription_async(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(envoyer_email_async, email)
return {"message": "Inscription réussie"}Voici un exemple complet qui combine plusieurs concepts :
from fastapi import FastAPI, HTTPException
import httpx
import asyncio
from datetime import datetime, timedelta
app = FastAPI()
# Cache simple en mémoire
cache_meteo = {}
@app.get("/meteo/{ville}")
async def obtenir_meteo(ville: str):
"""
Récupère la météo d'une ville avec mise en cache
"""
# Vérifier le cache
if ville in cache_meteo:
donnees_cache = cache_meteo[ville]
if datetime.now() - donnees_cache["timestamp"] < timedelta(minutes=30):
return {
"source": "cache",
"data": donnees_cache["data"]
}
# Pas en cache ou expiré, faire la requête
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.openweathermap.org/data/2.5/weather",
params={"q": ville, "appid": "VOTRE_CLE_API"},
timeout=5.0
)
response.raise_for_status()
data = response.json()
# Mettre en cache
cache_meteo[ville] = {
"data": data,
"timestamp": datetime.now()
}
return {
"source": "api",
"data": data
}
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="Timeout lors de la récupération de la météo")
except httpx.HTTPError as e:
raise HTTPException(status_code=500, detail=f"Erreur API: {str(e)}")
@app.get("/meteo-multiple")
async def obtenir_meteo_multiple(villes: str):
"""
Récupère la météo de plusieurs villes en parallèle
Exemple: /meteo-multiple?villes=Paris,Londres,Berlin
"""
liste_villes = [v.strip() for v in villes.split(",")]
# Récupérer toutes les météos en parallèle
resultats = await asyncio.gather(
*[obtenir_meteo(ville) for ville in liste_villes],
return_exceptions=True
)
# Formater les résultats
reponse = {}
for ville, resultat in zip(liste_villes, resultats):
if isinstance(resultat, Exception):
reponse[ville] = {"erreur": str(resultat)}
else:
reponse[ville] = resultat
return reponse
@app.delete("/cache")
async def vider_cache():
"""Vider le cache de météo"""
cache_meteo.clear()
return {"message": "Cache vidé avec succès"}Pour mesurer les performances de vos endpoints :
from fastapi import FastAPI, Request
import time
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return responseMaintenant, chaque réponse inclura le temps de traitement dans les headers !
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.get("/test-perf")
async def test_performance():
start = time.time()
# Votre code ici
await asyncio.sleep(0.5)
duree = time.time() - start
logger.info(f"Endpoint exécuté en {duree:.2f} secondes")
return {"duree": duree}Toutes les opérations qui attendent (base de données, API, fichiers) doivent être asynchrones.
Ne lancez pas des milliers de tâches en même temps. Utilisez des limiteurs.
Les données qui changent peu doivent être mises en cache pour éviter des requêtes inutiles.
Définissez toujours des timeouts pour vos opérations externes.
Utilisez des outils pour mesurer et identifier les goulots d'étranglement.
Pour les bases de données, utilisez un pool de connexions :
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=20, # 20 connexions dans le pool
max_overflow=10
)L'asynchrone n'est pas toujours la solution. N'utilisez pas async si :
- Calculs intensifs CPU : Les calculs purs ne bénéficient pas d'async
- Code bloquant inévitable : Si vous devez utiliser des bibliothèques synchrones
- Simplicité nécessaire : Pour des scripts simples, le synchrone est plus simple
- Pas d'opérations I/O : Si votre code ne fait qu'exécuter de la logique
Pour les calculs intensifs, utilisez plutôt le multiprocessing.
Dans cette section, vous avez appris :
✅ Ce qu'est la programmation asynchrone et pourquoi elle est importante
✅ Comment utiliser async et await en Python
✅ Quand utiliser async et quand ne pas l'utiliser
✅ Comment créer des endpoints asynchrones avec FastAPI
✅ Comment utiliser des bibliothèques asynchrones (httpx, aiofiles, etc.)
✅ Comment exécuter plusieurs tâches en parallèle avec asyncio.gather()
✅ Comment gérer les timeouts et les erreurs
✅ Les pièges courants et comment les éviter
✅ Comment utiliser les background tasks
✅ Les bonnes pratiques pour optimiser les performances
Avec la programmation asynchrone, vous pouvez créer des APIs FastAPI incroyablement performantes capables de gérer des milliers de requêtes simultanées ! C'est l'un des grands avantages de FastAPI par rapport à d'autres frameworks Python.