Resposta em streaming
O streaming permite receber a saída do modelo em tempo real durante a geração, melhorando a experiência do usuário e a percepção de velocidade.
Como funciona
O OfoxAI implementa streaming através do protocolo Server-Sent Events (SSE):
- O cliente define
stream: truena requisição - O servidor retorna fragmentos de conteúdo gerado (chunks) progressivamente
- Cada chunk é enviado via SSE com o prefixo
data: - Ao final da geração, é enviado
data: [DONE]
Streaming com protocolo OpenAI
cURL
Terminal
curl https://api.ofox.ai/v1/chat/completions \
-H "Authorization: Bearer $OFOX_API_KEY" \
-H "Content-Type: application/json" \
-N \
-d '{
"model": "openai/gpt-4o",
"messages": [{"role": "user", "content": "Escreva um poema sobre programação"}],
"stream": true
}'Streaming com protocolo Anthropic
Python
stream_anthropic.py
import anthropic
client = anthropic.Anthropic(
base_url="https://api.ofox.ai/anthropic",
api_key="<Sua OFOXAI_API_KEY>"
)
with client.messages.stream(
model="anthropic/claude-sonnet-4.5",
max_tokens=1024,
messages=[{"role": "user", "content": "Escreva um poema sobre programação"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)Streaming + Function Calling
O streaming também suporta Function Calling. O modelo primeiro transmite a requisição de chamada de ferramenta, você a processa e depois continua o diálogo:
stream_with_tools.py
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "Como está o tempo em Pequim hoje?"}],
tools=[{
"type": "function",
"function": {
"name": "get_weather",
"description": "Obter o clima de uma cidade específica",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "Nome da cidade"}
},
"required": ["city"]
}
}
}],
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.tool_calls:
# Processar chamada de ferramenta
print(f"Chamada de ferramenta: {delta.tool_calls[0].function}")
elif delta.content:
print(delta.content, end="", flush=True)Tratamento de erros e reconexão
Conexões de streaming podem ser interrompidas por problemas de rede. Implemente uma lógica de reconexão.
stream_retry.py
import time
def stream_with_retry(client, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(stream=True, **kwargs)
for chunk in stream:
yield chunk
return # Concluído com sucesso
except Exception as e:
if attempt < max_retries - 1:
wait = 2 ** attempt # Backoff exponencial
print(f"\nConexão interrompida, reconectando em {wait}s...")
time.sleep(wait)
else:
raise eBoas práticas
- Sempre configurar timeout — Evitar espera indefinida
- Tratar chunks incompletos — Alguns chunks podem não conter content
- Implementar mecanismo de reconexão — Com backoff exponencial
- Usar
flushno frontend — Garantir que o conteúdo seja exibido imediatamente
Last updated on