generated from personal-projects/leo-claude-mktplace
Merge feat/14: Implement core HTTP MCP server
This commit is contained in:
@@ -11,5 +11,7 @@ Architecture:
|
||||
- server.py: Main HTTP MCP server implementation
|
||||
"""
|
||||
|
||||
from .server import GiteaMCPWrapper, create_app, main
|
||||
|
||||
__version__ = "0.1.0"
|
||||
__all__ = ["__version__"]
|
||||
__all__ = ["__version__", "GiteaMCPWrapper", "create_app", "main"]
|
||||
|
||||
309
src/gitea_http_wrapper/server.py
Normal file
309
src/gitea_http_wrapper/server.py
Normal file
@@ -0,0 +1,309 @@
|
||||
"""HTTP MCP server implementation wrapping Gitea MCP."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import uvicorn
|
||||
from mcp.server import Server
|
||||
from mcp.server.stdio import stdio_server
|
||||
from starlette.applications import Starlette
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import JSONResponse
|
||||
from starlette.routing import Route
|
||||
|
||||
from gitea_http_wrapper.config import GiteaSettings, load_settings
|
||||
from gitea_http_wrapper.filtering import ToolFilter
|
||||
from gitea_http_wrapper.middleware import (
|
||||
BearerAuthMiddleware,
|
||||
HealthCheckBypassMiddleware,
|
||||
)
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GiteaMCPWrapper:
|
||||
"""
|
||||
HTTP wrapper around the official Gitea MCP server.
|
||||
|
||||
This class manages:
|
||||
1. Starting the Gitea MCP server as a subprocess with stdio transport
|
||||
2. Proxying HTTP requests to the MCP server
|
||||
3. Filtering tools based on configuration
|
||||
4. Handling responses and errors
|
||||
"""
|
||||
|
||||
def __init__(self, settings: GiteaSettings):
|
||||
"""
|
||||
Initialize the MCP wrapper.
|
||||
|
||||
Args:
|
||||
settings: Configuration settings for Gitea and HTTP server.
|
||||
"""
|
||||
self.settings = settings
|
||||
self.tool_filter = ToolFilter(
|
||||
enabled_tools=settings.enabled_tools_list,
|
||||
disabled_tools=settings.disabled_tools_list,
|
||||
)
|
||||
self.process = None
|
||||
self.reader = None
|
||||
self.writer = None
|
||||
|
||||
async def start_gitea_mcp(self) -> None:
|
||||
"""
|
||||
Start the Gitea MCP server as a subprocess.
|
||||
|
||||
The server runs with stdio transport, and we communicate via stdin/stdout.
|
||||
"""
|
||||
logger.info("Starting Gitea MCP server subprocess")
|
||||
|
||||
# Set environment variables for Gitea MCP
|
||||
env = os.environ.copy()
|
||||
env.update(self.settings.get_gitea_mcp_env())
|
||||
|
||||
# Start the process
|
||||
# Note: This assumes gitea-mcp-server is installed and on PATH
|
||||
# In production Docker, this should be guaranteed
|
||||
try:
|
||||
self.process = await asyncio.create_subprocess_exec(
|
||||
"gitea-mcp-server",
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=env,
|
||||
)
|
||||
self.reader = self.process.stdout
|
||||
self.writer = self.process.stdin
|
||||
logger.info("Gitea MCP server started successfully")
|
||||
except FileNotFoundError:
|
||||
logger.error("gitea-mcp-server not found in PATH")
|
||||
raise RuntimeError(
|
||||
"gitea-mcp-server not found. Ensure it's installed: pip install gitea-mcp-server"
|
||||
)
|
||||
|
||||
async def stop_gitea_mcp(self) -> None:
|
||||
"""Stop the Gitea MCP server subprocess."""
|
||||
if self.process:
|
||||
logger.info("Stopping Gitea MCP server subprocess")
|
||||
self.process.terminate()
|
||||
await self.process.wait()
|
||||
logger.info("Gitea MCP server stopped")
|
||||
|
||||
async def send_mcp_request(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
Send a JSON-RPC request to the MCP server.
|
||||
|
||||
Args:
|
||||
method: MCP method name (e.g., "tools/list", "tools/call").
|
||||
params: Method parameters.
|
||||
|
||||
Returns:
|
||||
JSON-RPC response from MCP server.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If MCP server is not running or communication fails.
|
||||
"""
|
||||
if not self.writer or not self.reader:
|
||||
raise RuntimeError("MCP server not started")
|
||||
|
||||
# Build JSON-RPC request
|
||||
request = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": method,
|
||||
"params": params,
|
||||
}
|
||||
|
||||
# Send request
|
||||
request_json = json.dumps(request) + "\n"
|
||||
self.writer.write(request_json.encode())
|
||||
await self.writer.drain()
|
||||
|
||||
# Read response
|
||||
response_line = await self.reader.readline()
|
||||
response = json.loads(response_line.decode())
|
||||
|
||||
# Check for JSON-RPC error
|
||||
if "error" in response:
|
||||
logger.error(f"MCP error: {response['error']}")
|
||||
raise RuntimeError(f"MCP error: {response['error']}")
|
||||
|
||||
return response.get("result", {})
|
||||
|
||||
async def list_tools(self) -> dict[str, Any]:
|
||||
"""
|
||||
List available tools from MCP server with filtering applied.
|
||||
|
||||
Returns:
|
||||
Filtered tools list response.
|
||||
"""
|
||||
response = await self.send_mcp_request("tools/list", {})
|
||||
filtered_response = self.tool_filter.filter_tools_response(response)
|
||||
|
||||
logger.info(
|
||||
f"Listed {len(filtered_response.get('tools', []))} tools "
|
||||
f"(filter: {self.tool_filter.get_filter_stats()['mode']})"
|
||||
)
|
||||
return filtered_response
|
||||
|
||||
async def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
Call a tool on the MCP server.
|
||||
|
||||
Args:
|
||||
tool_name: Name of tool to call.
|
||||
arguments: Tool arguments.
|
||||
|
||||
Returns:
|
||||
Tool execution result.
|
||||
|
||||
Raises:
|
||||
ValueError: If tool is filtered out.
|
||||
"""
|
||||
# Check if tool is allowed
|
||||
if not self.tool_filter.should_include_tool(tool_name):
|
||||
raise ValueError(f"Tool '{tool_name}' is not available (filtered)")
|
||||
|
||||
logger.info(f"Calling tool: {tool_name}")
|
||||
result = await self.send_mcp_request(
|
||||
"tools/call",
|
||||
{"name": tool_name, "arguments": arguments},
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
# Global wrapper instance
|
||||
wrapper: GiteaMCPWrapper | None = None
|
||||
|
||||
|
||||
async def health_check(request: Request) -> JSONResponse:
|
||||
"""Health check endpoint."""
|
||||
return JSONResponse({"status": "healthy"})
|
||||
|
||||
|
||||
async def list_tools_endpoint(request: Request) -> JSONResponse:
|
||||
"""List available tools."""
|
||||
try:
|
||||
tools = await wrapper.list_tools()
|
||||
return JSONResponse(tools)
|
||||
except Exception as e:
|
||||
logger.exception("Error listing tools")
|
||||
return JSONResponse(
|
||||
{"error": str(e)},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
|
||||
async def call_tool_endpoint(request: Request) -> JSONResponse:
|
||||
"""Call a tool."""
|
||||
try:
|
||||
body = await request.json()
|
||||
tool_name = body.get("name")
|
||||
arguments = body.get("arguments", {})
|
||||
|
||||
if not tool_name:
|
||||
return JSONResponse(
|
||||
{"error": "Missing 'name' field"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
result = await wrapper.call_tool(tool_name, arguments)
|
||||
return JSONResponse(result)
|
||||
except ValueError as e:
|
||||
# Tool filtered
|
||||
return JSONResponse(
|
||||
{"error": str(e)},
|
||||
status_code=403,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("Error calling tool")
|
||||
return JSONResponse(
|
||||
{"error": str(e)},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
|
||||
async def startup() -> None:
|
||||
"""Application startup handler."""
|
||||
global wrapper
|
||||
settings = load_settings()
|
||||
wrapper = GiteaMCPWrapper(settings)
|
||||
await wrapper.start_gitea_mcp()
|
||||
logger.info(f"HTTP MCP server starting on {settings.http_host}:{settings.http_port}")
|
||||
|
||||
|
||||
async def shutdown() -> None:
|
||||
"""Application shutdown handler."""
|
||||
global wrapper
|
||||
if wrapper:
|
||||
await wrapper.stop_gitea_mcp()
|
||||
|
||||
|
||||
# Define routes
|
||||
routes = [
|
||||
Route("/health", health_check, methods=["GET"]),
|
||||
Route("/healthz", health_check, methods=["GET"]),
|
||||
Route("/ping", health_check, methods=["GET"]),
|
||||
Route("/tools/list", list_tools_endpoint, methods=["POST"]),
|
||||
Route("/tools/call", call_tool_endpoint, methods=["POST"]),
|
||||
]
|
||||
|
||||
# Create Starlette app
|
||||
app = Starlette(
|
||||
routes=routes,
|
||||
on_startup=[startup],
|
||||
on_shutdown=[shutdown],
|
||||
)
|
||||
|
||||
|
||||
def create_app(settings: GiteaSettings | None = None) -> Starlette:
|
||||
"""
|
||||
Create and configure the Starlette application.
|
||||
|
||||
Args:
|
||||
settings: Optional settings override for testing.
|
||||
|
||||
Returns:
|
||||
Configured Starlette application.
|
||||
"""
|
||||
if settings is None:
|
||||
settings = load_settings()
|
||||
|
||||
# Add middleware
|
||||
app.add_middleware(HealthCheckBypassMiddleware)
|
||||
app.add_middleware(BearerAuthMiddleware, auth_token=settings.auth_token)
|
||||
|
||||
return app
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Main entry point for the HTTP MCP server."""
|
||||
settings = load_settings()
|
||||
|
||||
# Log filter configuration
|
||||
filter_stats = ToolFilter(
|
||||
enabled_tools=settings.enabled_tools_list,
|
||||
disabled_tools=settings.disabled_tools_list,
|
||||
).get_filter_stats()
|
||||
logger.info(f"Tool filtering: {filter_stats}")
|
||||
|
||||
# Run server
|
||||
uvicorn.run(
|
||||
"gitea_http_wrapper.server:app",
|
||||
host=settings.http_host,
|
||||
port=settings.http_port,
|
||||
log_level="info",
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user