Потоковые ответы
Потоковые ответы (Streaming) позволяют получать вывод модели в реальном времени по мере генерации, улучшая пользовательский опыт и воспринимаемую скорость.
Принцип работы
OfoxAI использует протокол Server-Sent Events (SSE) для реализации потоковых ответов:
- Клиент отправляет запрос с параметром
stream: true - Сервер постепенно возвращает фрагменты сгенерированного контента (chunk)
- Каждый chunk отправляется через SSE с префиксом
data: - По завершении генерации отправляется
data: [DONE]
Потоковая передача по протоколу OpenAI
cURL
Terminal
curl https://api.ofox.ai/v1/chat/completions \
-H "Authorization: Bearer $OFOX_API_KEY" \
-H "Content-Type: application/json" \
-N \
-d '{
"model": "openai/gpt-4o",
"messages": [{"role": "user", "content": "Напишите стихотворение о программировании"}],
"stream": true
}'Потоковая передача по протоколу Anthropic
Python
stream_anthropic.py
import anthropic
client = anthropic.Anthropic(
base_url="https://api.ofox.ai/anthropic",
api_key="<Ваш OFOXAI_API_KEY>"
)
with client.messages.stream(
model="anthropic/claude-sonnet-4.5",
max_tokens=1024,
messages=[{"role": "user", "content": "Напишите стихотворение о программировании"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)Потоковая передача + Function Calling
Потоковые ответы также поддерживают вызов функций. Модель сначала передает запрос на вызов инструмента в потоковом режиме, а после обработки вы продолжаете диалог:
stream_with_tools.py
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "Какая сегодня погода в Москве?"}],
tools=[{
"type": "function",
"function": {
"name": "get_weather",
"description": "Получить погоду в указанном городе",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "Название города"}
},
"required": ["city"]
}
}
}],
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.tool_calls:
# Обработка вызова инструмента
print(f"Вызов инструмента: {delta.tool_calls[0].function}")
elif delta.content:
print(delta.content, end="", flush=True)Обработка ошибок и переподключение
Потоковое соединение может прерваться из-за сетевых проблем. Рекомендуется реализовать логику переподключения.
stream_retry.py
import time
def stream_with_retry(client, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(stream=True, **kwargs)
for chunk in stream:
yield chunk
return # Успешно завершено
except Exception as e:
if attempt < max_retries - 1:
wait = 2 ** attempt # Экспоненциальный откат
print(f"\nСоединение прервано, повтор через {wait}s...")
time.sleep(wait)
else:
raise eЛучшие практики
- Всегда устанавливайте таймаут — избегайте бесконечного ожидания
- Обрабатывайте неполные chunk — некоторые chunk могут не содержать content
- Реализуйте переподключение — используйте стратегию экспоненциального отката
- Используйте
flushна фронтенде — обеспечьте мгновенное отображение контента
Last updated on