Files
leo-claude-mktplace/.claude/skills/claude-plugin-developer/references/mcp-integration.md

16 KiB

MCP Server Integration

Comprehensive guide for integrating Model Context Protocol (MCP) servers with Claude plugins.

MCP Overview

MCP servers provide structured interfaces to external tools and services, enabling Claude to interact with databases, APIs, and other systems through a standardized protocol.

Basic Configuration

.mcp.json Structure

{
  "name": "restaurant-data-server",
  "version": "1.0.0",
  "description": "MCP server for restaurant database access",
  "command": "python",
  "args": ["servers/restaurant_mcp.py"],
  "env": {
    "DATABASE_URL": "${RESTAURANT_DB_URL}",
    "API_KEY": "${RESTAURANT_API_KEY}"
  },
  "capabilities": {
    "resources": true,
    "tools": true,
    "subscriptions": true
  }
}

Server Implementation

Python MCP Server

#!/usr/bin/env python3
# servers/restaurant_mcp.py

import asyncio
import json
from typing import Any, Dict, List
from mcp import MCPServer, Resource, Tool

class RestaurantMCPServer(MCPServer):
    def __init__(self):
        super().__init__("restaurant-data-server")
        self.setup_tools()
        self.setup_resources()
    
    def setup_tools(self):
        @self.tool("get_sales_data")
        async def get_sales_data(date: str, location: str = None) -> Dict:
            """Retrieve sales data for specified date and location"""
            # Implementation
            return {
                "date": date,
                "location": location,
                "total_sales": 15420.50,
                "transactions": 342
            }
        
        @self.tool("update_inventory")
        async def update_inventory(item_id: str, quantity: int) -> Dict:
            """Update inventory levels for an item"""
            # Implementation
            return {
                "item_id": item_id,
                "new_quantity": quantity,
                "status": "updated"
            }
    
    def setup_resources(self):
        @self.resource("menu_items")
        async def get_menu_items() -> List[Resource]:
            """List all menu items"""
            items = await fetch_menu_from_db()
            return [
                Resource(
                    id=f"menu_item_{item['id']}",
                    name=item['name'],
                    description=f"Menu item: {item['name']}",
                    metadata={"price": item['price'], "category": item['category']}
                )
                for item in items
            ]

if __name__ == "__main__":
    server = RestaurantMCPServer()
    asyncio.run(server.run())

Node.js MCP Server

#!/usr/bin/env node
// servers/restaurant_mcp.js

const { MCPServer, Tool, Resource } = require('@modelcontextprotocol/server');

class RestaurantMCPServer extends MCPServer {
  constructor() {
    super('restaurant-data-server');
    this.setupTools();
    this.setupResources();
  }
  
  setupTools() {
    this.registerTool(new Tool({
      name: 'get_sales_data',
      description: 'Retrieve sales data',
      parameters: {
        type: 'object',
        properties: {
          date: { type: 'string', format: 'date' },
          location: { type: 'string' }
        },
        required: ['date']
      },
      handler: async ({ date, location }) => {
        // Implementation
        return {
          date,
          location,
          total_sales: 15420.50,
          transactions: 342
        };
      }
    }));
  }
  
  setupResources() {
    this.registerResourceProvider({
      pattern: /^menu_items$/,
      handler: async () => {
        const items = await this.fetchMenuFromDB();
        return items.map(item => ({
          id: `menu_item_${item.id}`,
          name: item.name,
          content: JSON.stringify(item, null, 2),
          mimeType: 'application/json'
        }));
      }
    });
  }
}

const server = new RestaurantMCPServer();
server.start();

Tool Definitions

Tool Schema

{
  "name": "analyze_customer_feedback",
  "description": "Analyze customer feedback sentiment",
  "parameters": {
    "type": "object",
    "properties": {
      "feedback_id": {
        "type": "string",
        "description": "Unique feedback identifier"
      },
      "include_suggestions": {
        "type": "boolean",
        "default": true,
        "description": "Include improvement suggestions"
      }
    },
    "required": ["feedback_id"]
  }
}

Complex Tool Example

@server.tool("generate_report")
async def generate_report(
    report_type: str,
    start_date: str,
    end_date: str,
    format: str = "pdf",
    filters: Dict[str, Any] = None
) -> Dict[str, Any]:
    """Generate comprehensive business report
    
    Args:
        report_type: Type of report (sales, inventory, customer)
        start_date: Report start date (YYYY-MM-DD)
        end_date: Report end date (YYYY-MM-DD)
        format: Output format (pdf, excel, json)
        filters: Additional filters to apply
    
    Returns:
        Report data and download URL
    """
    # Validate inputs
    if not validate_date_range(start_date, end_date):
        raise ValueError("Invalid date range")
    
    # Generate report
    report_data = await compile_report_data(
        report_type, start_date, end_date, filters
    )
    
    # Format output
    if format == "pdf":
        url = await generate_pdf_report(report_data)
    elif format == "excel":
        url = await generate_excel_report(report_data)
    else:
        url = await save_json_report(report_data)
    
    return {
        "report_type": report_type,
        "period": f"{start_date} to {end_date}",
        "download_url": url,
        "summary": report_data.get("summary", {})
    }

Resource Management

Static Resources

@server.resource("config/database_schema")
async def get_database_schema() -> Resource:
    """Provide database schema documentation"""
    schema = load_schema_file()
    return Resource(
        id="database_schema",
        name="Restaurant Database Schema",
        content=schema,
        mimeType="text/markdown"
    )

Dynamic Resources

@server.resource_pattern(r"^orders/(\d{4}-\d{2}-\d{2})$")
async def get_daily_orders(date: str) -> List[Resource]:
    """Get orders for a specific date"""
    orders = await fetch_orders_by_date(date)
    return [
        Resource(
            id=f"order_{order['id']}",
            name=f"Order #{order['number']}",
            content=json.dumps(order, indent=2),
            mimeType="application/json",
            metadata={
                "customer": order['customer_name'],
                "total": order['total_amount'],
                "status": order['status']
            }
        )
        for order in orders
    ]

Subscription Resources

@server.subscription("live_orders")
async def subscribe_to_orders(callback):
    """Subscribe to live order updates"""
    async def order_handler(order):
        await callback(Resource(
            id=f"live_order_{order['id']}",
            name=f"New Order #{order['number']}",
            content=json.dumps(order),
            mimeType="application/json"
        ))
    
    # Register handler with order system
    order_system.on_new_order(order_handler)
    
    # Return unsubscribe function
    return lambda: order_system.off_new_order(order_handler)

Security Implementation

Authentication

class SecureRestaurantServer(MCPServer):
    def __init__(self):
        super().__init__("secure-restaurant-server")
        self.auth_token = os.environ.get("MCP_AUTH_TOKEN")
    
    async def authenticate(self, request):
        """Validate authentication token"""
        token = request.headers.get("Authorization")
        if not token or token != f"Bearer {self.auth_token}":
            raise AuthenticationError("Invalid token")
    
    async def handle_request(self, request):
        await self.authenticate(request)
        return await super().handle_request(request)

Input Validation

@server.tool("update_menu_item")
async def update_menu_item(item_id: str, updates: Dict) -> Dict:
    """Securely update menu item"""
    # Validate item_id format
    if not re.match(r"^[A-Z0-9]{8}$", item_id):
        raise ValueError("Invalid item ID format")
    
    # Validate allowed fields
    allowed_fields = {"name", "price", "description", "category"}
    invalid_fields = set(updates.keys()) - allowed_fields
    if invalid_fields:
        raise ValueError(f"Invalid fields: {invalid_fields}")
    
    # Validate data types
    if "price" in updates:
        if not isinstance(updates["price"], (int, float)):
            raise TypeError("Price must be numeric")
        if updates["price"] < 0:
            raise ValueError("Price cannot be negative")
    
    # Apply updates
    result = await db.update_menu_item(item_id, updates)
    return {"status": "success", "updated": result}

Rate Limiting

from functools import wraps
import time

def rate_limit(max_calls=10, time_window=60):
    calls = {}
    
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            client_id = kwargs.get('client_id', 'default')
            now = time.time()
            
            # Clean old calls
            calls[client_id] = [
                t for t in calls.get(client_id, [])
                if now - t < time_window
            ]
            
            # Check rate limit
            if len(calls[client_id]) >= max_calls:
                raise RateLimitError(f"Rate limit exceeded: {max_calls}/{time_window}s")
            
            # Record call
            calls[client_id].append(now)
            
            # Execute function
            return await func(self, *args, **kwargs)
        return wrapper
    return decorator

# Usage
@server.tool("expensive_operation")
@rate_limit(max_calls=5, time_window=300)
async def expensive_operation(data: str) -> Dict:
    """Rate-limited expensive operation"""
    result = await perform_expensive_calculation(data)
    return {"result": result}

Error Handling

Graceful Errors

@server.tool("process_order")
async def process_order(order_data: Dict) -> Dict:
    try:
        # Validate order
        validation_result = validate_order(order_data)
        if not validation_result.is_valid:
            return {
                "status": "error",
                "error_code": "INVALID_ORDER",
                "message": validation_result.message,
                "fields": validation_result.invalid_fields
            }
        
        # Process order
        result = await order_processor.process(order_data)
        return {
            "status": "success",
            "order_id": result.order_id,
            "estimated_time": result.estimated_time
        }
        
    except InventoryError as e:
        return {
            "status": "error",
            "error_code": "INSUFFICIENT_INVENTORY",
            "message": str(e),
            "available_items": e.available_items
        }
    except Exception as e:
        # Log unexpected errors
        logger.error(f"Unexpected error: {e}")
        return {
            "status": "error",
            "error_code": "INTERNAL_ERROR",
            "message": "An unexpected error occurred"
        }

Error Recovery

class ResilientMCPServer(MCPServer):
    def __init__(self):
        super().__init__("resilient-server")
        self.db = None
        self.reconnect_attempts = 0
    
    async def ensure_connection(self):
        """Ensure database connection with retry logic"""
        if self.db and self.db.is_connected():
            return
        
        for attempt in range(3):
            try:
                self.db = await create_db_connection()
                self.reconnect_attempts = 0
                return
            except ConnectionError:
                await asyncio.sleep(2 ** attempt)
        
        raise ServiceUnavailableError("Cannot connect to database")
    
    async def handle_tool_call(self, tool_name, params):
        await self.ensure_connection()
        return await super().handle_tool_call(tool_name, params)

Testing MCP Servers

Unit Testing

import pytest
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_get_sales_data():
    server = RestaurantMCPServer()
    server.db = AsyncMock()
    server.db.query.return_value = [
        {"date": "2024-01-15", "total": 1000}
    ]
    
    result = await server.tools["get_sales_data"](
        date="2024-01-15",
        location="main"
    )
    
    assert result["total_sales"] == 1000
    server.db.query.assert_called_once()

Integration Testing

async def test_mcp_server_integration():
    # Start test server
    server = RestaurantMCPServer()
    test_port = 8765
    await server.start(port=test_port)
    
    # Create client
    client = MCPClient(f"http://localhost:{test_port}")
    
    # Test tool call
    result = await client.call_tool(
        "get_sales_data",
        {"date": "2024-01-15"}
    )
    
    assert result["status"] == "success"
    
    # Cleanup
    await server.stop()

Mock Server for Development

// servers/mock_restaurant_mcp.js
class MockRestaurantServer extends MCPServer {
  constructor() {
    super('mock-restaurant-server');
    this.setupMockTools();
  }
  
  setupMockTools() {
    this.registerTool({
      name: 'get_sales_data',
      handler: async ({ date }) => ({
        date,
        total_sales: Math.random() * 10000,
        transactions: Math.floor(Math.random() * 500)
      })
    });
  }
}

Performance Optimization

Caching

from functools import lru_cache
from cachetools import TTLCache

class CachedMCPServer(MCPServer):
    def __init__(self):
        super().__init__("cached-server")
        self.cache = TTLCache(maxsize=100, ttl=300)
    
    @server.tool("get_analytics")
    async def get_analytics(self, date_range: str) -> Dict:
        # Check cache
        cache_key = f"analytics_{date_range}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # Compute analytics
        result = await self.compute_analytics(date_range)
        
        # Store in cache
        self.cache[cache_key] = result
        return result

Connection Pooling

import asyncpg

class PooledMCPServer(MCPServer):
    def __init__(self):
        super().__init__("pooled-server")
        self.db_pool = None
    
    async def initialize(self):
        self.db_pool = await asyncpg.create_pool(
            database="restaurant_db",
            user="mcp_user",
            password=os.environ["DB_PASSWORD"],
            host="localhost",
            port=5432,
            min_size=5,
            max_size=20
        )
    
    async def query(self, sql, *args):
        async with self.db_pool.acquire() as conn:
            return await conn.fetch(sql, *args)

Deployment

Docker Configuration

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY servers/ ./servers/
COPY .mcp.json .

ENV PYTHONUNBUFFERED=1

CMD ["python", "servers/restaurant_mcp.py"]

Systemd Service

# /etc/systemd/system/restaurant-mcp.service
[Unit]
Description=Restaurant MCP Server
After=network.target

[Service]
Type=simple
User=mcp
WorkingDirectory=/opt/restaurant-mcp
Environment="DATABASE_URL=postgresql://localhost/restaurant"
ExecStart=/usr/bin/python3 /opt/restaurant-mcp/servers/restaurant_mcp.py
Restart=always

[Install]
WantedBy=multi-user.target

Health Checks

@server.tool("health_check")
async def health_check() -> Dict:
    """MCP server health check endpoint"""
    checks = {
        "server": "ok",
        "database": "unknown",
        "cache": "unknown"
    }
    
    # Check database
    try:
        await db.execute("SELECT 1")
        checks["database"] = "ok"
    except:
        checks["database"] = "error"
    
    # Check cache
    try:
        cache.get("test")
        checks["cache"] = "ok"
    except:
        checks["cache"] = "error"
    
    overall_status = "healthy" if all(
        v == "ok" for v in checks.values()
    ) else "unhealthy"
    
    return {
        "status": overall_status,
        "checks": checks,
        "timestamp": datetime.now().isoformat()
    }