Skip to Content
DocumentaçãoGuiasStreaming

Resposta em streaming

O streaming permite receber a saída do modelo em tempo real durante a geração, melhorando a experiência do usuário e a percepção de velocidade.

Como funciona

O OfoxAI implementa streaming através do protocolo Server-Sent Events (SSE):

  1. O cliente define stream: true na requisição
  2. O servidor retorna fragmentos de conteúdo gerado (chunks) progressivamente
  3. Cada chunk é enviado via SSE com o prefixo data:
  4. Ao final da geração, é enviado data: [DONE]

Streaming com protocolo OpenAI

Terminal
curl https://api.ofox.ai/v1/chat/completions \ -H "Authorization: Bearer $OFOX_API_KEY" \ -H "Content-Type: application/json" \ -N \ -d '{ "model": "openai/gpt-4o", "messages": [{"role": "user", "content": "Escreva um poema sobre programação"}], "stream": true }'

Streaming com protocolo Anthropic

stream_anthropic.py
import anthropic client = anthropic.Anthropic( base_url="https://api.ofox.ai/anthropic", api_key="<Sua OFOXAI_API_KEY>" ) with client.messages.stream( model="anthropic/claude-sonnet-4.5", max_tokens=1024, messages=[{"role": "user", "content": "Escreva um poema sobre programação"}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True)

Streaming + Function Calling

O streaming também suporta Function Calling. O modelo primeiro transmite a requisição de chamada de ferramenta, você a processa e depois continua o diálogo:

stream_with_tools.py
stream = client.chat.completions.create( model="openai/gpt-4o", messages=[{"role": "user", "content": "Como está o tempo em Pequim hoje?"}], tools=[{ "type": "function", "function": { "name": "get_weather", "description": "Obter o clima de uma cidade específica", "parameters": { "type": "object", "properties": { "city": {"type": "string", "description": "Nome da cidade"} }, "required": ["city"] } } }], stream=True ) for chunk in stream: delta = chunk.choices[0].delta if delta.tool_calls: # Processar chamada de ferramenta print(f"Chamada de ferramenta: {delta.tool_calls[0].function}") elif delta.content: print(delta.content, end="", flush=True)

Tratamento de erros e reconexão

Conexões de streaming podem ser interrompidas por problemas de rede. Implemente uma lógica de reconexão.

stream_retry.py
import time def stream_with_retry(client, max_retries=3, **kwargs): for attempt in range(max_retries): try: stream = client.chat.completions.create(stream=True, **kwargs) for chunk in stream: yield chunk return # Concluído com sucesso except Exception as e: if attempt < max_retries - 1: wait = 2 ** attempt # Backoff exponencial print(f"\nConexão interrompida, reconectando em {wait}s...") time.sleep(wait) else: raise e

Boas práticas

  1. Sempre configurar timeout — Evitar espera indefinida
  2. Tratar chunks incompletos — Alguns chunks podem não conter content
  3. Implementar mecanismo de reconexão — Com backoff exponencial
  4. Usar flush no frontend — Garantir que o conteúdo seja exibido imediatamente
Last updated on