Skip to Content
DokumentationAnleitungenStreaming

Streaming-Antworten

Streaming ermöglicht es, die Modellausgabe während der Generierung in Echtzeit zu empfangen — für bessere Benutzererfahrung und gefühlt schnellere Antworten.

Funktionsweise

OfoxAI implementiert Streaming über das Server-Sent Events (SSE)-Protokoll:

  1. Der Client setzt in der Anfrage stream: true
  2. Der Server liefert generierte Inhaltsfragmente (Chunks) schrittweise zurück
  3. Jeder Chunk wird mit dem Präfix data: per SSE gesendet
  4. Am Ende der Generierung wird data: [DONE] gesendet

OpenAI-Protokoll Streaming

Terminal
curl https://api.ofox.ai/v1/chat/completions \ -H "Authorization: Bearer $OFOX_API_KEY" \ -H "Content-Type: application/json" \ -N \ -d '{ "model": "openai/gpt-4o", "messages": [{"role": "user", "content": "Schreiben Sie ein Gedicht über das Programmieren"}], "stream": true }'

Anthropic-Protokoll Streaming

stream_anthropic.py
import anthropic client = anthropic.Anthropic( base_url="https://api.ofox.ai/anthropic", api_key="<Ihr OFOXAI_API_KEY>" ) with client.messages.stream( model="anthropic/claude-sonnet-4.5", max_tokens=1024, messages=[{"role": "user", "content": "Schreiben Sie ein Gedicht über das Programmieren"}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True)

Streaming + Function Calling

Streaming unterstützt auch Function Calling. Das Modell streamt zunächst die Werkzeugaufrufanfrage, die Sie verarbeiten und dann den Dialog fortsetzen:

stream_with_tools.py
stream = client.chat.completions.create( model="openai/gpt-4o", messages=[{"role": "user", "content": "Wie ist das Wetter heute in Peking?"}], tools=[{ "type": "function", "function": { "name": "get_weather", "description": "Wetter für eine bestimmte Stadt abrufen", "parameters": { "type": "object", "properties": { "city": {"type": "string", "description": "Stadtname"} }, "required": ["city"] } } }], stream=True ) for chunk in stream: delta = chunk.choices[0].delta if delta.tool_calls: # Werkzeugaufruf verarbeiten print(f"Werkzeugaufruf: {delta.tool_calls[0].function}") elif delta.content: print(delta.content, end="", flush=True)

Fehlerbehandlung und Reconnect

Streaming-Verbindungen können durch Netzwerkprobleme unterbrochen werden. Implementieren Sie eine Reconnect-Logik.

stream_retry.py
import time def stream_with_retry(client, max_retries=3, **kwargs): for attempt in range(max_retries): try: stream = client.chat.completions.create(stream=True, **kwargs) for chunk in stream: yield chunk return # Erfolgreich abgeschlossen except Exception as e: if attempt < max_retries - 1: wait = 2 ** attempt # Exponentielles Backoff print(f"\nVerbindung unterbrochen, Wiederholung in {wait}s...") time.sleep(wait) else: raise e

Best Practices

  1. Timeout immer setzen — Endloses Warten vermeiden
  2. Unvollständige Chunks behandeln — Manche Chunks enthalten keinen Content
  3. Reconnect-Mechanismus implementieren — Mit exponentiellem Backoff
  4. Im Frontend flush verwenden — Sicherstellen, dass Inhalte sofort angezeigt werden
Last updated on