Skip to Content
開発ドキュメントガイドストリーミング

ストリーミングレスポンス

ストリーミング(Streaming)を使用すると、モデルの生成中にリアルタイムで出力を受信でき、ユーザー体験と体感速度が向上します。

仕組み

OfoxAI は Server-Sent Events (SSE) プロトコルを使用してストリーミングレスポンスを実装しています:

  1. クライアントがリクエスト送信時に stream: true を設定します
  2. サーバーが生成されたコンテンツの断片(chunk)を順次返します
  3. 各 chunk は data: プレフィックス付きで SSE 経由で送信されます
  4. 生成完了時に data: [DONE] が送信されます

OpenAI プロトコルストリーミング

Terminal
curl https://api.ofox.ai/v1/chat/completions \ -H "Authorization: Bearer $OFOX_API_KEY" \ -H "Content-Type: application/json" \ -N \ -d '{ "model": "openai/gpt-4o", "messages": [{"role": "user", "content": "プログラミングについての詩を書いてください"}], "stream": true }'

Anthropic プロトコルストリーミング

stream_anthropic.py
import anthropic client = anthropic.Anthropic( base_url="https://api.ofox.ai/anthropic", api_key="<あなたの OFOXAI_API_KEY>" ) with client.messages.stream( model="anthropic/claude-sonnet-4.5", max_tokens=1024, messages=[{"role": "user", "content": "プログラミングについての詩を書いてください"}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True)

ストリーミング + Function Calling

ストリーミングレスポンスは関数呼び出しのシナリオにも対応しています。モデルはまずツール呼び出しリクエストをストリーミングで出力し、処理完了後に会話を継続します:

stream_with_tools.py
stream = client.chat.completions.create( model="openai/gpt-4o", messages=[{"role": "user", "content": "今日の東京の天気はどうですか?"}], tools=[{ "type": "function", "function": { "name": "get_weather", "description": "指定した都市の天気を取得します", "parameters": { "type": "object", "properties": { "city": {"type": "string", "description": "都市名"} }, "required": ["city"] } } }], stream=True ) for chunk in stream: delta = chunk.choices[0].delta if delta.tool_calls: # ツール呼び出しを処理 print(f"ツール呼び出し: {delta.tool_calls[0].function}") elif delta.content: print(delta.content, end="", flush=True)

エラーハンドリングと再接続

ストリーミング接続はネットワークの問題により中断される可能性があります。再接続ロジックの実装を推奨します。

stream_retry.py
import time def stream_with_retry(client, max_retries=3, **kwargs): for attempt in range(max_retries): try: stream = client.chat.completions.create(stream=True, **kwargs) for chunk in stream: yield chunk return # 正常完了 except Exception as e: if attempt < max_retries - 1: wait = 2 ** attempt # エクスポネンシャルバックオフ print(f"\n接続が中断されました。{wait}秒後にリトライします...") time.sleep(wait) else: raise e

ベストプラクティス

  1. 常にタイムアウトを設定する — 無限に待機することを防止します
  2. 不完全な chunk を処理する — 一部の chunk には content が含まれない場合があります
  3. 再接続メカニズムを実装する — エクスポネンシャルバックオフ戦略を使用します
  4. フロントエンドで flush を使用する — コンテンツが即座に表示されるようにします
Last updated on