ストリーミングレスポンス
ストリーミング(Streaming)を使用すると、モデルの生成中にリアルタイムで出力を受信でき、ユーザー体験と体感速度が向上します。
仕組み
OfoxAI は Server-Sent Events (SSE) プロトコルを使用してストリーミングレスポンスを実装しています:
- クライアントがリクエスト送信時に
stream: trueを設定します - サーバーが生成されたコンテンツの断片(chunk)を順次返します
- 各 chunk は
data:プレフィックス付きで SSE 経由で送信されます - 生成完了時に
data: [DONE]が送信されます
OpenAI プロトコルストリーミング
cURL
Terminal
curl https://api.ofox.ai/v1/chat/completions \
-H "Authorization: Bearer $OFOX_API_KEY" \
-H "Content-Type: application/json" \
-N \
-d '{
"model": "openai/gpt-4o",
"messages": [{"role": "user", "content": "プログラミングについての詩を書いてください"}],
"stream": true
}'Anthropic プロトコルストリーミング
Python
stream_anthropic.py
import anthropic
client = anthropic.Anthropic(
base_url="https://api.ofox.ai/anthropic",
api_key="<あなたの OFOXAI_API_KEY>"
)
with client.messages.stream(
model="anthropic/claude-sonnet-4.5",
max_tokens=1024,
messages=[{"role": "user", "content": "プログラミングについての詩を書いてください"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)ストリーミング + Function Calling
ストリーミングレスポンスは関数呼び出しのシナリオにも対応しています。モデルはまずツール呼び出しリクエストをストリーミングで出力し、処理完了後に会話を継続します:
stream_with_tools.py
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "今日の東京の天気はどうですか?"}],
tools=[{
"type": "function",
"function": {
"name": "get_weather",
"description": "指定した都市の天気を取得します",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "都市名"}
},
"required": ["city"]
}
}
}],
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.tool_calls:
# ツール呼び出しを処理
print(f"ツール呼び出し: {delta.tool_calls[0].function}")
elif delta.content:
print(delta.content, end="", flush=True)エラーハンドリングと再接続
ストリーミング接続はネットワークの問題により中断される可能性があります。再接続ロジックの実装を推奨します。
stream_retry.py
import time
def stream_with_retry(client, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(stream=True, **kwargs)
for chunk in stream:
yield chunk
return # 正常完了
except Exception as e:
if attempt < max_retries - 1:
wait = 2 ** attempt # エクスポネンシャルバックオフ
print(f"\n接続が中断されました。{wait}秒後にリトライします...")
time.sleep(wait)
else:
raise eベストプラクティス
- 常にタイムアウトを設定する — 無限に待機することを防止します
- 不完全な chunk を処理する — 一部の chunk には content が含まれない場合があります
- 再接続メカニズムを実装する — エクスポネンシャルバックオフ戦略を使用します
- フロントエンドで
flushを使用する — コンテンツが即座に表示されるようにします
Last updated on