MCPで Claude を業務システムと統合する
0 / 6 完了
(0%)
LESSON 03
/ 06
データベース・SaaSとの接続パターン

MCPサーバーの真価は 業務システムとの統合。本レッスンではDB・SaaS への接続実装パターンを学びます。
Postgres 接続の最小実装
# postgres_mcp.py
from mcp.server.fastmcp import FastMCP
import psycopg2
import os
from psycopg2.extras import RealDictCursor
mcp = FastMCP("postgres-mcp")
DB_URL = os.environ["DATABASE_URL"]
def get_connection():
return psycopg2.connect(DB_URL, cursor_factory=RealDictCursor)
@mcp.tool()
def execute_query(query: str, params: list = None) -> str:
"""SELECT クエリを実行する(読み取り専用)。
Args:
query: SELECT で始まるSQLクエリ
params: クエリパラメータ
"""
# 安全性チェック:SELECT のみ許可
q_normalized = query.strip().upper()
if not q_normalized.startswith("SELECT"):
return "エラー: SELECT クエリのみ実行可能です"
# 危険ワード検知
dangerous = ["DROP", "DELETE", "UPDATE", "INSERT", "TRUNCATE", "ALTER"]
if any(d in q_normalized for d in dangerous):
return f"エラー: 禁止されたキーワードが含まれます"
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute(query, params or [])
rows = cur.fetchmany(100) # 最大100件
return json.dumps(rows, default=str, ensure_ascii=False)
except Exception as e:
return f"エラー: {str(e)}"
@mcp.tool()
def list_tables() -> str:
"""利用可能なテーブル一覧を返す。"""
query = """
SELECT table_name, table_type
FROM information_schema.tables
WHERE table_schema = 'public'
ORDER BY table_name
"""
return execute_query(query)
@mcp.tool()
def describe_table(table_name: str) -> str:
"""テーブルのスキーマを返す。"""
query = """
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = %s
ORDER BY ordinal_position
"""
return execute_query(query, [table_name])
if __name__ == "__main__":
mcp.run()
SQL インジェクション対策
| 対策 | 実装 |
|---|---|
| パラメータ化クエリ | %s + params で値を渡す |
| キーワードホワイトリスト | SELECT のみ許可、DROP等を拒否 |
| 結果件数制限 | fetchmany(100) で最大件数制限 |
| タイムアウト | statement_timeout = ‘5s’ |
| 読み取り専用ロール | DBユーザーをSELECT権限のみに |
Slack 接続のMCPサーバー
# slack_mcp.py
from mcp.server.fastmcp import FastMCP
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import os
mcp = FastMCP("slack-mcp")
slack = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
@mcp.tool()
def post_message(channel: str, text: str) -> str:
"""Slackチャネルにメッセージを投稿する。
Args:
channel: チャネルID(例: 'C0123456789')またはチャネル名('#general')
text: 投稿メッセージ(マークダウン可)
"""
try:
response = slack.chat_postMessage(channel=channel, text=text)
return f"投稿完了: ts={response['ts']}, channel={response['channel']}"
except SlackApiError as e:
return f"Slack API エラー: {e.response['error']}"
@mcp.tool()
def search_messages(query: str, count: int = 10) -> str:
"""メッセージ検索。
Args:
query: 検索クエリ
count: 最大件数
"""
try:
response = slack.search_messages(query=query, count=count)
results = [
{
"channel": m["channel"]["name"],
"user": m.get("username", m.get("user")),
"text": m["text"][:200],
"ts": m["ts"],
}
for m in response["messages"]["matches"]
]
return json.dumps(results, ensure_ascii=False)
except SlackApiError as e:
return f"検索エラー: {e.response['error']}"
@mcp.tool()
def list_channels(limit: int = 50) -> str:
"""参加中のチャネル一覧。"""
try:
response = slack.conversations_list(limit=limit)
channels = [
{"id": c["id"], "name": c["name"], "is_private": c["is_private"]}
for c in response["channels"]
]
return json.dumps(channels, ensure_ascii=False)
except SlackApiError as e:
return f"エラー: {e.response['error']}"
if __name__ == "__main__":
mcp.run()
Notion 接続のMCPサーバー
# notion_mcp.py
from mcp.server.fastmcp import FastMCP
from notion_client import Client
import os
mcp = FastMCP("notion-mcp")
notion = Client(auth=os.environ["NOTION_API_KEY"])
@mcp.tool()
def search_pages(query: str) -> str:
"""Notion ページを検索。"""
response = notion.search(query=query, filter={"property": "object", "value": "page"})
pages = [
{
"id": p["id"],
"title": _extract_title(p),
"url": p["url"],
"last_edited": p["last_edited_time"],
}
for p in response["results"]
]
return json.dumps(pages, ensure_ascii=False)
@mcp.tool()
def get_page_content(page_id: str) -> str:
"""ページ本文を取得。"""
blocks = notion.blocks.children.list(block_id=page_id)
content = _blocks_to_markdown(blocks["results"])
return content
@mcp.tool()
def create_page(parent_id: str, title: str, content: str) -> str:
"""新規ページを作成。"""
response = notion.pages.create(
parent={"page_id": parent_id},
properties={"title": [{"text": {"content": title}}]},
children=_markdown_to_blocks(content),
)
return f"作成完了: {response['url']}"
def _extract_title(page):
# ページタイトルの抽出ロジック
...
def _blocks_to_markdown(blocks):
# ブロック → Markdown 変換
...
def _markdown_to_blocks(md):
# Markdown → ブロック変換
...
レート制限への対処
from functools import wraps
import time
from collections import defaultdict
class RateLimiter:
def __init__(self, max_calls: int, period_sec: int):
self.max_calls = max_calls
self.period = period_sec
self.calls = defaultdict(list)
def check(self, key: str):
now = time.time()
# 古い呼び出しを削除
self.calls[key] = [t for t in self.calls[key] if now - t < self.period]
if len(self.calls[key]) >= self.max_calls:
wait = self.period - (now - self.calls[key][0])
return False, wait
self.calls[key].append(now)
return True, 0
limiter = RateLimiter(max_calls=10, period_sec=60)
@mcp.tool()
def expensive_operation(args):
ok, wait = limiter.check("expensive_operation")
if not ok:
return f"レート制限: {wait:.1f}秒後にお試しください"
return _do_work(args)
キャッシュの活用
from functools import lru_cache
from cachetools import TTLCache
cache = TTLCache(maxsize=1000, ttl=300) # 5分有効
@mcp.tool()
def get_user_info(user_id: int) -> str:
if user_id in cache:
return cache[user_id]
user = fetch_from_api(user_id)
cache[user_id] = json.dumps(user)
return cache[user_id]
このレッスンのまとめ
DB・SaaS連携のMCPサーバーは「安全性 → 認証 → レート制限 → キャッシュ → エラー処理」を組み込むのが鉄則。次のレッスンでは、認証・権限管理を本番想定で設計します。
よくある質問
この記事に関連する質問と答えをまとめました。
Q.DB 接続 MCP で SQL インジェクションは大丈夫?
A.
パラメータ化クエリを使う、SELECT のみ許可する、危険ワードを拒否する、結果件数を制限する、専用 DB ロール(SELECT 権限のみ)を作る、の5重防御が定石です。
Q.SaaS 連携で気をつけるべき点は?
A.
OAuth トークンの保管・リフレッシュ、レート制限の対応、API キーの環境変数管理、エラーログの PII マスキングの4点です。本番運用では監査ログも必須です。