← MCPで Claude を業務システムと統合する
LESSON 03 / 06

データベース・SaaSとの接続パターン

所要時間 14分 上級レベル

MCPサーバーの真価は 業務システムとの統合。本レッスンではDB・SaaS への接続実装パターンを学びます。

Postgres 接続の最小実装

# postgres_mcp.py
from mcp.server.fastmcp import FastMCP
import psycopg2
import os
from psycopg2.extras import RealDictCursor

mcp = FastMCP("postgres-mcp")

DB_URL = os.environ["DATABASE_URL"]

def get_connection():
    return psycopg2.connect(DB_URL, cursor_factory=RealDictCursor)


@mcp.tool()
def execute_query(query: str, params: list = None) -> str:
    """SELECT クエリを実行する(読み取り専用)。

    Args:
        query: SELECT で始まるSQLクエリ
        params: クエリパラメータ
    """
    # 安全性チェック:SELECT のみ許可
    q_normalized = query.strip().upper()
    if not q_normalized.startswith("SELECT"):
        return "エラー: SELECT クエリのみ実行可能です"

    # 危険ワード検知
    dangerous = ["DROP", "DELETE", "UPDATE", "INSERT", "TRUNCATE", "ALTER"]
    if any(d in q_normalized for d in dangerous):
        return f"エラー: 禁止されたキーワードが含まれます"

    try:
        with get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, params or [])
                rows = cur.fetchmany(100)  # 最大100件
                return json.dumps(rows, default=str, ensure_ascii=False)
    except Exception as e:
        return f"エラー: {str(e)}"


@mcp.tool()
def list_tables() -> str:
    """利用可能なテーブル一覧を返す。"""
    query = """
        SELECT table_name, table_type
        FROM information_schema.tables
        WHERE table_schema = 'public'
        ORDER BY table_name
    """
    return execute_query(query)


@mcp.tool()
def describe_table(table_name: str) -> str:
    """テーブルのスキーマを返す。"""
    query = """
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_name = %s
        ORDER BY ordinal_position
    """
    return execute_query(query, [table_name])


if __name__ == "__main__":
    mcp.run()

SQL インジェクション対策

対策 実装
パラメータ化クエリ %s + params で値を渡す
キーワードホワイトリスト SELECT のみ許可、DROP等を拒否
結果件数制限 fetchmany(100) で最大件数制限
タイムアウト statement_timeout = ‘5s’
読み取り専用ロール DBユーザーをSELECT権限のみに

Slack 接続のMCPサーバー

# slack_mcp.py
from mcp.server.fastmcp import FastMCP
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import os

mcp = FastMCP("slack-mcp")

slack = WebClient(token=os.environ["SLACK_BOT_TOKEN"])


@mcp.tool()
def post_message(channel: str, text: str) -> str:
    """Slackチャネルにメッセージを投稿する。

    Args:
        channel: チャネルID(例: 'C0123456789')またはチャネル名('#general')
        text: 投稿メッセージ(マークダウン可)
    """
    try:
        response = slack.chat_postMessage(channel=channel, text=text)
        return f"投稿完了: ts={response['ts']}, channel={response['channel']}"
    except SlackApiError as e:
        return f"Slack API エラー: {e.response['error']}"


@mcp.tool()
def search_messages(query: str, count: int = 10) -> str:
    """メッセージ検索。

    Args:
        query: 検索クエリ
        count: 最大件数
    """
    try:
        response = slack.search_messages(query=query, count=count)
        results = [
            {
                "channel": m["channel"]["name"],
                "user": m.get("username", m.get("user")),
                "text": m["text"][:200],
                "ts": m["ts"],
            }
            for m in response["messages"]["matches"]
        ]
        return json.dumps(results, ensure_ascii=False)
    except SlackApiError as e:
        return f"検索エラー: {e.response['error']}"


@mcp.tool()
def list_channels(limit: int = 50) -> str:
    """参加中のチャネル一覧。"""
    try:
        response = slack.conversations_list(limit=limit)
        channels = [
            {"id": c["id"], "name": c["name"], "is_private": c["is_private"]}
            for c in response["channels"]
        ]
        return json.dumps(channels, ensure_ascii=False)
    except SlackApiError as e:
        return f"エラー: {e.response['error']}"


if __name__ == "__main__":
    mcp.run()

Notion 接続のMCPサーバー

# notion_mcp.py
from mcp.server.fastmcp import FastMCP
from notion_client import Client
import os

mcp = FastMCP("notion-mcp")

notion = Client(auth=os.environ["NOTION_API_KEY"])


@mcp.tool()
def search_pages(query: str) -> str:
    """Notion ページを検索。"""
    response = notion.search(query=query, filter={"property": "object", "value": "page"})
    pages = [
        {
            "id": p["id"],
            "title": _extract_title(p),
            "url": p["url"],
            "last_edited": p["last_edited_time"],
        }
        for p in response["results"]
    ]
    return json.dumps(pages, ensure_ascii=False)


@mcp.tool()
def get_page_content(page_id: str) -> str:
    """ページ本文を取得。"""
    blocks = notion.blocks.children.list(block_id=page_id)
    content = _blocks_to_markdown(blocks["results"])
    return content


@mcp.tool()
def create_page(parent_id: str, title: str, content: str) -> str:
    """新規ページを作成。"""
    response = notion.pages.create(
        parent={"page_id": parent_id},
        properties={"title": [{"text": {"content": title}}]},
        children=_markdown_to_blocks(content),
    )
    return f"作成完了: {response['url']}"


def _extract_title(page):
    # ページタイトルの抽出ロジック
    ...

def _blocks_to_markdown(blocks):
    # ブロック → Markdown 変換
    ...

def _markdown_to_blocks(md):
    # Markdown → ブロック変換
    ...

レート制限への対処

from functools import wraps
import time
from collections import defaultdict

class RateLimiter:
    def __init__(self, max_calls: int, period_sec: int):
        self.max_calls = max_calls
        self.period = period_sec
        self.calls = defaultdict(list)

    def check(self, key: str):
        now = time.time()
        # 古い呼び出しを削除
        self.calls[key] = [t for t in self.calls[key] if now - t < self.period]

        if len(self.calls[key]) >= self.max_calls:
            wait = self.period - (now - self.calls[key][0])
            return False, wait

        self.calls[key].append(now)
        return True, 0


limiter = RateLimiter(max_calls=10, period_sec=60)


@mcp.tool()
def expensive_operation(args):
    ok, wait = limiter.check("expensive_operation")
    if not ok:
        return f"レート制限: {wait:.1f}秒後にお試しください"
    return _do_work(args)

キャッシュの活用

from functools import lru_cache
from cachetools import TTLCache

cache = TTLCache(maxsize=1000, ttl=300)  # 5分有効


@mcp.tool()
def get_user_info(user_id: int) -> str:
    if user_id in cache:
        return cache[user_id]

    user = fetch_from_api(user_id)
    cache[user_id] = json.dumps(user)
    return cache[user_id]

このレッスンのまとめ

DB・SaaS連携のMCPサーバーは「安全性 → 認証 → レート制限 → キャッシュ → エラー処理」を組み込むのが鉄則。次のレッスンでは、認証・権限管理を本番想定で設計します。

よくある質問

この記事に関連する質問と答えをまとめました。

Q.DB 接続 MCP で SQL インジェクションは大丈夫?
A.
パラメータ化クエリを使う、SELECT のみ許可する、危険ワードを拒否する、結果件数を制限する、専用 DB ロール(SELECT 権限のみ)を作る、の5重防御が定石です。
Q.SaaS 連携で気をつけるべき点は?
A.
OAuth トークンの保管・リフレッシュ、レート制限の対応、API キーの環境変数管理、エラーログの PII マスキングの4点です。本番運用では監査ログも必須です。