Claudeを本番運用するためのインフラ・モニタリング・ガバナンス
0 / 6 完了
(0%)
LESSON 02
/ 06
レイテンシ削減:ストリーミング・並列・モデル選定

AI機能で 「遅い」 はそのまま離脱率に直結します。本レッスンでは、レイテンシを削減する実戦テクニックを学びます。
レイテンシの構成要素
| 要素 | 典型値 | 削減手段 |
|---|---|---|
| ネットワーク往復 | 50〜200ms | リージョン最適化 |
| プロンプト処理 | 100〜500ms | プロンプトキャッシュ |
| 生成(モデル推論) | 1〜10秒 | ストリーミング・モデル選定 |
| 後処理(パース等) | 10〜100ms | 軽量化 |
戦略1: ストリーミングの活用
応答全体を待つより、生成中から少しずつ表示 するほうが体感速度が大きく改善します。
# Python SDK での streaming
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-7",
max_tokens=1024,
messages=[{"role": "user", "content": user_query}],
) as stream:
for text in stream.text_stream:
# 1チャンクずつクライアントに送信
yield_to_client(text)
# 全文取得
full_response = stream.get_final_message()
WebSocket / SSE 経由でブラウザに転送
# FastAPI で SSE エンドポイント
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/chat/stream")
async def chat_stream(query: str):
async def generate():
with client.messages.stream(
model="claude-sonnet-4-7",
messages=[{"role": "user", "content": query}],
) as stream:
for text in stream.text_stream:
yield f"data: {json.dumps({'text': text})}nn"
yield "data: [DONE]nn"
return StreamingResponse(generate(), media_type="text/event-stream")
戦略2: 並列ツール実行
独立したツール呼び出しは 並列実行 することでレイテンシを大きく短縮できます。
# Anthropic API は parallel tool use をサポート
response = client.messages.create(
model="claude-sonnet-4-7",
tools=tools,
messages=[{"role": "user", "content": "商品Aと商品Bの詳細を比較して"}],
)
# Claude が複数のツール呼び出しを同時に出す
# tool_use ブロックが複数出てきたら、並列実行
import asyncio
async def execute_tools_parallel(tool_uses):
tasks = [execute_tool(tu) for tu in tool_uses]
return await asyncio.gather(*tasks)
results = await execute_tools_parallel(tool_uses_in_response)
戦略3: モデル選定によるレイテンシ削減
| モデル | 典型レイテンシ(500トークン生成) |
|---|---|
| Opus | 3〜8秒 |
| Sonnet | 1〜3秒 |
| Haiku | 0.5〜1.5秒 |
# 用途別の使い分け
# - リアルタイムチャット: Haiku を主軸、必要な時だけ Sonnet
# - バックグラウンド処理: Sonnet
# - 重要な意思決定: Opus(多少遅くてもOK)
戦略4: プリフェッチ・予測実行
ユーザーの次の行動を予測して 事前に処理を開始 しておく。
# ユーザーが入力中に、次のステップを予測してバックグラウンド実行
async def smart_prefetch(typing_query: str):
# ある程度入力された段階でリクエストを投げる
if len(typing_query) > 10 and is_complete_thought(typing_query):
# バックグラウンドで実行
future = asyncio.create_task(
client.messages.create(
model="claude-sonnet-4-7",
messages=[{"role": "user", "content": typing_query}],
)
)
return future
return None
# 確定送信時に future を確認
async def on_send(query, prefetch_future):
if prefetch_future and prefetch_future.done() and prefetch_future.input == query:
# キャッシュヒット
return prefetch_future.result()
# 通常実行
return await client.messages.create(...)
戦略5: max_tokens の最適化
# 生成時間は max_tokens に比例しがち
# 必要最小限に絞る
# 悪い例
response = client.messages.create(
model="claude-sonnet-4-7",
max_tokens=4096, # 必要以上に大きい
messages=...,
)
# 良い例:用途別に絞る
response = client.messages.create(
model="claude-sonnet-4-7",
max_tokens=200, # 短い応答が期待されるなら
messages=...,
)
戦略6: プロンプトキャッシュの effective use
キャッシュヒットは レイテンシも数倍速くなる。
# キャッシュ済みプロンプト:通常500ms → ヒット時150ms
# 大きいシステムプロンプトなら効果絶大
# キャッシュ前提の設計:
# 1. システムプロンプトを固定化(毎回違うとキャッシュされない)
# 2. キャッシュ単位を意識(cache_control の境界)
# 3. cache_creation を初回のみに集中
レイテンシ計測の実装
import time
def call_with_latency_tracking(messages, **kwargs):
start = time.time()
# ストリーミング: TTFT(最初のトークンまでの時間)も計測
ttft = None
response_text = ""
with client.messages.stream(messages=messages, **kwargs) as stream:
for text in stream.text_stream:
if ttft is None:
ttft = time.time() - start
response_text += text
total_time = time.time() - start
final = stream.get_final_message()
metrics = {
"ttft_ms": ttft * 1000,
"total_time_ms": total_time * 1000,
"tokens_per_sec": final.usage.output_tokens / total_time,
"input_tokens": final.usage.input_tokens,
"output_tokens": final.usage.output_tokens,
}
metrics_db.insert(metrics)
return response_text, metrics
パフォーマンス予算の設定
# SLO(Service Level Objective)を明確に
LATENCY_BUDGET = {
"ttft_p95_ms": 1000, # 最初のトークンまで1秒
"total_p95_ms": 5000, # 全体5秒
"tool_call_p95_ms": 500, # ツール呼び出し0.5秒
}
# 予算超過時のアラート
def check_latency_budget():
p95 = get_latency_percentile(0.95)
for metric, budget in LATENCY_BUDGET.items():
if p95[metric] > budget:
send_alert(f"⚠️ {metric}: {p95[metric]}ms > {budget}ms")
このレッスンのまとめ
「ストリーミング → 並列ツール → モデル選定 → プリフェッチ → max_tokens最適化 → キャッシュ」の組み合わせでレイテンシを 30〜70%削減。次のレッスンでは、監視・ロギングを学びます。
よくある質問
この記事に関連する質問と答えをまとめました。
Q.ストリーミング実装の効果は?
A.
体感速度が劇的に改善します。応答全体を待つより、生成中から少しずつ表示するほうが、ユーザーにとっての「待ち時間」が大幅に短く感じられます。離脱率も下がります。
Q.並列ツール実行の使いどころは?
A.
独立した複数ツール呼び出しがある時です。「商品Aと商品Bの比較」「複数DBへの問合せ」など、依存関係がない処理は並列化することで2〜5倍速くなります。