Streaming-Antworten
Streaming ermöglicht es, die Modellausgabe während der Generierung in Echtzeit zu empfangen — für bessere Benutzererfahrung und gefühlt schnellere Antworten.
Funktionsweise
OfoxAI implementiert Streaming über das Server-Sent Events (SSE)-Protokoll:
- Der Client setzt in der Anfrage
stream: true - Der Server liefert generierte Inhaltsfragmente (Chunks) schrittweise zurück
- Jeder Chunk wird mit dem Präfix
data:per SSE gesendet - Am Ende der Generierung wird
data: [DONE]gesendet
OpenAI-Protokoll Streaming
cURL
Terminal
curl https://api.ofox.ai/v1/chat/completions \
-H "Authorization: Bearer $OFOX_API_KEY" \
-H "Content-Type: application/json" \
-N \
-d '{
"model": "openai/gpt-4o",
"messages": [{"role": "user", "content": "Schreiben Sie ein Gedicht über das Programmieren"}],
"stream": true
}'Anthropic-Protokoll Streaming
Python
stream_anthropic.py
import anthropic
client = anthropic.Anthropic(
base_url="https://api.ofox.ai/anthropic",
api_key="<Ihr OFOXAI_API_KEY>"
)
with client.messages.stream(
model="anthropic/claude-sonnet-4.5",
max_tokens=1024,
messages=[{"role": "user", "content": "Schreiben Sie ein Gedicht über das Programmieren"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)Streaming + Function Calling
Streaming unterstützt auch Function Calling. Das Modell streamt zunächst die Werkzeugaufrufanfrage, die Sie verarbeiten und dann den Dialog fortsetzen:
stream_with_tools.py
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "Wie ist das Wetter heute in Peking?"}],
tools=[{
"type": "function",
"function": {
"name": "get_weather",
"description": "Wetter für eine bestimmte Stadt abrufen",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "Stadtname"}
},
"required": ["city"]
}
}
}],
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.tool_calls:
# Werkzeugaufruf verarbeiten
print(f"Werkzeugaufruf: {delta.tool_calls[0].function}")
elif delta.content:
print(delta.content, end="", flush=True)Fehlerbehandlung und Reconnect
Streaming-Verbindungen können durch Netzwerkprobleme unterbrochen werden. Implementieren Sie eine Reconnect-Logik.
stream_retry.py
import time
def stream_with_retry(client, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(stream=True, **kwargs)
for chunk in stream:
yield chunk
return # Erfolgreich abgeschlossen
except Exception as e:
if attempt < max_retries - 1:
wait = 2 ** attempt # Exponentielles Backoff
print(f"\nVerbindung unterbrochen, Wiederholung in {wait}s...")
time.sleep(wait)
else:
raise eBest Practices
- Timeout immer setzen — Endloses Warten vermeiden
- Unvollständige Chunks behandeln — Manche Chunks enthalten keinen Content
- Reconnect-Mechanismus implementieren — Mit exponentiellem Backoff
- Im Frontend
flushverwenden — Sicherstellen, dass Inhalte sofort angezeigt werden
Last updated on