
Streaming

Streaming allows you to receive model output in real time as it’s being generated, improving user experience and perceived response speed.

How It Works

OfoxAI uses the Server-Sent Events (SSE) protocol for streaming:

  1. The client sends a request with stream: true
  2. The server progressively returns generated content chunks
  3. Each chunk is sent via SSE with a data: prefix
  4. data: [DONE] is sent when generation is complete
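The steps above can be sketched with a minimal SSE line parser. This is an illustrative sketch only: the helper name and the chunk shapes are assumptions for demonstration, not part of any OfoxAI SDK.

```python
import json

def parse_sse_lines(lines):
    """Yield decoded JSON chunks from raw SSE lines until [DONE]."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data: "):
            continue  # Skip blank keep-alive lines and SSE comments
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return  # Step 4: generation is complete
        yield json.loads(payload)  # Step 3: one content chunk

# Example with two content chunks followed by the terminator:
raw = [
    'data: {"choices": [{"delta": {"content": "Hel"}}]}',
    'data: {"choices": [{"delta": {"content": "lo"}}]}',
    "data: [DONE]",
]
text = "".join(c["choices"][0]["delta"]["content"] for c in parse_sse_lines(raw))
print(text)  # "Hel" + "lo" -> "Hello"
```

In practice the SDKs below handle this parsing for you; rolling your own is only needed when consuming the raw HTTP stream directly.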

OpenAI Protocol Streaming

Terminal

```shell
curl https://api.ofox.ai/v1/chat/completions \
  -H "Authorization: Bearer $OFOX_API_KEY" \
  -H "Content-Type: application/json" \
  -N \
  -d '{
    "model": "openai/gpt-4o",
    "messages": [{"role": "user", "content": "Write a poem about programming"}],
    "stream": true
  }'
```

Anthropic Protocol Streaming

stream_anthropic.py

```python
import anthropic

client = anthropic.Anthropic(
    base_url="https://api.ofox.ai/anthropic",
    api_key="<your OFOXAI_API_KEY>"
)

with client.messages.stream(
    model="anthropic/claude-sonnet-4.5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a poem about programming"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```

Streaming + Function Calling

Streaming also supports function calling. The model streams the tool call request incrementally, and you can continue the conversation after executing the tool:

stream_with_tools.py

```python
stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "What's the weather like in San Francisco today?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather for a given city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string", "description": "City name"}
                },
                "required": ["city"]
            }
        }
    }],
    stream=True
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        # Handle tool call
        print(f"Calling tool: {delta.tool_calls[0].function}")
    elif delta.content:
        print(delta.content, end="", flush=True)
```
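Note that when a tool call is streamed, the function arguments typically arrive as JSON fragments spread across several chunks and must be concatenated before parsing. A minimal accumulator sketch, using plain dicts as simplified stand-ins for the SDK's delta objects:

```python
import json

def accumulate_tool_call(deltas):
    """Merge streamed tool-call fragments into a complete call."""
    name = ""
    arguments = ""
    for delta in deltas:
        if delta.get("name"):
            name = delta["name"]  # The name usually arrives in the first fragment
        arguments += delta.get("arguments", "")  # Arguments arrive piecewise
    return name, json.loads(arguments)

# Simulated fragments as they might arrive over the stream:
fragments = [
    {"name": "get_weather", "arguments": ""},
    {"arguments": '{"city": '},
    {"arguments": '"San Francisco"}'},
]
name, args = accumulate_tool_call(fragments)
print(name, args)  # get_weather {'city': 'San Francisco'}
```

Only parse the accumulated arguments once the stream signals the tool call is finished; parsing a partial fragment will raise a JSON error.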

Error Handling and Reconnection

Streaming connections may be interrupted by network issues. We recommend implementing reconnection logic.

stream_retry.py

```python
import time

def stream_with_retry(client, max_retries=3, **kwargs):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(stream=True, **kwargs)
            for chunk in stream:
                yield chunk
            return  # Completed successfully
        except Exception as e:
            if attempt < max_retries - 1:
                wait = 2 ** attempt  # Exponential backoff
                print(f"\nConnection interrupted, retrying in {wait}s...")
                time.sleep(wait)
            else:
                raise e
```

Best Practices

  1. Always set timeouts — Avoid indefinite waiting
  2. Handle incomplete chunks — Some chunks may not contain content
  3. Implement reconnection — Use exponential backoff strategy
  4. Use flush on the frontend — Ensure content is displayed immediately
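Practices 1 and 2 can be sketched as follows. The chunk objects here are simplified stand-ins built for illustration; with the OpenAI SDK, a timeout is typically supplied via the client's or request's `timeout` parameter rather than in the loop itself.

```python
from types import SimpleNamespace

def collect_text(stream):
    """Accumulate streamed content while tolerating incomplete chunks."""
    parts = []
    for chunk in stream:
        if not chunk.choices:  # Practice 2: some chunks carry no choices at all
            continue
        content = chunk.choices[0].delta.content
        if content is None:  # e.g. role-only or tool-call deltas
            continue
        parts.append(content)
    return "".join(parts)

def make_chunk(content):
    """Build a minimal stand-in for a streamed chunk."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])

stream = [make_chunk(None), make_chunk("Hi"), SimpleNamespace(choices=[]), make_chunk("!")]
print(collect_text(stream))  # "Hi!"
```

Guarding on both empty `choices` and `None` content keeps the consumer robust regardless of which metadata-only chunks a given model emits.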