Streaming
Streaming allows you to receive model output in real time as it’s being generated, improving user experience and perceived response speed.
How It Works
OfoxAI uses the Server-Sent Events (SSE) protocol for streaming:
- The client sends a request with
stream: true - The server progressively returns generated content chunks
- Each chunk is sent via SSE with a
data:prefix data: [DONE]is sent when generation is complete
OpenAI Protocol Streaming
cURL
Terminal
curl https://api.ofox.ai/v1/chat/completions \
-H "Authorization: Bearer $OFOX_API_KEY" \
-H "Content-Type: application/json" \
-N \
-d '{
"model": "openai/gpt-4o",
"messages": [{"role": "user", "content": "Write a poem about programming"}],
"stream": true
}'Anthropic Protocol Streaming
Python
stream_anthropic.py
import anthropic
client = anthropic.Anthropic(
base_url="https://api.ofox.ai/anthropic",
api_key="<your OFOXAI_API_KEY>"
)
with client.messages.stream(
model="anthropic/claude-sonnet-4.5",
max_tokens=1024,
messages=[{"role": "user", "content": "Write a poem about programming"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)Streaming + Function Calling
Streaming also supports Function Calling scenarios. The model will stream the tool call request, and you can continue the conversation after processing:
stream_with_tools.py
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "What's the weather like in San Francisco today?"}],
tools=[{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather for a given city",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "City name"}
},
"required": ["city"]
}
}
}],
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.tool_calls:
# Handle tool call
print(f"Calling tool: {delta.tool_calls[0].function}")
elif delta.content:
print(delta.content, end="", flush=True)Error Handling and Reconnection
Streaming connections may be interrupted by network issues. We recommend implementing reconnection logic.
stream_retry.py
import time
def stream_with_retry(client, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(stream=True, **kwargs)
for chunk in stream:
yield chunk
return # Completed successfully
except Exception as e:
if attempt < max_retries - 1:
wait = 2 ** attempt # Exponential backoff
print(f"\nConnection interrupted, retrying in {wait}s...")
time.sleep(wait)
else:
raise eBest Practices
- Always set timeouts — Avoid indefinite waiting
- Handle incomplete chunks — Some chunks may not contain content
- Implement reconnection — Use exponential backoff strategy
- Use
flushon the frontend — Ensure content is displayed immediately
Last updated on