Skip to Content
DocumentaciónGuíasRespuestas en streaming

Respuestas en streaming

Las respuestas en streaming (Streaming) le permiten recibir la salida en tiempo real durante el proceso de generación del modelo, mejorando la experiencia del usuario y la velocidad percibida.

Funcionamiento

OfoxAI utiliza el protocolo Server-Sent Events (SSE) para implementar respuestas en streaming:

  1. El cliente envía la solicitud con stream: true
  2. El servidor devuelve progresivamente los fragmentos de contenido generado (chunk)
  3. Cada chunk se envía vía SSE con el prefijo data:
  4. Al finalizar la generación se envía data: [DONE]

Streaming con protocolo OpenAI

Terminal
curl https://api.ofox.ai/v1/chat/completions \ -H "Authorization: Bearer $OFOX_API_KEY" \ -H "Content-Type: application/json" \ -N \ -d '{ "model": "openai/gpt-4o", "messages": [{"role": "user", "content": "Escribe un poema sobre programación"}], "stream": true }'

Streaming con protocolo Anthropic

stream_anthropic.py
import anthropic client = anthropic.Anthropic( base_url="https://api.ofox.ai/anthropic", api_key="<su OFOXAI_API_KEY>" ) with client.messages.stream( model="anthropic/claude-sonnet-4.5", max_tokens=1024, messages=[{"role": "user", "content": "Escribe un poema sobre programación"}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True)

Streaming + Function Calling

Las respuestas en streaming también soportan escenarios de llamada a funciones. El modelo primero transmite en streaming la solicitud de llamada a herramienta, y después de procesarla, continúa la conversación:

stream_with_tools.py
stream = client.chat.completions.create( model="openai/gpt-4o", messages=[{"role": "user", "content": "¿Cómo está el clima hoy en Madrid?"}], tools=[{ "type": "function", "function": { "name": "get_weather", "description": "Obtener el clima de una ciudad específica", "parameters": { "type": "object", "properties": { "city": {"type": "string", "description": "Nombre de la ciudad"} }, "required": ["city"] } } }], stream=True ) for chunk in stream: delta = chunk.choices[0].delta if delta.tool_calls: # Procesar la llamada a herramienta print(f"Llamando herramienta: {delta.tool_calls[0].function}") elif delta.content: print(delta.content, end="", flush=True)

Manejo de errores y reconexión

Las conexiones de streaming pueden interrumpirse por problemas de red. Se recomienda implementar lógica de reconexión.

stream_retry.py
import time def stream_with_retry(client, max_retries=3, **kwargs): for attempt in range(max_retries): try: stream = client.chat.completions.create(stream=True, **kwargs) for chunk in stream: yield chunk return # Completado exitosamente except Exception as e: if attempt < max_retries - 1: wait = 2 ** attempt # Retroceso exponencial print(f"\nConexión interrumpida, reintentando en {wait}s...") time.sleep(wait) else: raise e

Mejores prácticas

  1. Siempre configure un timeout — Evite esperas indefinidas
  2. Maneje chunks incompletos — Algunos chunks pueden no tener content
  3. Implemente reconexión — Use una estrategia de retroceso exponencial
  4. Use flush en el frontend — Asegure que el contenido se muestre inmediatamente
Last updated on