← Claudeを本番運用するためのインフラ・モニタリング・ガバナンス
LESSON 05 / 06

SLA・障害対応:多層フォールバック設計

所要時間 13分 上級レベル

Anthropic API は高い可用性を持ちますが、「絶対落ちない」 サービスはありません。本レッスンでは、外部AI障害時の対応設計を学びます。

典型的な障害シナリオ

シナリオ 頻度 影響
API レート制限 一部リクエスト失敗
API一時障害 数分〜数十分の停止
API 大規模障害 1時間以上停止
特定モデルのみ障害 低〜中 該当モデルだけ停止
レイテンシ大幅増 タイムアウト多発

多層フォールバック設計

# 階層的フォールバック

async def call_with_fallback(messages, **kwargs):
    # Layer 1: 第1選択モデル
    try:
        return await call_claude(model="claude-sonnet-4-7", messages=messages, **kwargs)
    except (RateLimitError, APIError) as e:
        logger.warning(f"Sonnet failed: {e}")

    # Layer 2: 別モデル(同じ Anthropic)
    try:
        return await call_claude(model="claude-haiku-4", messages=messages, **kwargs)
    except (RateLimitError, APIError) as e:
        logger.warning(f"Haiku failed: {e}")

    # Layer 3: 別ベンダー(OpenAI / Google など)
    try:
        return await call_openai(messages, **kwargs)
    except Exception as e:
        logger.error(f"OpenAI fallback failed: {e}")

    # Layer 4: 静的応答(最低限のサービス維持)
    return get_static_fallback_response(messages)


def get_static_fallback_response(messages):
    return {
        "content": [
            {
                "type": "text",
                "text": "申し訳ございません、現在AIサービスが混雑しています。数分後に再度お試しください。"
            }
        ],
        "stop_reason": "fallback",
    }

サーキットブレーカパターン

連続失敗が続いたら 一定時間呼び出しを止める。「死んだサーバー」に殴り続けない。

from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"   # 正常動作
    OPEN = "open"        # 遮断中
    HALF_OPEN = "half_open"  # 復旧テスト中


class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout_seconds=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        # OPEN 状態:遮断
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpen("Circuit is open")

        # 実行
        try:
            result = func(*args, **kwargs)
            # 成功 → CLOSED に戻す
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                logger.warning("Circuit opened due to repeated failures")
            raise


# 使用
claude_breaker = CircuitBreaker(failure_threshold=10, timeout_seconds=30)

async def call_claude_with_breaker(messages):
    return await claude_breaker.call(client.messages.create, messages=messages)

リトライ戦略

import asyncio
from anthropic import RateLimitError, APIError

async def call_with_retry(func, *args, max_retries=3, **kwargs):
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except RateLimitError as e:
            # レート制限:指数バックオフ + ジッタ
            wait = (2 ** attempt) + random.uniform(0, 1)
            logger.info(f"Rate limited, retrying in {wait}s (attempt {attempt+1})")
            await asyncio.sleep(wait)
        except APIError as e:
            if e.status_code in (500, 502, 503, 504):
                # サーバー側エラー:リトライ
                wait = 1 + (attempt * 2)
                await asyncio.sleep(wait)
            else:
                # 4xx 系はリトライしない
                raise

    # 全リトライ失敗
    raise MaxRetriesExceeded()

キューイングによる負荷平準化

# 急増したリクエストをキューイング

import asyncio
from asyncio import Queue, Semaphore

class RateLimitedQueue:
    def __init__(self, max_concurrent=10):
        self.queue = Queue()
        self.semaphore = Semaphore(max_concurrent)

    async def submit(self, request):
        await self.queue.put(request)

    async def worker(self):
        while True:
            request = await self.queue.get()
            async with self.semaphore:
                try:
                    await process(request)
                finally:
                    self.queue.task_done()

queue = RateLimitedQueue(max_concurrent=10)

# 起動時に worker タスクを生成
for _ in range(10):
    asyncio.create_task(queue.worker())

非同期+キャッシュによるレジリエンス

# よくある質問は事前に答えをキャッシュ
# AI 障害時もキャッシュから応答可能

async def smart_query_handler(user_query):
    # 1. 完全一致キャッシュ
    cache_key = hash_query(user_query)
    if cached := await cache.get(cache_key):
        return cached

    # 2. 類似クエリのキャッシュ(埋め込み検索)
    similar = await find_similar_cached(user_query, threshold=0.95)
    if similar:
        return similar.response

    # 3. AI 呼び出し
    try:
        response = await call_claude(user_query)
        await cache.set(cache_key, response, ttl=86400)
        return response
    except APIError:
        # 4. AI 失敗時のフォールバック
        return await fallback_response(user_query)

SLA の設計

# 自社サービスのSLA を定義

SLA = {
    "availability": "99.5%",  # 月7時間ダウンタイム許容
    "latency_p95_ms": 5000,    # P95 で5秒以内
    "error_rate_max": "0.5%",  # 0.5%以下
}

# Anthropic API の SLA を考慮
# - Anthropic 公式: 99.5% SLA(プラン依存)
# - 自社サービスのSLA はこれ以下にせざるを得ない(または冗長化必須)

# 月次レポートでの SLA 達成率測定
def calculate_sla_compliance():
    return {
        "availability": (success_count / total_count) * 100,
        "latency_p95": np.percentile(latencies, 95),
        "error_rate": (error_count / total_count) * 100,
    }

インシデント対応プロセス

レベル 定義 対応時間 通知範囲
P0 サービス全停止 15分以内 全員 + 経営層
P1 主要機能停止 30分以内 関係チーム
P2 一部機能影響 2時間以内 担当者
P3 軽微な問題 当日中 担当者

ポストモーテム(事後分析)

# 障害後の振り返りテンプレ

# インシデントレポート
- 発生日時:
- 検知日時:
- 復旧日時:
- 影響範囲: ユーザー数・機能・ビジネスインパクト

# タイムライン
- 09:00 障害発生
- 09:05 アラート発火
- 09:10 オンコール対応開始
- 09:30 原因特定
- 09:45 復旧

# 根本原因
- 直接原因:
- 真因:
- 連鎖した要因:

# 改善アクション(必ずアクションを定義)
1. (短期)アラート閾値の調整
2. (中期)フォールバックの追加実装
3. (長期)アーキテクチャ見直し

このレッスンのまとめ

「フォールバック → サーキットブレーカ → リトライ → キャッシュ → SLA定義 → インシデント対応」の組み合わせで、AI機能を レジリエントに運用 できます。次のレッスン(最終回)では、AIガバナンスを学びます。

よくある質問

この記事に関連する質問と答えをまとめました。

Q.多層フォールバックの設計は?
A.
同一ベンダーの別モデル → 他ベンダー(OpenAI/Google)→ 静的応答 の階層構造が定石です。サーキットブレーカと組み合わせて「壊れたサービスに殴り続けない」のが重要です。
Q.インシデント対応の優先順位は?
A.
P0(全停止:15分以内対応)、P1(主要機能停止:30分)、P2(一部機能:2時間)、P3(軽微:当日中)の4段階が一般的。ポストモーテムで再発防止策を必ず文書化します。