← Claudeを本番運用するためのインフラ・モニタリング・ガバナンス
LESSON 02 / 06

レイテンシ削減:ストリーミング・並列・モデル選定

所要時間 13分 上級レベル

AI機能で 「遅い」 はそのまま離脱率に直結します。本レッスンでは、レイテンシを削減する実戦テクニックを学びます。

レイテンシの構成要素

要素 典型値 削減手段
ネットワーク往復 50〜200ms リージョン最適化
プロンプト処理 100〜500ms プロンプトキャッシュ
生成(モデル推論 1〜10秒 ストリーミング・モデル選定
後処理(パース等) 10〜100ms 軽量化

戦略1: ストリーミングの活用

応答全体を待つより、生成中から少しずつ表示 するほうが体感速度が大きく改善します。

# Python SDK での streaming
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": user_query}],
) as stream:
    for text in stream.text_stream:
        # 1チャンクずつクライアントに送信
        yield_to_client(text)

# 全文取得
full_response = stream.get_final_message()

WebSocket / SSE 経由でブラウザに転送

# FastAPI で SSE エンドポイント
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/chat/stream")
async def chat_stream(query: str):
    async def generate():
        with client.messages.stream(
            model="claude-sonnet-4-7",
            messages=[{"role": "user", "content": query}],
        ) as stream:
            for text in stream.text_stream:
                yield f"data: {json.dumps({'text': text})}nn"
            yield "data: [DONE]nn"

    return StreamingResponse(generate(), media_type="text/event-stream")

戦略2: 並列ツール実行

独立したツール呼び出しは 並列実行 することでレイテンシを大きく短縮できます。

# Anthropic API は parallel tool use をサポート
response = client.messages.create(
    model="claude-sonnet-4-7",
    tools=tools,
    messages=[{"role": "user", "content": "商品Aと商品Bの詳細を比較して"}],
)

# Claude が複数のツール呼び出しを同時に出す
# tool_use ブロックが複数出てきたら、並列実行
import asyncio

async def execute_tools_parallel(tool_uses):
    tasks = [execute_tool(tu) for tu in tool_uses]
    return await asyncio.gather(*tasks)

results = await execute_tools_parallel(tool_uses_in_response)

戦略3: モデル選定によるレイテンシ削減

モデル 典型レイテンシ(500トークン生成)
Opus 3〜8秒
Sonnet 1〜3秒
Haiku 0.5〜1.5秒
# 用途別の使い分け
# - リアルタイムチャット: Haiku を主軸、必要な時だけ Sonnet
# - バックグラウンド処理: Sonnet
# - 重要な意思決定: Opus(多少遅くてもOK)

戦略4: プリフェッチ・予測実行

ユーザーの次の行動を予測して 事前に処理を開始 しておく。

# ユーザーが入力中に、次のステップを予測してバックグラウンド実行

async def smart_prefetch(typing_query: str):
    # ある程度入力された段階でリクエストを投げる
    if len(typing_query) > 10 and is_complete_thought(typing_query):
        # バックグラウンドで実行
        future = asyncio.create_task(
            client.messages.create(
                model="claude-sonnet-4-7",
                messages=[{"role": "user", "content": typing_query}],
            )
        )
        return future
    return None


# 確定送信時に future を確認
async def on_send(query, prefetch_future):
    if prefetch_future and prefetch_future.done() and prefetch_future.input == query:
        # キャッシュヒット
        return prefetch_future.result()
    # 通常実行
    return await client.messages.create(...)

戦略5: max_tokens の最適化

# 生成時間は max_tokens に比例しがち
# 必要最小限に絞る

# 悪い例
response = client.messages.create(
    model="claude-sonnet-4-7",
    max_tokens=4096,  # 必要以上に大きい
    messages=...,
)

# 良い例:用途別に絞る
response = client.messages.create(
    model="claude-sonnet-4-7",
    max_tokens=200,  # 短い応答が期待されるなら
    messages=...,
)

戦略6: プロンプトキャッシュの effective use

キャッシュヒットは レイテンシも数倍速くなる

# キャッシュ済みプロンプト:通常500ms → ヒット時150ms
# 大きいシステムプロンプトなら効果絶大

# キャッシュ前提の設計:
# 1. システムプロンプトを固定化(毎回違うとキャッシュされない)
# 2. キャッシュ単位を意識(cache_control の境界)
# 3. cache_creation を初回のみに集中

レイテンシ計測の実装

import time

def call_with_latency_tracking(messages, **kwargs):
    start = time.time()

    # ストリーミング: TTFT(最初のトークンまでの時間)も計測
    ttft = None
    response_text = ""

    with client.messages.stream(messages=messages, **kwargs) as stream:
        for text in stream.text_stream:
            if ttft is None:
                ttft = time.time() - start
            response_text += text

        total_time = time.time() - start
        final = stream.get_final_message()

    metrics = {
        "ttft_ms": ttft * 1000,
        "total_time_ms": total_time * 1000,
        "tokens_per_sec": final.usage.output_tokens / total_time,
        "input_tokens": final.usage.input_tokens,
        "output_tokens": final.usage.output_tokens,
    }
    metrics_db.insert(metrics)

    return response_text, metrics

パフォーマンス予算の設定

# SLO(Service Level Objective)を明確に

LATENCY_BUDGET = {
    "ttft_p95_ms": 1000,        # 最初のトークンまで1秒
    "total_p95_ms": 5000,        # 全体5秒
    "tool_call_p95_ms": 500,     # ツール呼び出し0.5秒
}

# 予算超過時のアラート
def check_latency_budget():
    p95 = get_latency_percentile(0.95)
    for metric, budget in LATENCY_BUDGET.items():
        if p95[metric] > budget:
            send_alert(f"⚠️ {metric}: {p95[metric]}ms > {budget}ms")

このレッスンのまとめ

「ストリーミング → 並列ツール → モデル選定 → プリフェッチ → max_tokens最適化 → キャッシュ」の組み合わせでレイテンシを 30〜70%削減。次のレッスンでは、監視・ロギングを学びます。

よくある質問

この記事に関連する質問と答えをまとめました。

Q.ストリーミング実装の効果は?
A.
体感速度が劇的に改善します。応答全体を待つより、生成中から少しずつ表示するほうが、ユーザーにとっての「待ち時間」が大幅に短く感じられます。離脱率も下がります。
Q.並列ツール実行の使いどころは?
A.
独立した複数ツール呼び出しがある時です。「商品Aと商品Bの比較」「複数DBへの問合せ」など、依存関係がない処理は並列化することで2〜5倍速くなります。