Files
gitea-mcp-remote/src/gitea_mcp_remote/middleware/auth.py
lmiranda 5075139841 refactor: Rename package to gitea_mcp_remote and update configuration
Issue #19 - Foundation for Sprint 01: Core Architecture Correction

Changes:
- Renamed package directory: gitea_http_wrapper -> gitea_mcp_remote
- Updated config/settings.py:
  - Made gitea_repo optional (allow None)
  - Added mcp_auth_mode field (default: "optional")
  - Changed HTTP defaults: 0.0.0.0:8080 (was 127.0.0.1:8000)
  - Removed get_gitea_mcp_env() method (no longer needed)
- Updated all import paths throughout codebase
- Updated filtering/filter.py: Changed ValueError to warning when both
  enabled_tools and disabled_tools are specified
- Updated test files with new import paths
- Updated test_filtering.py to test warning instead of ValueError
- Updated pyproject.toml, pytest.ini, and README.md references

All modules preserved - only import paths and configuration updated.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 17:59:57 -05:00

145 lines
4.9 KiB
Python

"""HTTP authentication middleware for MCP server."""
import logging
from typing import Awaitable, Callable
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
logger = logging.getLogger(__name__)
class BearerAuthMiddleware(BaseHTTPMiddleware):
"""
Middleware to enforce Bearer token authentication on HTTP requests.
This middleware validates the Authorization header for all requests.
If a token is configured, requests must include "Authorization: Bearer <token>".
If no token is configured, all requests are allowed (open access).
"""
def __init__(self, app, auth_token: str | None = None):
"""
Initialize authentication middleware.
Args:
app: ASGI application to wrap.
auth_token: Optional Bearer token for authentication.
If None, authentication is disabled.
"""
super().__init__(app)
self.auth_token = auth_token
self.auth_enabled = auth_token is not None
if self.auth_enabled:
logger.info("Bearer authentication enabled")
else:
logger.warning("Bearer authentication disabled - server is open access")
async def dispatch(
self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response:
"""
Process request and enforce authentication if enabled.
Args:
request: Incoming HTTP request.
call_next: Next middleware or route handler.
Returns:
Response from downstream handler or 401/403 error.
"""
# Skip authentication if disabled
if not self.auth_enabled:
return await call_next(request)
# Skip authentication if marked by HealthCheckBypassMiddleware
if getattr(request.state, "skip_auth", False):
return await call_next(request)
# Extract Authorization header
auth_header = request.headers.get("Authorization")
# Check if header is present
if not auth_header:
logger.warning(f"Missing Authorization header from {request.client.host}")
return JSONResponse(
status_code=401,
content={
"error": "Unauthorized",
"message": "Missing Authorization header",
},
)
# Check if header format is correct
if not auth_header.startswith("Bearer "):
logger.warning(f"Invalid Authorization format from {request.client.host}")
return JSONResponse(
status_code=401,
content={
"error": "Unauthorized",
"message": "Authorization header must use Bearer scheme",
},
)
# Extract token
provided_token = auth_header[7:] # Remove "Bearer " prefix
# Validate token
if provided_token != self.auth_token:
logger.warning(f"Invalid token from {request.client.host}")
return JSONResponse(
status_code=403,
content={
"error": "Forbidden",
"message": "Invalid authentication token",
},
)
# Token is valid, proceed to next handler
logger.debug(f"Authenticated request from {request.client.host}")
return await call_next(request)
class HealthCheckBypassMiddleware(BaseHTTPMiddleware):
"""
Middleware to bypass authentication for health check endpoints.
This allows monitoring systems to check server health without authentication.
"""
def __init__(self, app, health_check_paths: list[str] | None = None):
"""
Initialize health check bypass middleware.
Args:
app: ASGI application to wrap.
health_check_paths: List of paths to bypass authentication.
Defaults to ["/health", "/healthz", "/ping"].
"""
super().__init__(app)
self.health_check_paths = health_check_paths or ["/health", "/healthz", "/ping"]
async def dispatch(
self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response:
"""
Process request and bypass authentication for health checks.
Args:
request: Incoming HTTP request.
call_next: Next middleware or route handler.
Returns:
Response from downstream handler.
"""
# Check if request is for a health check endpoint
if request.url.path in self.health_check_paths:
logger.debug(f"Bypassing auth for health check: {request.url.path}")
# Mark request to skip authentication in BearerAuthMiddleware
request.state.skip_auth = True
# Continue to next middleware
return await call_next(request)