Skip to Content
DocumentationGuidesRéponses en streaming

Réponses en streaming

Les réponses en streaming (Streaming) vous permettent de recevoir la sortie en temps réel pendant le processus de génération du modèle, améliorant l’expérience utilisateur et la vitesse perçue.

Fonctionnement

OfoxAI utilise le protocole Server-Sent Events (SSE) pour implémenter les réponses en streaming :

  1. Le client envoie la requête avec stream: true
  2. Le serveur renvoie progressivement les fragments de contenu généré (chunk)
  3. Chaque chunk est envoyé via SSE avec le préfixe data:
  4. À la fin de la génération, data: [DONE] est envoyé

Streaming avec le protocole OpenAI

Terminal
curl https://api.ofox.ai/v1/chat/completions \ -H "Authorization: Bearer $OFOX_API_KEY" \ -H "Content-Type: application/json" \ -N \ -d '{ "model": "openai/gpt-4o", "messages": [{"role": "user", "content": "Écris un poème sur la programmation"}], "stream": true }'

Streaming avec le protocole Anthropic

stream_anthropic.py
import anthropic client = anthropic.Anthropic( base_url="https://api.ofox.ai/anthropic", api_key="<votre OFOXAI_API_KEY>" ) with client.messages.stream( model="anthropic/claude-sonnet-4.5", max_tokens=1024, messages=[{"role": "user", "content": "Écris un poème sur la programmation"}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True)

Streaming + Function Calling

Les réponses en streaming supportent également les scénarios d’appel de fonctions. Le modèle transmet d’abord en streaming la requête d’appel d’outil, puis après traitement, la conversation continue :

stream_with_tools.py
stream = client.chat.completions.create( model="openai/gpt-4o", messages=[{"role": "user", "content": "Quel temps fait-il à Paris aujourd'hui ?"}], tools=[{ "type": "function", "function": { "name": "get_weather", "description": "Obtenir la météo d'une ville spécifique", "parameters": { "type": "object", "properties": { "city": {"type": "string", "description": "Nom de la ville"} }, "required": ["city"] } } }], stream=True ) for chunk in stream: delta = chunk.choices[0].delta if delta.tool_calls: # Traiter l'appel d'outil print(f"Appel d'outil : {delta.tool_calls[0].function}") elif delta.content: print(delta.content, end="", flush=True)

Gestion des erreurs et reconnexion

Les connexions de streaming peuvent être interrompues par des problèmes réseau. Il est recommandé d’implémenter une logique de reconnexion.

stream_retry.py
import time def stream_with_retry(client, max_retries=3, **kwargs): for attempt in range(max_retries): try: stream = client.chat.completions.create(stream=True, **kwargs) for chunk in stream: yield chunk return # Terminé avec succès except Exception as e: if attempt < max_retries - 1: wait = 2 ** attempt # Backoff exponentiel print(f"\nConnexion interrompue, nouvelle tentative dans {wait}s...") time.sleep(wait) else: raise e

Bonnes pratiques

  1. Configurez toujours un timeout — Évitez les attentes indéfinies
  2. Gérez les chunks incomplets — Certains chunks peuvent ne pas avoir de content
  3. Implémentez la reconnexion — Utilisez une stratégie de backoff exponentiel
  4. Utilisez flush côté frontend — Assurez l’affichage immédiat du contenu
Last updated on