Réponses en streaming
Les réponses en streaming (Streaming) vous permettent de recevoir la sortie en temps réel pendant le processus de génération du modèle, améliorant l’expérience utilisateur et la vitesse perçue.
Fonctionnement
OfoxAI utilise le protocole Server-Sent Events (SSE) pour implémenter les réponses en streaming :
- Le client envoie la requête avec
stream: true - Le serveur renvoie progressivement les fragments de contenu généré (chunk)
- Chaque chunk est envoyé via SSE avec le préfixe
data: - À la fin de la génération,
data: [DONE]est envoyé
Streaming avec le protocole OpenAI
cURL
Terminal
curl https://api.ofox.ai/v1/chat/completions \
-H "Authorization: Bearer $OFOX_API_KEY" \
-H "Content-Type: application/json" \
-N \
-d '{
"model": "openai/gpt-4o",
"messages": [{"role": "user", "content": "Écris un poème sur la programmation"}],
"stream": true
}'Streaming avec le protocole Anthropic
Python
stream_anthropic.py
import anthropic
client = anthropic.Anthropic(
base_url="https://api.ofox.ai/anthropic",
api_key="<votre OFOXAI_API_KEY>"
)
with client.messages.stream(
model="anthropic/claude-sonnet-4.5",
max_tokens=1024,
messages=[{"role": "user", "content": "Écris un poème sur la programmation"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)Streaming + Function Calling
Les réponses en streaming supportent également les scénarios d’appel de fonctions. Le modèle transmet d’abord en streaming la requête d’appel d’outil, puis après traitement, la conversation continue :
stream_with_tools.py
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "Quel temps fait-il à Paris aujourd'hui ?"}],
tools=[{
"type": "function",
"function": {
"name": "get_weather",
"description": "Obtenir la météo d'une ville spécifique",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "Nom de la ville"}
},
"required": ["city"]
}
}
}],
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.tool_calls:
# Traiter l'appel d'outil
print(f"Appel d'outil : {delta.tool_calls[0].function}")
elif delta.content:
print(delta.content, end="", flush=True)Gestion des erreurs et reconnexion
Les connexions de streaming peuvent être interrompues par des problèmes réseau. Il est recommandé d’implémenter une logique de reconnexion.
stream_retry.py
import time
def stream_with_retry(client, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(stream=True, **kwargs)
for chunk in stream:
yield chunk
return # Terminé avec succès
except Exception as e:
if attempt < max_retries - 1:
wait = 2 ** attempt # Backoff exponentiel
print(f"\nConnexion interrompue, nouvelle tentative dans {wait}s...")
time.sleep(wait)
else:
raise eBonnes pratiques
- Configurez toujours un timeout — Évitez les attentes indéfinies
- Gérez les chunks incomplets — Certains chunks peuvent ne pas avoir de content
- Implémentez la reconnexion — Utilisez une stratégie de backoff exponentiel
- Utilisez
flushcôté frontend — Assurez l’affichage immédiat du contenu
Last updated on