Files
leo-claude-mktplace/mcp-servers/gitea/mcp_server/gitea_client.py
lmiranda e41c067d93 feat: implement Gitea MCP Server with full test coverage
Phase 1 implementation complete:
- Complete MCP server with 8 tools (list_issues, get_issue, create_issue, update_issue, add_comment, get_labels, suggest_labels, aggregate_issues)
- Hybrid configuration system (system-level + project-level)
- Branch-aware security model (main/staging/development)
- Mode detection (project vs company/PMO)
- Intelligent label suggestion (44-label taxonomy)
- 42 unit tests (100% passing)
- Comprehensive documentation (README.md, TESTING.md)

Files implemented:
- mcp_server/config.py - Configuration loader
- mcp_server/gitea_client.py - Gitea API client
- mcp_server/server.py - MCP server entry point
- mcp_server/tools/issues.py - Issue operations
- mcp_server/tools/labels.py - Label management
- tests/ - Complete test suite (42 tests)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-06 16:23:52 -05:00

329 lines
9.5 KiB
Python

"""
Gitea API client for interacting with Gitea API.
Provides synchronous methods for:
- Issue CRUD operations
- Label management
- Repository operations
- PMO multi-repo aggregation
"""
import requests
import logging
from typing import List, Dict, Optional
from .config import GiteaConfig
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GiteaClient:
"""Client for interacting with Gitea API"""
def __init__(self):
"""Initialize Gitea client with configuration"""
config = GiteaConfig()
config_dict = config.load()
self.base_url = config_dict['api_url']
self.token = config_dict['api_token']
self.owner = config_dict['owner']
self.repo = config_dict.get('repo') # Optional for PMO
self.mode = config_dict['mode']
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'token {self.token}',
'Content-Type': 'application/json'
})
logger.info(f"Gitea client initialized for {self.owner} in {self.mode} mode")
def list_issues(
self,
state: str = 'open',
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> List[Dict]:
"""
List issues from Gitea repository.
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
repo: Override configured repo (for PMO multi-repo)
Returns:
List of issue dictionaries
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues"
params = {'state': state}
if labels:
params['labels'] = ','.join(labels)
logger.info(f"Listing issues from {self.owner}/{target_repo} with state={state}")
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()
def get_issue(
self,
issue_number: int,
repo: Optional[str] = None
) -> Dict:
"""
Get specific issue details.
Args:
issue_number: Issue number
repo: Override configured repo (for PMO multi-repo)
Returns:
Issue dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues/{issue_number}"
logger.info(f"Getting issue #{issue_number} from {self.owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def create_issue(
self,
title: str,
body: str,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Create a new issue in Gitea.
Args:
title: Issue title
body: Issue description
labels: List of label names
repo: Override configured repo (for PMO multi-repo)
Returns:
Created issue dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues"
data = {
'title': title,
'body': body
}
if labels:
data['labels'] = labels
logger.info(f"Creating issue in {self.owner}/{target_repo}: {title}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def update_issue(
self,
issue_number: int,
title: Optional[str] = None,
body: Optional[str] = None,
state: Optional[str] = None,
labels: Optional[List[str]] = None,
repo: Optional[str] = None
) -> Dict:
"""
Update existing issue.
Args:
issue_number: Issue number
title: New title (optional)
body: New body (optional)
state: New state - 'open' or 'closed' (optional)
labels: New labels (optional)
repo: Override configured repo (for PMO multi-repo)
Returns:
Updated issue dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues/{issue_number}"
data = {}
if title is not None:
data['title'] = title
if body is not None:
data['body'] = body
if state is not None:
data['state'] = state
if labels is not None:
data['labels'] = labels
logger.info(f"Updating issue #{issue_number} in {self.owner}/{target_repo}")
response = self.session.patch(url, json=data)
response.raise_for_status()
return response.json()
def add_comment(
self,
issue_number: int,
comment: str,
repo: Optional[str] = None
) -> Dict:
"""
Add comment to issue.
Args:
issue_number: Issue number
comment: Comment text
repo: Override configured repo (for PMO multi-repo)
Returns:
Created comment dictionary
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/issues/{issue_number}/comments"
data = {'body': comment}
logger.info(f"Adding comment to issue #{issue_number} in {self.owner}/{target_repo}")
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def get_labels(
self,
repo: Optional[str] = None
) -> List[Dict]:
"""
Get all labels from repository.
Args:
repo: Override configured repo (for PMO multi-repo)
Returns:
List of label dictionaries
Raises:
ValueError: If repository not specified
requests.HTTPError: If API request fails
"""
target_repo = repo or self.repo
if not target_repo:
raise ValueError("Repository not specified")
url = f"{self.base_url}/repos/{self.owner}/{target_repo}/labels"
logger.info(f"Getting labels from {self.owner}/{target_repo}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def get_org_labels(self) -> List[Dict]:
"""
Get organization-level labels.
Returns:
List of organization label dictionaries
Raises:
requests.HTTPError: If API request fails
"""
url = f"{self.base_url}/orgs/{self.owner}/labels"
logger.info(f"Getting organization labels for {self.owner}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
# PMO-specific methods
def list_repos(self) -> List[Dict]:
"""
List all repositories in organization (PMO mode).
Returns:
List of repository dictionaries
Raises:
requests.HTTPError: If API request fails
"""
url = f"{self.base_url}/orgs/{self.owner}/repos"
logger.info(f"Listing all repositories for organization {self.owner}")
response = self.session.get(url)
response.raise_for_status()
return response.json()
def aggregate_issues(
self,
state: str = 'open',
labels: Optional[List[str]] = None
) -> Dict[str, List[Dict]]:
"""
Fetch issues across all repositories (PMO mode).
Returns dict keyed by repository name.
Args:
state: Issue state (open, closed, all)
labels: Filter by labels
Returns:
Dictionary mapping repository names to issue lists
Raises:
requests.HTTPError: If API request fails
"""
repos = self.list_repos()
aggregated = {}
logger.info(f"Aggregating issues across {len(repos)} repositories")
for repo in repos:
repo_name = repo['name']
try:
issues = self.list_issues(
state=state,
labels=labels,
repo=repo_name
)
if issues:
aggregated[repo_name] = issues
logger.info(f"Found {len(issues)} issues in {repo_name}")
except Exception as e:
# Log error but continue with other repos
logger.error(f"Error fetching issues from {repo_name}: {e}")
return aggregated