generated from personal-projects/leo-claude-mktplace
feat: implement MCP Streamable HTTP protocol endpoints
- Add POST /mcp endpoint using StreamableHTTPServerTransport - Add HEAD /mcp endpoint returning protocol version header - Remove old custom REST endpoints (/tools/list, /tools/call) - Integrate marketplace tools via MCP Server decorators - Add comprehensive MCP endpoint tests - Update README with correct MCP protocol usage The implementation properly handles: - JSON-RPC 2.0 message format - SSE (Server-Sent Events) responses - Protocol version negotiation - Tool filtering integration - Authentication middleware Tests verify: - HEAD /mcp returns correct headers - POST /mcp handles initialize requests - Proper error handling for missing headers Closes #24 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -6,15 +6,23 @@ package and serves them over HTTP with authentication.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Any
|
||||
import uvicorn
|
||||
from starlette.applications import Starlette
|
||||
from starlette.responses import JSONResponse
|
||||
from starlette.routing import Route
|
||||
from starlette.responses import JSONResponse, Response
|
||||
from starlette.routing import Route, Mount
|
||||
from mcp.server import Server
|
||||
from mcp.server.streamable_http import StreamableHTTPServerTransport
|
||||
from mcp.types import Tool, TextContent
|
||||
|
||||
from gitea_mcp_remote.config import load_settings
|
||||
from gitea_mcp_remote.middleware import BearerAuthMiddleware, HealthCheckBypassMiddleware
|
||||
from gitea_mcp_remote.filtering import ToolFilter
|
||||
|
||||
# Import marketplace package
|
||||
from mcp_server import get_tool_definitions, create_tool_dispatcher, GiteaClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -23,6 +31,30 @@ async def health_check(request):
|
||||
return JSONResponse({"status": "ok"})
|
||||
|
||||
|
||||
def create_mcp_server(tool_names: list[str] | None = None) -> Server:
|
||||
"""Create and configure the MCP server with tools from the marketplace."""
|
||||
mcp_server = Server("gitea-mcp-remote")
|
||||
|
||||
# Get tool definitions from marketplace
|
||||
tools = get_tool_definitions(tool_filter=tool_names)
|
||||
|
||||
# Create Gitea client and dispatcher
|
||||
gitea_client = GiteaClient()
|
||||
dispatcher = create_tool_dispatcher(gitea_client, tool_filter=tool_names)
|
||||
|
||||
@mcp_server.list_tools()
|
||||
async def list_tools() -> list[Tool]:
|
||||
"""Return available Gitea tools."""
|
||||
return tools
|
||||
|
||||
@mcp_server.call_tool()
|
||||
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
||||
"""Execute a tool with the given arguments."""
|
||||
return await dispatcher(name, arguments)
|
||||
|
||||
return mcp_server
|
||||
|
||||
|
||||
def create_app():
|
||||
"""Create the Starlette application with middleware."""
|
||||
settings = load_settings()
|
||||
@@ -38,12 +70,67 @@ def create_app():
|
||||
if tool_filter.enabled_tools:
|
||||
tool_names = list(tool_filter.enabled_tools)
|
||||
|
||||
# TODO: Issue #24 will add MCP protocol endpoints
|
||||
# from mcp_server import get_tool_definitions, create_tool_dispatcher, GiteaClient, GiteaConfig
|
||||
# Create MCP server with filtered tools
|
||||
mcp_server = create_mcp_server(tool_names)
|
||||
|
||||
# Store server for endpoint access
|
||||
app_state = {"mcp_server": mcp_server}
|
||||
|
||||
class MCPEndpoint:
|
||||
"""ASGI app wrapper for MCP protocol endpoint."""
|
||||
|
||||
async def __call__(self, scope, receive, send):
|
||||
"""Handle MCP requests - both POST and HEAD."""
|
||||
method = scope.get("method", "")
|
||||
|
||||
if method == "HEAD":
|
||||
# Return protocol version header
|
||||
await send({
|
||||
"type": "http.response.start",
|
||||
"status": 200,
|
||||
"headers": [
|
||||
[b"x-mcp-protocol-version", b"2024-11-05"],
|
||||
],
|
||||
})
|
||||
await send({
|
||||
"type": "http.response.body",
|
||||
"body": b"",
|
||||
})
|
||||
return
|
||||
|
||||
# For POST requests, use the StreamableHTTPServerTransport
|
||||
# Create transport for this request
|
||||
transport = StreamableHTTPServerTransport(mcp_session_id=None)
|
||||
|
||||
# Run the MCP server with this transport
|
||||
async with transport.connect() as (read_stream, write_stream):
|
||||
# Start server task
|
||||
server_task = asyncio.create_task(
|
||||
app_state["mcp_server"].run(
|
||||
read_stream,
|
||||
write_stream,
|
||||
app_state["mcp_server"].create_initialization_options()
|
||||
)
|
||||
)
|
||||
|
||||
# Handle the HTTP request
|
||||
try:
|
||||
await transport.handle_request(scope, receive, send)
|
||||
finally:
|
||||
# Cancel server task if still running
|
||||
if not server_task.done():
|
||||
server_task.cancel()
|
||||
try:
|
||||
await server_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# Create MCP endpoint instance
|
||||
mcp_endpoint = MCPEndpoint()
|
||||
|
||||
routes = [
|
||||
Route("/health", health_check, methods=["GET"]),
|
||||
# MCP endpoints will be added in Issue #24
|
||||
Route("/mcp", mcp_endpoint, methods=["GET", "POST", "HEAD"]),
|
||||
]
|
||||
|
||||
app = Starlette(routes=routes)
|
||||
|
||||
Reference in New Issue
Block a user